From 443f901c9891decfe49007215c95985ac3da99ad Mon Sep 17 00:00:00 2001 From: Manuel Fuchs Date: Sun, 3 Nov 2024 17:37:46 +0100 Subject: [PATCH 1/6] Introduce a separator symbol to correctly distinguish received messages --- docker/docker-compose.kafka.yml | 1 + src/logserver/server.py | 29 ++++++++++++++++++++--------- src/mock/generator.py | 2 -- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/docker/docker-compose.kafka.yml b/docker/docker-compose.kafka.yml index b76a621..e9b8c0c 100644 --- a/docker/docker-compose.kafka.yml +++ b/docker/docker-compose.kafka.yml @@ -5,6 +5,7 @@ services: networks: heidgaf: ipv4_address: 172.27.0.2 + restart: "unless-stopped" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 diff --git a/src/logserver/server.py b/src/logserver/server.py index ac2506b..5ab5f3b 100644 --- a/src/logserver/server.py +++ b/src/logserver/server.py @@ -26,9 +26,9 @@ class LogServer: """ - Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for log - lines via Kafka and reads newly added lines from an input file. To retrieve a log line from the server, - other modules can connect to its outgoing/sending port. The server will then send its oldest log line as a response. + Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for + messages via Kafka and reads newly added lines from an input file. To retrieve a message from the server, + other modules can connect to its outgoing/sending port. The server will then send its oldest message as a response. """ def __init__(self) -> None: @@ -217,18 +217,29 @@ async def send_logline(writer, logline) -> None: async def receive_logline(self, reader) -> None: """ - Receives a log line encoded as UTF-8 from the connected component and adds it to the data queue. + Receives one or multiple log lines encoded as UTF-8 separated by and ending with separator '\n' from the + connected component and adds it or them to the data queue. Message must end with separator symbol. Args: reader: Responsible for reading incoming data """ while True: - data = await reader.read(1024) - if not data: + try: + data = await reader.readuntil(separator=b"\n") + if not data: + break + received_message = data.decode().strip() + logger.info(f"Received message:\n ⤷ {received_message}") + self.data_queue.put(received_message) + except asyncio.exceptions.IncompleteReadError as e: + logger.warning(f"Ignoring message: No separator symbol found: {e}") + break + except asyncio.LimitOverrunError as e: + logger.error(f"Message size exceeded, separator symbol not found") break - received_message = data.decode() - logger.info(f"Received message:\n ⤷ {received_message}") - self.data_queue.put(received_message) + except Exception as e: + logger.error(f"Unexpected error: {e}") + raise def get_next_logline(self) -> str | None: """ diff --git a/src/mock/generator.py b/src/mock/generator.py index 6e93f77..97f2f37 100644 --- a/src/mock/generator.py +++ b/src/mock/generator.py @@ -4,13 +4,11 @@ import time sys.path.append(os.getcwd()) -from src.base.log_config import setup_logging from src.mock.log_generator import generate_dns_log_line from src.base.log_config import get_logger logger = get_logger() - if __name__ == "__main__": with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: client_socket.connect((str("127.0.0.1"), 9998)) From cb28048dcc2fc33a382edd47c38a3d1f439f430c Mon Sep 17 00:00:00 2001 From: Manuel Fuchs Date: Sun, 3 Nov 2024 17:45:22 +0100 Subject: [PATCH 2/6] Fix test --- src/logserver/server.py | 2 +- tests/test_server.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/logserver/server.py b/src/logserver/server.py index 5ab5f3b..50028a7 100644 --- a/src/logserver/server.py +++ b/src/logserver/server.py @@ -234,7 +234,7 @@ async def receive_logline(self, reader) -> None: except asyncio.exceptions.IncompleteReadError as e: logger.warning(f"Ignoring message: No separator symbol found: {e}") break - except asyncio.LimitOverrunError as e: + except asyncio.LimitOverrunError: logger.error(f"Message size exceeded, separator symbol not found") break except Exception as e: diff --git a/tests/test_server.py b/tests/test_server.py index 4e6c626..842ca92 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -346,13 +346,15 @@ async def test_send_logline_no_logline(self): class TestReceiveLogline(unittest.IsolatedAsyncioTestCase): @patch("src.logserver.server.logger") - async def test_receive_logline(self, mock_logger): + async def test_receive_one_logline(self, mock_logger): reader = AsyncMock() data_queue = MagicMock() server_instance = LogServer() server_instance.data_queue = data_queue - reader.read = AsyncMock(side_effect=[b"Test message 1", b"Test message 2", b""]) + reader.readuntil = AsyncMock( + side_effect=[b"Test message 1\n", b"Test message 2\n", b""] + ) receive_task = asyncio.create_task(server_instance.receive_logline(reader)) await receive_task From 340866a15c8aa75007a507e8cdbd785c0e576c66 Mon Sep 17 00:00:00 2001 From: Manuel Fuchs Date: Mon, 4 Nov 2024 19:04:52 +0100 Subject: [PATCH 3/6] Update tests for full coverage --- tests/test_server.py | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/tests/test_server.py b/tests/test_server.py index 842ca92..4ff3098 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -346,7 +346,7 @@ async def test_send_logline_no_logline(self): class TestReceiveLogline(unittest.IsolatedAsyncioTestCase): @patch("src.logserver.server.logger") - async def test_receive_one_logline(self, mock_logger): + async def test_receive_logline(self, mock_logger): reader = AsyncMock() data_queue = MagicMock() server_instance = LogServer() @@ -364,6 +364,45 @@ async def test_receive_one_logline(self, mock_logger): self.assertEqual(data_queue.put.call_count, 2) + @patch("src.logserver.server.logger") + async def test_receive_without_separator(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock( + side_effect=asyncio.exceptions.IncompleteReadError(b"", 100) + ) + + # noinspection PyAsyncCall + asyncio.create_task(server_instance.receive_logline(reader)) + + @patch("src.logserver.server.logger") + async def test_receive_too_long(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock(side_effect=asyncio.LimitOverrunError("", 1)) + + # noinspection PyAsyncCall + asyncio.create_task(server_instance.receive_logline(reader)) + + @patch("src.logserver.server.logger") + async def test_receive_raise_other_exception(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock(side_effect=ValueError("Something went wrong")) + + with self.assertRaises(ValueError): + task = asyncio.create_task(server_instance.receive_logline(reader)) + await task + class TestGetNextLogline(unittest.TestCase): def test_valid(self): From 841e7b0125f2fcbfbaf6446b3fb815c781481448 Mon Sep 17 00:00:00 2001 From: "Manuel F." <94921205+lamr02n@users.noreply.github.com> Date: Mon, 4 Nov 2024 19:57:43 +0100 Subject: [PATCH 4/6] Update MacOS version in build file --- .github/workflows/build_test_macos.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_test_macos.yml b/.github/workflows/build_test_macos.yml index 2bc3b29..d28f7e0 100644 --- a/.github/workflows/build_test_macos.yml +++ b/.github/workflows/build_test_macos.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ "macos-12" ] + os: [ "macos-14" ] python-version: [ "3.11", "3.10", ] From 9de15c8e6ae724422e243be4b61ffb3c8250fd29 Mon Sep 17 00:00:00 2001 From: "Manuel F." <94921205+lamr02n@users.noreply.github.com> Date: Mon, 4 Nov 2024 20:00:00 +0100 Subject: [PATCH 5/6] Update MacOS version (second try) --- .github/workflows/build_test_macos.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_test_macos.yml b/.github/workflows/build_test_macos.yml index d28f7e0..1b7876d 100644 --- a/.github/workflows/build_test_macos.yml +++ b/.github/workflows/build_test_macos.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ "macos-14" ] + os: [ "macos-13" ] python-version: [ "3.11", "3.10", ] From 712ae011ab551aa3d5886f3544ca4b58124a0328 Mon Sep 17 00:00:00 2001 From: "Manuel F." <94921205+lamr02n@users.noreply.github.com> Date: Mon, 4 Nov 2024 20:04:19 +0100 Subject: [PATCH 6/6] Update MacOS version (third try) --- .github/workflows/build_test_macos.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_test_macos.yml b/.github/workflows/build_test_macos.yml index 1b7876d..e91090c 100644 --- a/.github/workflows/build_test_macos.yml +++ b/.github/workflows/build_test_macos.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ "macos-13" ] + os: [ "macos-14" ] python-version: [ "3.11", "3.10", ] @@ -32,6 +32,7 @@ jobs: - name: Install requirements run: | + brew install libomp python -m pip install --upgrade pip python -m pip install pyyaml python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt