Skip to content

Commit

Permalink
add test for sending in batch handler
Browse files Browse the repository at this point in the history
  • Loading branch information
lamr02n committed Jun 1, 2024
1 parent 3638d83 commit 3b7deae
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
13 changes: 10 additions & 3 deletions heidgaf_log_collector/batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@

class KafkaBatchSender:
def __init__(self, topic: str):
self._start_kafka_producer()

self.topic = topic
self.messages = []
self.lock = Lock()
self.timer = None
self.kafka_producer = None

def start_kafka_producer(self):
if self.kafka_producer:
logger.warning(f"Kafka Producer already running on {KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}.")
return

def _start_kafka_producer(self):
conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}
self.kafka_producer = Producer(conf)

def _send_batch(self):
if not self.kafka_producer:
logger.error(f"Kafka Producer not running!")
return

with self.lock:
if self.messages:
self.kafka_producer.produce(
Expand Down
60 changes: 52 additions & 8 deletions heidgaf_log_collector/tests/test_batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from threading import Lock
from unittest.mock import patch, MagicMock

from heidgaf_log_collector import utils
from heidgaf_log_collector.batch_handler import KafkaBatchSender

# placeholders
Expand All @@ -12,20 +13,14 @@


class TestInit(unittest.TestCase):
@patch('heidgaf_log_collector.batch_handler.Producer')
def test_init(self, mock_producer):
mock_producer_instance = MagicMock()
mock_producer.return_value = mock_producer_instance
def test_init(self):
sender_instance = KafkaBatchSender(topic="test_topic")

expected_conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}
mock_producer.assert_called_once_with(expected_conf)

self.assertIs(sender_instance.kafka_producer, mock_producer_instance)
self.assertEqual(sender_instance.topic, "test_topic")
self.assertEqual(sender_instance.messages, [])
self.assertIsInstance(sender_instance.lock, type(Lock()))
self.assertIsNone(sender_instance.timer)
self.assertIsNone(sender_instance.kafka_producer)


class TestStartKafkaProducer(unittest.TestCase):
Expand All @@ -35,11 +30,60 @@ def test_start_kafka_producer(self, mock_producer):
mock_producer.return_value = mock_producer_instance
sender_instance = KafkaBatchSender(topic="test_topic")

sender_instance.start_kafka_producer()

expected_conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}
mock_producer.assert_called_once_with(expected_conf)

self.assertIs(sender_instance.kafka_producer, mock_producer_instance)

@patch('heidgaf_log_collector.batch_handler.Producer')
def test_start_kafka_producer_already_running(self, mock_producer):
mock_producer_instance = MagicMock()
mock_producer.return_value = mock_producer_instance
sender_instance = KafkaBatchSender(topic="test_topic")

sender_instance.kafka_producer = MagicMock()

mock_producer.assert_not_called()


class TestSendBatch(unittest.TestCase):
@patch('heidgaf_log_collector.batch_handler.Producer')
def test_send_batch(self, mock_producer):
mock_producer_instance = MagicMock()
mock_producer.return_value = mock_producer_instance
sender_instance = KafkaBatchSender(topic="test_topic")
sender_instance._reset_timer = MagicMock()
sender_instance.kafka_producer = mock_producer_instance

sender_instance.messages = ["message1", "message2"]

sender_instance._send_batch()

mock_producer_instance.produce.assert_called_once_with(
topic="test_topic",
key=None,
value=b'["message1", "message2"]',
callback=utils.kafka_delivery_report,
)

mock_producer_instance.flush.assert_called_once()
sender_instance._reset_timer.assert_called_once()
self.assertEqual(sender_instance.messages, [])

def test_send_batch_no_producer(self):
sender_instance = KafkaBatchSender(topic="test_topic")
sender_instance._reset_timer = MagicMock()
sender_instance.kafka_producer = None

sender_instance.messages = ["message1", "message2"]

sender_instance._send_batch()

sender_instance._reset_timer.assert_not_called()
self.assertEqual(sender_instance.messages, ["message1", "message2"])


class TestClose(unittest.TestCase):
@patch('heidgaf_log_collector.batch_handler.Timer')
Expand Down

0 comments on commit 3b7deae

Please sign in to comment.