From f0c13e3f2406ab660ba6d3f5908af33873ed1d6d Mon Sep 17 00:00:00 2001 From: lamr02n <94921205+lamr02n@users.noreply.github.com> Date: Sat, 1 Jun 2024 19:03:44 +0200 Subject: [PATCH] update InspectPrefilter --- heidgaf_log_collector/batch_handler.py | 4 +- heidgaf_log_collector/collector.py | 3 +- heidgaf_log_collector/prefilter.py | 77 ++++++++++++++++++- .../tests/test_batch_handler.py | 4 + heidgaf_log_collector/tests/test_prefilter.py | 39 +++++++++- 5 files changed, 122 insertions(+), 5 deletions(-) diff --git a/heidgaf_log_collector/batch_handler.py b/heidgaf_log_collector/batch_handler.py index 9ba350b..6bf1754 100644 --- a/heidgaf_log_collector/batch_handler.py +++ b/heidgaf_log_collector/batch_handler.py @@ -22,14 +22,14 @@ def __init__(self, topic: str): self.lock = Lock() self.timer = None self.kafka_producer = None + self.conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"} def start_kafka_producer(self): if self.kafka_producer: logger.warning(f"Kafka Producer already running on {KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}.") return - conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"} - self.kafka_producer = Producer(conf) + self.kafka_producer = Producer(self.conf) def _send_batch(self): if not self.kafka_producer: diff --git a/heidgaf_log_collector/collector.py b/heidgaf_log_collector/collector.py index b17d994..188e2ba 100644 --- a/heidgaf_log_collector/collector.py +++ b/heidgaf_log_collector/collector.py @@ -142,6 +142,7 @@ def _check_size(size: str): return False +# TODO: Test def main(): collector = LogCollector("127.0.0.1", 9998) @@ -153,7 +154,7 @@ def main(): except ValueError as e: logger.debug(e) except KeyboardInterrupt: - logger.info("Closing down.") + logger.info("Closing down LogCollector.") break finally: collector.clear_logline() diff --git a/heidgaf_log_collector/prefilter.py b/heidgaf_log_collector/prefilter.py index fe68d8b..3f1d901 100644 --- a/heidgaf_log_collector/prefilter.py +++ b/heidgaf_log_collector/prefilter.py @@ -1,6 +1,22 @@ +import ast import json +import logging +import os +import sys +from confluent_kafka import Consumer + +sys.path.append(os.getcwd()) # needed for Terminal execution from heidgaf_log_collector.batch_handler import KafkaBatchSender +from heidgaf_log_collector.config import * +from pipeline_prototype.logging_config import setup_logging + +setup_logging() +logger = logging.getLogger(__name__) + + +class KafkaMessageFetchError(Exception): + pass class InspectPrefilter: @@ -12,8 +28,38 @@ def __init__(self, error_type: str): self.batch_handler = KafkaBatchSender(topic="Inspect") self.batch_handler.start_kafka_producer() + self.kafka_consumer = None + self.conf = { + 'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}", + 'group.id': "my_group", # TODO: Do something with this + 'auto.offset.reset': 'earliest' # TODO: Do something with this + } + + def start_kafka_consumer(self): + if self.kafka_consumer: + logger.warning(f"Kafka Consumer already running!") + return + + self.kafka_consumer = Consumer(self.conf) + self.kafka_consumer.subscribe(['Inspect']) + + # TODO: Test def consume_and_extract_data(self): - pass + message = self.kafka_consumer.poll(timeout=1.0) + + if not message: + raise KafkaMessageFetchError("No message fetched from Kafka broker.") + + if message.error(): + raise IOError(f"An error occurred while consuming: {message.error()}") + + decoded_message = message.value().decode('utf-8') + json_from_message = json.loads(decoded_message) + + if self.unfiltered_data: + logger.warning("Overwriting existing data by new message.") + + self.unfiltered_data = ast.literal_eval(json_from_message) def filter_by_error(self): for e in self.unfiltered_data: @@ -29,3 +75,32 @@ def add_filtered_data_to_batch(self): def clear_data(self): self.unfiltered_data = [] self.filtered_data = [] + + +# TODO: Test +def main(): + prefilter = InspectPrefilter(error_type="NXDOMAIN") + prefilter.start_kafka_consumer() + + while True: + try: + prefilter.consume_and_extract_data() + prefilter.filter_by_error() + prefilter.add_filtered_data_to_batch() + except IOError as e: + logger.error(e) + raise + except ValueError as e: + logger.warning(e) + except KafkaMessageFetchError as e: + logger.debug(e) + continue + except KeyboardInterrupt: + logger.info("Closing down InspectPrefilter.") + break + finally: + prefilter.clear_data() + + +if __name__ == '__main__': + main() diff --git a/heidgaf_log_collector/tests/test_batch_handler.py b/heidgaf_log_collector/tests/test_batch_handler.py index 0087331..ebdf3af 100644 --- a/heidgaf_log_collector/tests/test_batch_handler.py +++ b/heidgaf_log_collector/tests/test_batch_handler.py @@ -16,6 +16,10 @@ def test_init(self): self.assertIsInstance(sender_instance.lock, type(Lock())) self.assertIsNone(sender_instance.timer) self.assertIsNone(sender_instance.kafka_producer) + self.assertEqual( + {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}, + sender_instance.conf, + ) class TestStartKafkaProducer(unittest.TestCase): diff --git a/heidgaf_log_collector/tests/test_prefilter.py b/heidgaf_log_collector/tests/test_prefilter.py index d27e292..2424056 100644 --- a/heidgaf_log_collector/tests/test_prefilter.py +++ b/heidgaf_log_collector/tests/test_prefilter.py @@ -1,6 +1,7 @@ import unittest -from unittest.mock import patch +from unittest.mock import patch, MagicMock +from heidgaf_log_collector.config import * from heidgaf_log_collector.prefilter import InspectPrefilter @@ -8,15 +9,51 @@ class TestInit(unittest.TestCase): @patch('heidgaf_log_collector.prefilter.KafkaBatchSender') def test_valid_init(self, mock_batch_handler): prefilter_instance = InspectPrefilter(error_type="NXDOMAIN") + expected_conf = { + 'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}", + 'group.id': "my_group", + 'auto.offset.reset': 'earliest' + } mock_batch_handler.assert_called_once_with(topic="Inspect") self.assertEqual([], prefilter_instance.unfiltered_data) self.assertEqual([], prefilter_instance.filtered_data) self.assertEqual("NXDOMAIN", prefilter_instance.error_type) self.assertIsNotNone(prefilter_instance.batch_handler) + self.assertEqual(expected_conf, prefilter_instance.conf) + self.assertIsNone(prefilter_instance.kafka_consumer) prefilter_instance.batch_handler.start_kafka_producer.assert_called_once() +class TestStartKafkaConsumer(unittest.TestCase): + @patch('heidgaf_log_collector.prefilter.Consumer') + def test_start_kafka_producer(self, mock_consumer): + mock_producer_instance = MagicMock() + mock_consumer.return_value = mock_producer_instance + prefilter_instance = InspectPrefilter(error_type="NXDOMAIN") + expected_conf = { + 'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}", + 'group.id': "my_group", + 'auto.offset.reset': 'earliest' + } + + prefilter_instance.start_kafka_consumer() + + mock_consumer.assert_called_once_with(expected_conf) + + self.assertIs(prefilter_instance.kafka_consumer, mock_producer_instance) + + @patch('heidgaf_log_collector.prefilter.Consumer') + def test_start_kafka_consumer_already_running(self, mock_consumer): + mock_consumer_instance = MagicMock() + mock_consumer.return_value = mock_consumer_instance + prefilter_instance = InspectPrefilter(error_type="NXDOMAIN") + + prefilter_instance.kafka_consumer = MagicMock() + + mock_consumer.assert_not_called() + + class TestFilterByError(unittest.TestCase): def test_filter_by_error_empty_data(self): prefilter_instance = InspectPrefilter(error_type="NXDOMAIN")