diff --git a/heidgaf_log_collector/batch_handler.py b/heidgaf_log_collector/batch_handler.py index ffc982b..32871fb 100644 --- a/heidgaf_log_collector/batch_handler.py +++ b/heidgaf_log_collector/batch_handler.py @@ -21,18 +21,25 @@ class KafkaBatchSender: def __init__(self, topic: str): - self._start_kafka_producer() - self.topic = topic self.messages = [] self.lock = Lock() self.timer = None + self.kafka_producer = None + + def start_kafka_producer(self): + if self.kafka_producer: + logger.warning(f"Kafka Producer already running on {KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}.") + return - def _start_kafka_producer(self): conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"} self.kafka_producer = Producer(conf) def _send_batch(self): + if not self.kafka_producer: + logger.error(f"Kafka Producer not running!") + return + with self.lock: if self.messages: self.kafka_producer.produce( diff --git a/heidgaf_log_collector/tests/test_batch_handler.py b/heidgaf_log_collector/tests/test_batch_handler.py index 19c7910..535ac05 100644 --- a/heidgaf_log_collector/tests/test_batch_handler.py +++ b/heidgaf_log_collector/tests/test_batch_handler.py @@ -2,6 +2,7 @@ from threading import Lock from unittest.mock import patch, MagicMock +from heidgaf_log_collector import utils from heidgaf_log_collector.batch_handler import KafkaBatchSender # placeholders @@ -12,20 +13,14 @@ class TestInit(unittest.TestCase): - @patch('heidgaf_log_collector.batch_handler.Producer') - def test_init(self, mock_producer): - mock_producer_instance = MagicMock() - mock_producer.return_value = mock_producer_instance + def test_init(self): sender_instance = KafkaBatchSender(topic="test_topic") - expected_conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"} - mock_producer.assert_called_once_with(expected_conf) - - self.assertIs(sender_instance.kafka_producer, mock_producer_instance) self.assertEqual(sender_instance.topic, "test_topic") self.assertEqual(sender_instance.messages, []) self.assertIsInstance(sender_instance.lock, type(Lock())) self.assertIsNone(sender_instance.timer) + self.assertIsNone(sender_instance.kafka_producer) class TestStartKafkaProducer(unittest.TestCase): @@ -35,11 +30,60 @@ def test_start_kafka_producer(self, mock_producer): mock_producer.return_value = mock_producer_instance sender_instance = KafkaBatchSender(topic="test_topic") + sender_instance.start_kafka_producer() + expected_conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"} mock_producer.assert_called_once_with(expected_conf) self.assertIs(sender_instance.kafka_producer, mock_producer_instance) + @patch('heidgaf_log_collector.batch_handler.Producer') + def test_start_kafka_producer_already_running(self, mock_producer): + mock_producer_instance = MagicMock() + mock_producer.return_value = mock_producer_instance + sender_instance = KafkaBatchSender(topic="test_topic") + + sender_instance.kafka_producer = MagicMock() + + mock_producer.assert_not_called() + + +class TestSendBatch(unittest.TestCase): + @patch('heidgaf_log_collector.batch_handler.Producer') + def test_send_batch(self, mock_producer): + mock_producer_instance = MagicMock() + mock_producer.return_value = mock_producer_instance + sender_instance = KafkaBatchSender(topic="test_topic") + sender_instance._reset_timer = MagicMock() + sender_instance.kafka_producer = mock_producer_instance + + sender_instance.messages = ["message1", "message2"] + + sender_instance._send_batch() + + mock_producer_instance.produce.assert_called_once_with( + topic="test_topic", + key=None, + value=b'["message1", "message2"]', + callback=utils.kafka_delivery_report, + ) + + mock_producer_instance.flush.assert_called_once() + sender_instance._reset_timer.assert_called_once() + self.assertEqual(sender_instance.messages, []) + + def test_send_batch_no_producer(self): + sender_instance = KafkaBatchSender(topic="test_topic") + sender_instance._reset_timer = MagicMock() + sender_instance.kafka_producer = None + + sender_instance.messages = ["message1", "message2"] + + sender_instance._send_batch() + + sender_instance._reset_timer.assert_not_called() + self.assertEqual(sender_instance.messages, ["message1", "message2"]) + class TestClose(unittest.TestCase): @patch('heidgaf_log_collector.batch_handler.Timer')