Skip to content

Commit

Permalink
update InspectPrefilter
Browse files Browse the repository at this point in the history
  • Loading branch information
lamr02n committed Jun 1, 2024
1 parent c85e268 commit f0c13e3
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 5 deletions.
4 changes: 2 additions & 2 deletions heidgaf_log_collector/batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ def __init__(self, topic: str):
self.lock = Lock()
self.timer = None
self.kafka_producer = None
self.conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}

def start_kafka_producer(self):
    """Create the Kafka producer from ``self.conf``.

    No-op (with a warning) if a producer is already running.
    """
    if self.kafka_producer:
        logger.warning(f"Kafka Producer already running on {KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}.")
        return

    # Fixed: diff residue left a stale local `conf` and a duplicate
    # Producer(conf) construction that was immediately overwritten.
    # Only the self.conf-based producer (the commit's intent) is kept.
    self.kafka_producer = Producer(self.conf)

def _send_batch(self):
if not self.kafka_producer:
Expand Down
3 changes: 2 additions & 1 deletion heidgaf_log_collector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def _check_size(size: str):
return False


# TODO: Test
def main():
collector = LogCollector("127.0.0.1", 9998)

Expand All @@ -153,7 +154,7 @@ def main():
except ValueError as e:
logger.debug(e)
except KeyboardInterrupt:
logger.info("Closing down.")
logger.info("Closing down LogCollector.")
break
finally:
collector.clear_logline()
Expand Down
77 changes: 76 additions & 1 deletion heidgaf_log_collector/prefilter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import ast
import json
import logging
import os
import sys

from confluent_kafka import Consumer

sys.path.append(os.getcwd()) # needed for Terminal execution
from heidgaf_log_collector.batch_handler import KafkaBatchSender
from heidgaf_log_collector.config import *
from pipeline_prototype.logging_config import setup_logging

setup_logging()
logger = logging.getLogger(__name__)


class KafkaMessageFetchError(Exception):
    """Raised when polling the Kafka broker yields no message."""


class InspectPrefilter:
Expand All @@ -12,8 +28,38 @@ def __init__(self, error_type: str):
self.batch_handler = KafkaBatchSender(topic="Inspect")
self.batch_handler.start_kafka_producer()

self.kafka_consumer = None
self.conf = {
'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}",
'group.id': "my_group", # TODO: Do something with this
'auto.offset.reset': 'earliest' # TODO: Do something with this
}

def start_kafka_consumer(self):
    """Create the Kafka consumer and subscribe it to the 'Inspect' topic.

    No-op (with a warning) if a consumer is already running.
    """
    if self.kafka_consumer:
        # Fixed: f-string had no placeholders (ruff F541); message unchanged.
        logger.warning("Kafka Consumer already running!")
        return

    self.kafka_consumer = Consumer(self.conf)
    self.kafka_consumer.subscribe(['Inspect'])

# TODO: Test
def consume_and_extract_data(self):
    """Poll one message from Kafka and store its decoded payload in
    ``self.unfiltered_data``.

    Raises:
        KafkaMessageFetchError: if no message arrived within the poll timeout.
        IOError: if the broker reported an error for the fetched message.
    """
    # Fixed: removed a dead `pass` statement left over from the stub version.
    message = self.kafka_consumer.poll(timeout=1.0)

    if not message:
        raise KafkaMessageFetchError("No message fetched from Kafka broker.")

    if message.error():
        raise IOError(f"An error occurred while consuming: {message.error()}")

    decoded_message = message.value().decode('utf-8')
    json_from_message = json.loads(decoded_message)

    if self.unfiltered_data:
        logger.warning("Overwriting existing data by new message.")

    # NOTE(review): json.loads already returns a Python object, so
    # literal_eval only succeeds if the JSON payload is itself a string
    # containing a Python literal (double-encoded). Confirm the producer's
    # message format; otherwise this raises ValueError (caught by main()).
    self.unfiltered_data = ast.literal_eval(json_from_message)

def filter_by_error(self):
for e in self.unfiltered_data:
Expand All @@ -29,3 +75,32 @@ def add_filtered_data_to_batch(self):
def clear_data(self):
    """Reset both the unfiltered and filtered data buffers to empty lists."""
    self.unfiltered_data, self.filtered_data = [], []


# TODO: Test
def main():
    """Run the prefilter loop: consume, filter by error type, and forward
    the result to the batch handler until interrupted."""
    prefilter = InspectPrefilter(error_type="NXDOMAIN")
    prefilter.start_kafka_consumer()

    running = True
    while running:
        try:
            prefilter.consume_and_extract_data()
            prefilter.filter_by_error()
            prefilter.add_filtered_data_to_batch()
        except IOError as err:
            # Broker-level failure: log and propagate.
            logger.error(err)
            raise
        except ValueError as err:
            # Malformed payload: log and keep consuming.
            logger.warning(err)
        except KafkaMessageFetchError as err:
            # Poll timeout without a message: retry.
            logger.debug(err)
            continue
        except KeyboardInterrupt:
            logger.info("Closing down InspectPrefilter.")
            running = False
        finally:
            # Buffers are cleared after every iteration, successful or not.
            prefilter.clear_data()


# Script entry point: run the prefilter loop directly.
if __name__ == '__main__':
    main()
4 changes: 4 additions & 0 deletions heidgaf_log_collector/tests/test_batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def test_init(self):
self.assertIsInstance(sender_instance.lock, type(Lock()))
self.assertIsNone(sender_instance.timer)
self.assertIsNone(sender_instance.kafka_producer)
self.assertEqual(
{'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"},
sender_instance.conf,
)


class TestStartKafkaProducer(unittest.TestCase):
Expand Down
39 changes: 38 additions & 1 deletion heidgaf_log_collector/tests/test_prefilter.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,59 @@
import unittest
from unittest.mock import patch
from unittest.mock import patch, MagicMock

from heidgaf_log_collector.config import *
from heidgaf_log_collector.prefilter import InspectPrefilter


class TestInit(unittest.TestCase):
    """Construction-time behavior of InspectPrefilter."""

    @patch('heidgaf_log_collector.prefilter.KafkaBatchSender')
    def test_valid_init(self, mock_batch_handler):
        expected_conf = {
            'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}",
            'group.id': "my_group",
            'auto.offset.reset': 'earliest'
        }

        instance = InspectPrefilter(error_type="NXDOMAIN")

        # Constructor wiring: batch handler created and producer started.
        mock_batch_handler.assert_called_once_with(topic="Inspect")
        self.assertIsNotNone(instance.batch_handler)
        instance.batch_handler.start_kafka_producer.assert_called_once()

        # Initial state.
        self.assertEqual("NXDOMAIN", instance.error_type)
        self.assertEqual([], instance.unfiltered_data)
        self.assertEqual([], instance.filtered_data)
        self.assertEqual(expected_conf, instance.conf)
        self.assertIsNone(instance.kafka_consumer)


class TestStartKafkaConsumer(unittest.TestCase):
    """Tests for InspectPrefilter.start_kafka_consumer."""

    # Fixed: test and locals were misnamed "producer"; KafkaBatchSender is
    # now patched for consistency with TestInit, and the subscribe() call
    # is asserted.
    @patch('heidgaf_log_collector.prefilter.KafkaBatchSender')
    @patch('heidgaf_log_collector.prefilter.Consumer')
    def test_start_kafka_consumer(self, mock_consumer, mock_batch_handler):
        mock_consumer_instance = MagicMock()
        mock_consumer.return_value = mock_consumer_instance
        prefilter_instance = InspectPrefilter(error_type="NXDOMAIN")
        expected_conf = {
            'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}",
            'group.id': "my_group",
            'auto.offset.reset': 'earliest'
        }

        prefilter_instance.start_kafka_consumer()

        mock_consumer.assert_called_once_with(expected_conf)
        mock_consumer_instance.subscribe.assert_called_once_with(['Inspect'])
        self.assertIs(prefilter_instance.kafka_consumer, mock_consumer_instance)

    @patch('heidgaf_log_collector.prefilter.KafkaBatchSender')
    @patch('heidgaf_log_collector.prefilter.Consumer')
    def test_start_kafka_consumer_already_running(self, mock_consumer, mock_batch_handler):
        prefilter_instance = InspectPrefilter(error_type="NXDOMAIN")
        existing_consumer = MagicMock()
        prefilter_instance.kafka_consumer = existing_consumer

        # Fixed: the original never invoked start_kafka_consumer, so its
        # assert_not_called was vacuously true. Invoke it, then verify no
        # new Consumer is built and the existing one is kept.
        prefilter_instance.start_kafka_consumer()

        mock_consumer.assert_not_called()
        self.assertIs(prefilter_instance.kafka_consumer, existing_consumer)


class TestFilterByError(unittest.TestCase):
def test_filter_by_error_empty_data(self):
prefilter_instance = InspectPrefilter(error_type="NXDOMAIN")
Expand Down

0 comments on commit f0c13e3

Please sign in to comment.