Skip to content

Commit

Permalink
PK-2141: add headers dt
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyDubovitsky committed Aug 20, 2024
1 parent d61648a commit 3d30df7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
15 changes: 13 additions & 2 deletions src/mqtt_kafka_connector/clients/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ def _check_message_interval(self, msg: dict) -> bool:
return False
return True

@staticmethod
def prepare_msg_headers(headers: list) -> list:
headers.append(
(
'dt_send_to_kafka',
dt.datetime.now(dt.timezone.utc).isoformat().encode()
)
)
return headers

def prepare_msg_for_kafka(self, raw_msg: dict) -> bytes | None:
try:
if MODIFY_MESSAGE_RM_NONE_FIELDS:
raw_msg = clean_none_fields(raw_msg)

# Implicit casting to JSON standard without
# NaN, Inf, -Inf values with orjson)
# NaN, Inf, -Inf values with orjson
msg_for_kafka = (
orjson.dumps(raw_msg)
if MODIFY_MESSAGE_RM_NON_NUMBER_FLOAT_FIELDS
Expand Down Expand Up @@ -102,7 +112,7 @@ async def send_batch(
if not msg:
i += 1
continue

headers = self.message_helper.prepare_msg_headers(headers)
metadata = batch.append(
key=key, value=msg, timestamp=None, headers=headers
)
Expand Down Expand Up @@ -138,6 +148,7 @@ async def send(
headers: list,
) -> bool:
value = self.message_helper.prepare_msg_for_kafka(message)
headers = self.message_helper.prepare_msg_headers(headers)
res = await self.producer.send_and_wait(
topic, value=value, key=key, headers=headers
)
Expand Down
7 changes: 6 additions & 1 deletion src/mqtt_kafka_connector/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from dotenv import load_dotenv
from sentry_sdk.integrations.logging import LoggingIntegration

from mqtt_kafka_connector.context_vars import device_id_var, message_uuid_var
from mqtt_kafka_connector.context_vars import (
customer_id_var,
device_id_var,
message_uuid_var,
)

load_dotenv()
LOGLEVEL = os.getenv('LOGLEVEL', 'INFO')
Expand Down Expand Up @@ -79,6 +83,7 @@ class MessageParamsFilter(Filter):
def filter(self, record):
message_uuid = message_uuid_var.get()
record.device_id = device_id_var.get()
record.customer_id = customer_id_var.get()
record.message_uuid = message_uuid
record.service_name = SERVICE_NAME
record.environment = ENVIRONMENT
Expand Down
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@
from mqtt_kafka_connector.clients.kafka import KafkaProducer, MessageHelper
from mqtt_kafka_connector.services.prometheus import Prometheus

FAKE_TIME = datetime.datetime(2012, 12, 12, 12, 12, 12)


@pytest.fixture
def patch_datetime_now(monkeypatch):

class fake_dt(datetime.datetime):
@classmethod
def now(cls, *args, **kwargs):
return FAKE_TIME

monkeypatch.setattr(datetime, 'datetime', fake_dt)


@pytest.fixture()
def now_timestamp():
Expand Down
3 changes: 2 additions & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ async def test_connector(
assert sending_message == message

headers = dict(send_batch_kwargs['headers'])
assert headers.pop('message_uuid')
assert headers.pop('message_uuid') is not None
assert headers.pop('dt_send_to_kafka') is not None
assert headers == dict(
schema_id=SCHEMA_ID.encode(),
message_deserialized=b'1',
Expand Down
11 changes: 8 additions & 3 deletions tests/test_kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@
MIN_TELEMETRY_INTERVAL_AGE_HOURS,
)

from .conftest import FAKE_TIME


async def test_send_batch(kafka_producer, unpack_message_pack):
await kafka_producer.send_batch(
'topic',
unpack_message_pack,
'1',
'headers',
[('header_1', 'value')],
)

kafka_producer.producer.create_batch.assert_called()
kafka_producer.producer.partitions_for.assert_called_with('topic')
kafka_producer.producer.send_batch.assert_called()


async def test_send(kafka_producer, unpack_message_pack):
async def test_send(kafka_producer, unpack_message_pack, patch_datetime_now):
result = await kafka_producer.send(
'topic', unpack_message_pack[0], b'key', [('header', b'value')]
)
Expand All @@ -31,7 +33,10 @@ async def test_send(kafka_producer, unpack_message_pack):
unpack_message_pack[0]
),
key=b'key',
headers=[('header', b'value')],
headers=[
('header', b'value'),
('dt_send_to_kafka', FAKE_TIME.isoformat().encode())
]
)
assert result is True

Expand Down

0 comments on commit 3d30df7

Please sign in to comment.