Skip to content

Commit

Permalink
Merge pull request #27 from piklema/PK-2141
Browse files Browse the repository at this point in the history
PK-2141: check message time
  • Loading branch information
SergeyDubovitsky authored Sep 11, 2024
2 parents 1fd3d1b + 87360cd commit 990dc52
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ install: uninstall clean ## install the package to the active Python's site-pack
python setup.py install

run: ## run local
	@python src/mqtt_kafka_connector/connector/main.py

requirements_uninstall: ##
@pip freeze | grep -v "pkg-resources" | grep -v "@" | xargs -r pip uninstall -y --quiet
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ filterwarnings = [
tests_require = [
'develop'
]
env = [
"WITH_MESSAGE_DESERIALIZE=True",
]


[tool.coverage.report]
show_missing = true
skip_empty = true
fail_under = 88


[tool.ruff]
Expand Down
78 changes: 61 additions & 17 deletions src/mqtt_kafka_connector/clients/kafka.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime as dt
import json
import logging

Expand All @@ -6,6 +7,8 @@

from mqtt_kafka_connector.conf import (
KAFKA_BOOTSTRAP_SERVERS,
MAX_TELEMETRY_INTERVAL_AGE_HOURS,
MIN_TELEMETRY_INTERVAL_AGE_HOURS,
MODIFY_MESSAGE_RM_NON_NUMBER_FLOAT_FIELDS,
MODIFY_MESSAGE_RM_NONE_FIELDS,
)
Expand All @@ -14,9 +17,59 @@
logger = logging.getLogger(__name__)


class MessageHelper:
    """Validates and serializes raw telemetry messages for Kafka.

    The optional ``prometheus`` service, when provided, receives a lag
    metric for every message whose ``time`` field is inspected.
    """

    def __init__(self, prometheus=None):
        # May be None: main() passes None when PROMETHEUS_PORT is unset.
        self.prometheus = prometheus

    def _check_message_interval(self, msg: dict) -> bool:
        """Return True if msg['time'] falls inside the accepted window.

        Window: [now - MIN_TELEMETRY_INTERVAL_AGE_HOURS,
        now + MAX_TELEMETRY_INTERVAL_AGE_HOURS]. Messages without a
        ``time`` field are rejected.
        """
        msg_time = msg.get('time')

        if not msg_time:
            logger.warning('Message has no time field')
            return False

        if isinstance(msg_time, str):
            # May raise ValueError for malformed strings; caught by
            # prepare_msg_for_kafka's broad except.
            msg_time = dt.datetime.fromisoformat(msg_time)

        # NOTE(review): astimezone() on a naive datetime interprets it in
        # the local timezone — confirm senders always include tzinfo.
        msg_time = msg_time.astimezone(dt.timezone.utc)
        now_utc = dt.datetime.now(dt.timezone.utc)
        early = now_utc - dt.timedelta(hours=MIN_TELEMETRY_INTERVAL_AGE_HOURS)
        late = now_utc + dt.timedelta(hours=MAX_TELEMETRY_INTERVAL_AGE_HOURS)

        # Fix: prometheus may be None (metrics disabled); the original
        # called it unconditionally and raised AttributeError, causing
        # every message to be dropped by the caller's except handler.
        if self.prometheus:
            self.prometheus.telemetry_message_lag_add(
                value=(now_utc - msg_time).total_seconds(),
            )

        if not early <= msg_time <= late:
            logger.info('Message time is out of interval')
            return False
        return True

    def prepare_msg_for_kafka(self, raw_msg: dict) -> bytes | None:
        """Serialize raw_msg to JSON bytes.

        Returns None when the message time is outside the accepted
        window or any step raises (logged, never propagated).
        """
        try:
            # Reject out-of-window messages before paying the
            # serialization cost (same observable result as checking
            # afterwards: both paths return None on failure).
            if not self._check_message_interval(msg=raw_msg):
                return None

            if MODIFY_MESSAGE_RM_NONE_FIELDS:
                raw_msg = clean_none_fields(raw_msg)

            # Implicit casting to JSON standard without
            # NaN, Inf, -Inf values with orjson)
            return (
                orjson.dumps(raw_msg)
                if MODIFY_MESSAGE_RM_NON_NUMBER_FLOAT_FIELDS
                else json.dumps(raw_msg, cls=DateTimeEncoder).encode()
            )
        except Exception as e:
            logger.exception('Error while preparing message for Kafka: %s', e)
            return None


class KafkaProducer:
    """Async Kafka producer; delegates message validation/serialization
    to the injected MessageHelper."""

    # Fix: the paste retained the pre-commit signature line
    # ``def __init__(self):`` directly above the new one — two
    # consecutive ``def`` lines, a syntax error. Keep only the new one.
    def __init__(self, message_helper: MessageHelper):
        # Created lazily in start(); None until then.
        self.producer: AIOKafkaProducer = None
        self.message_helper = message_helper

async def start(self):
self.producer: AIOKafkaProducer = AIOKafkaProducer(
Expand All @@ -28,20 +81,6 @@ async def start(self):
async def stop(self):
await self.producer.stop()

@staticmethod
def _prepare_msg_for_kafka(raw_msg: dict) -> bytes:
    # NOTE(review): this is the removed side of the diff — the commit
    # replaces it with MessageHelper.prepare_msg_for_kafka, which adds
    # the message-time interval check. Kept byte-identical here.
    """Serialize raw_msg to JSON bytes, optionally stripping None fields."""
    if MODIFY_MESSAGE_RM_NONE_FIELDS:
        raw_msg = clean_none_fields(raw_msg)

    # Implicit casting to JSON standard without
    # NaN, Inf, -Inf values with orjson)
    msg_for_kafka = (
        orjson.dumps(raw_msg)
        if MODIFY_MESSAGE_RM_NON_NUMBER_FLOAT_FIELDS
        else json.dumps(raw_msg, cls=DateTimeEncoder).encode()
    )
    return msg_for_kafka

async def get_partition(self, topic: str, key: bytes) -> int:
    """Map a numeric message key onto one of the topic's partitions."""
    available = await self.producer.partitions_for(topic)
    partition_count = len(available)
    return int(key) % partition_count
Expand All @@ -57,7 +96,12 @@ async def send_batch(

i = 0
while i < len(messages):
msg = self._prepare_msg_for_kafka(messages[i])
msg = self.message_helper.prepare_msg_for_kafka(messages[i])

if not msg:
i += 1
continue

metadata = batch.append(
key=key, value=msg, timestamp=None, headers=headers
)
Expand Down Expand Up @@ -92,7 +136,7 @@ async def send(
key: bytes,
headers: list,
) -> bool:
value = self._prepare_msg_for_kafka(message)
value = self.message_helper.prepare_msg_for_kafka(message)
res = await self.producer.send_and_wait(
topic, value=value, key=key, headers=headers
)
Expand Down
6 changes: 6 additions & 0 deletions src/mqtt_kafka_connector/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@
)
KAFKA_SEND_BATCHES = strtobool(os.getenv('KAFKA_SEND_BATCHES', 'False'))
PROMETHEUS_PORT = int(os.getenv('PROMETHEUS_PORT', 8011))
# Oldest accepted telemetry: messages timestamped earlier than
# now - MIN_TELEMETRY_INTERVAL_AGE_HOURS are dropped
# (see MessageHelper._check_message_interval). Default: 3 days.
MIN_TELEMETRY_INTERVAL_AGE_HOURS = int(
    os.getenv('MIN_TELEMETRY_INTERVAL_AGE_HOURS', 24 * 3),
)
# Future tolerance: messages timestamped more than this many hours
# ahead of now are dropped. Default: 1 hour.
# NOTE(review): MIN/MAX naming is confusing (MIN default > MAX default);
# they bound opposite directions of the window, not a min/max pair.
MAX_TELEMETRY_INTERVAL_AGE_HOURS = int(
    os.getenv('MAX_TELEMETRY_INTERVAL_AGE_HOURS', 1),
)

if SENTRY_DSN:
sentry_sdk.init(
Expand Down
32 changes: 18 additions & 14 deletions src/mqtt_kafka_connector/connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
from aiokafka.errors import KafkaConnectionError
from aiomqtt.message import Message

from mqtt_kafka_connector.clients.kafka import KafkaProducer
from mqtt_kafka_connector.clients.kafka import KafkaProducer, MessageHelper
from mqtt_kafka_connector.clients.mqtt import MQTTClient
from mqtt_kafka_connector.clients.schema_client import SchemaClient
from mqtt_kafka_connector.conf import (
KAFKA_HEADERS_LIST,
KAFKA_KEY_TEMPLATE,
KAFKA_SEND_BATCHES,
MQTT_TOPIC_SOURCE_TEMPLATE,
PROMETHEUS_PORT,
RECONNECT_INTERVAL_SEC,
TELEMETRY_KAFKA_TOPIC,
TRACE_HEADER,
Expand All @@ -42,12 +43,12 @@ def __init__(
mqtt_client: MQTTClient,
kafka_producer: KafkaProducer,
schema_client: SchemaClient,
prometheus_service: Prometheus,
prometheus: Prometheus = None,
):
self.kafka_producer = kafka_producer
self.mqtt_client = mqtt_client
self.schema_client = schema_client
self.prometheus = prometheus_service
self.prometheus = prometheus

self.mqtt_topic_params_tmpl = Template(MQTT_TOPIC_SOURCE_TEMPLATE)
self.last_messages = defaultdict(dict)
Expand Down Expand Up @@ -173,7 +174,8 @@ async def run(self):
logger.info('Connector starting...')
while True:
try:
await self.prometheus.start()
if self.prometheus:
await self.prometheus.start()
await self.mqtt_client.start()
await self.kafka_producer.start()

Expand All @@ -200,15 +202,17 @@ async def run(self):
await asyncio.sleep(RECONNECT_INTERVAL_SEC)
finally:
await self.kafka_producer.stop()
await self.prometheus.service.stop()
if self.prometheus:
await self.prometheus.service.stop()

async def handle(self, mqtt_message: Message):
mqtt_topic = mqtt_message.topic.value
mqtt_params = self.mqtt_topic_params_tmpl.to_dict(
mqtt_message.topic.value
)
device_id = mqtt_params.get('device_id')
setup_context_vars(device_id)

setup_context_vars(device_id, mqtt_params.get('customer_id'))

kafka_topic, kafka_key, kafka_headers = self.get_kafka_message_params(
mqtt_params
Expand All @@ -219,22 +223,22 @@ async def handle(self, mqtt_message: Message):
)
if self.check_telemetry_messages_pack(mqtt_topic, telemetry_msg_pack):
await self.kafka_handler(
telemetry_msg_pack, kafka_topic, kafka_key, kafka_headers
)
self.prometheus.add(
device_id,
mqtt_params.get('customer_id'),
len(telemetry_msg_pack),
telemetry_msg_pack,
kafka_topic,
kafka_key,
kafka_headers,
)
self.prometheus.messages_counter_add(value=len(telemetry_msg_pack))

return True


def main():
    """Wire up the clients and run the connector event loop.

    Fix: the paste retained the removed pre-commit lines
    ``producer = KafkaProducer()`` and ``prometheus = Prometheus()``;
    the latter silently defeated the metrics-optional logic below.
    """
    # Metrics are optional: disabled when PROMETHEUS_PORT is unset/0.
    prometheus = Prometheus() if PROMETHEUS_PORT else None
    message_helper = MessageHelper(prometheus)
    producer = KafkaProducer(message_helper)
    mqtt_client = MQTTClient()
    schema_client = SchemaClient()
    connector = Connector(mqtt_client, producer, schema_client, prometheus)
    asyncio.run(connector.run())

Expand Down
8 changes: 7 additions & 1 deletion src/mqtt_kafka_connector/context_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@

# Names shared by the ContextVars below and the Sentry tags set in
# setup_context_vars().
MESSAGE_UUID = 'message_uuid'
DEVICE_ID = 'device_id'
CUSTOMER_ID = 'customer_id'

# Per-task context set in setup_context_vars(); device/customer ids are
# read back as metric labels by Prometheus._add().
message_uuid_var = ContextVar(MESSAGE_UUID, default='')
device_id_var = ContextVar(DEVICE_ID, default=0)
customer_id_var = ContextVar(CUSTOMER_ID, default=0)


# Fix: the paste retained the removed pre-commit signature line
# ``def setup_context_vars(device_id: int):`` directly above the new
# one — two consecutive ``def`` lines, a syntax error. Keep the new one.
def setup_context_vars(device_id: int, customer_id: int):
    """Bind per-message identifiers to context vars and Sentry tags.

    A fresh message UUID is generated on every call.
    """
    uuid_hex = uuid.uuid4().hex
    message_uuid_var.set(uuid_hex)

    device_id_var.set(device_id)
    customer_id_var.set(customer_id)

    set_tag(DEVICE_ID, device_id)
    set_tag(CUSTOMER_ID, customer_id)
    set_tag(MESSAGE_UUID, uuid_hex)
35 changes: 25 additions & 10 deletions src/mqtt_kafka_connector/services/prometheus.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import functools
import logging

from aioprometheus import Counter
from aioprometheus import Counter, Summary
from aioprometheus.service import Service
from mqtt_kafka_connector.conf import PROMETHEUS_PORT
from mqtt_kafka_connector.context_vars import customer_id_var, device_id_var

logger = logging.getLogger(__name__)

Expand All @@ -11,19 +13,32 @@ class Prometheus:
def __init__(self):
    """Create the metric collectors; the HTTP service starts in start().

    Fix: the paste retained the removed pre-commit argument line
    ``'mqtt_to_kafka_messages_count', 'Number of messages.'`` inside the
    Counter(...) call; Python's implicit string concatenation would
    silently fuse it with the new metric name and shift the arguments.
    """
    self.service = None
    self.messages_counter = Counter(
        'messages_from_devices_count',
        'Count of messages.',
    )
    self.telemetry_message_lag = Summary(
        'telemetry_message_lag_seconds',
        'Time lag between message time and current time.',
    )

async def start(self):
    """Start the aioprometheus HTTP exposition service on PROMETHEUS_PORT."""
    self.service = Service()
    await self.service.start(addr='0.0.0.0', port=PROMETHEUS_PORT)
    logger.info('Prometheus Service is running')

# Fix: the paste retained the removed pre-commit method
# ``add(self, device_id, customer_id, messages_count)`` above its
# replacement; it is superseded by _add + the partialmethods below,
# which take the labels from context vars instead of parameters.
def _add(self, metric, value: float):
    """Add ``value`` to the collector named by ``metric``, labelled with
    the current device/customer context vars.

    No-op until start() has run (self.service is set there), so metric
    calls are safe while the service is disabled or not yet started.
    """
    if self.service:
        getattr(self, metric).add(
            {
                'device_id': str(device_id_var.get()),
                'customer_id': str(customer_id_var.get()),
            },
            value=value,
        )

# Public entry points: bind the target collector name via partialmethod.
messages_counter_add = functools.partialmethod(
    _add, metric='messages_counter'
)
telemetry_message_lag_add = functools.partialmethod(
    _add, metric='telemetry_message_lag'
)
Loading

0 comments on commit 990dc52

Please sign in to comment.