diff --git a/src/mqtt_kafka_connector/clients/kafka.py b/src/mqtt_kafka_connector/clients/kafka.py index 4301690..754abd5 100644 --- a/src/mqtt_kafka_connector/clients/kafka.py +++ b/src/mqtt_kafka_connector/clients/kafka.py @@ -45,13 +45,23 @@ def _check_message_interval(self, msg: dict) -> bool: return False return True + @staticmethod + def prepare_msg_headers(headers: list) -> list: + headers.append( + ( + 'dt_send_to_kafka', + dt.datetime.now(dt.timezone.utc).isoformat().encode() + ) + ) + return headers + def prepare_msg_for_kafka(self, raw_msg: dict) -> bytes | None: try: if MODIFY_MESSAGE_RM_NONE_FIELDS: raw_msg = clean_none_fields(raw_msg) # Implicit casting to JSON standard without - # NaN, Inf, -Inf values with orjson) + # NaN, Inf, -Inf values with orjson msg_for_kafka = ( orjson.dumps(raw_msg) if MODIFY_MESSAGE_RM_NON_NUMBER_FLOAT_FIELDS @@ -102,7 +112,7 @@ async def send_batch( if not msg: i += 1 continue - + headers = self.message_helper.prepare_msg_headers(headers) metadata = batch.append( key=key, value=msg, timestamp=None, headers=headers ) @@ -138,6 +148,7 @@ async def send( headers: list, ) -> bool: value = self.message_helper.prepare_msg_for_kafka(message) + headers = self.message_helper.prepare_msg_headers(headers) res = await self.producer.send_and_wait( topic, value=value, key=key, headers=headers ) diff --git a/src/mqtt_kafka_connector/conf.py b/src/mqtt_kafka_connector/conf.py index 1e5f612..857a389 100644 --- a/src/mqtt_kafka_connector/conf.py +++ b/src/mqtt_kafka_connector/conf.py @@ -7,7 +7,11 @@ from dotenv import load_dotenv from sentry_sdk.integrations.logging import LoggingIntegration -from mqtt_kafka_connector.context_vars import device_id_var, message_uuid_var +from mqtt_kafka_connector.context_vars import ( + customer_id_var, + device_id_var, + message_uuid_var, +) load_dotenv() LOGLEVEL = os.getenv('LOGLEVEL', 'INFO') @@ -79,6 +83,7 @@ class MessageParamsFilter(Filter): def filter(self, record): message_uuid = message_uuid_var.get() record.device_id = device_id_var.get() + record.customer_id = customer_id_var.get() record.message_uuid = message_uuid record.service_name = SERVICE_NAME record.environment = ENVIRONMENT diff --git a/tests/conftest.py b/tests/conftest.py index f5c0abd..87a0774 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,19 @@ from mqtt_kafka_connector.clients.kafka import KafkaProducer, MessageHelper from mqtt_kafka_connector.services.prometheus import Prometheus +FAKE_TIME = datetime.datetime(2012, 12, 12, 12, 12, 12) + + +@pytest.fixture +def patch_datetime_now(monkeypatch): + + class fake_dt(datetime.datetime): + @classmethod + def now(cls, *args, **kwargs): + return FAKE_TIME + + monkeypatch.setattr(datetime, 'datetime', fake_dt) + @pytest.fixture() def now_timestamp(): diff --git a/tests/test_connector.py b/tests/test_connector.py index 8522675..1fa9d77 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -56,7 +56,8 @@ async def test_connector( assert sending_message == message headers = dict(send_batch_kwargs['headers']) - assert headers.pop('message_uuid') + assert headers.pop('message_uuid') is not None + assert headers.pop('dt_send_to_kafka') is not None assert headers == dict( schema_id=SCHEMA_ID.encode(), message_deserialized=b'1', diff --git a/tests/test_kafka_client.py b/tests/test_kafka_client.py index bb45ea6..3502660 100644 --- a/tests/test_kafka_client.py +++ b/tests/test_kafka_client.py @@ -6,13 +6,15 @@ MIN_TELEMETRY_INTERVAL_AGE_HOURS, ) +from .conftest import FAKE_TIME + async def test_send_batch(kafka_producer, unpack_message_pack): await kafka_producer.send_batch( 'topic', unpack_message_pack, '1', - 'headers', + [('header_1', 'value')], ) kafka_producer.producer.create_batch.assert_called() @@ -20,7 +22,7 @@ async def test_send_batch(kafka_producer, unpack_message_pack): kafka_producer.producer.send_batch.assert_called() -async def test_send(kafka_producer, unpack_message_pack): +async def test_send(kafka_producer, unpack_message_pack, patch_datetime_now): result = await kafka_producer.send( 'topic', unpack_message_pack[0], b'key', [('header', b'value')] ) @@ -31,7 +33,10 @@ async def test_send(kafka_producer, unpack_message_pack): unpack_message_pack[0] ), key=b'key', - headers=[('header', b'value')], + headers=[ + ('header', b'value'), + ('dt_send_to_kafka', FAKE_TIME.isoformat().encode()) + ] ) assert result is True