Skip to content

Commit

Permalink
Troubleshoot message_id handling for transactional routing
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Jul 26, 2023
1 parent b803877 commit f898591
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ def emit_mq_message(cls,
queue: Optional[str] = '',
exchange_type: Union[str, ExchangeType] =
ExchangeType.direct,
expiration: int = 1000) -> str:
expiration: int = 1000,
override_message_id: bool = True) -> str:
"""
Emits request to the neon api service on the MQ bus
:param connection: pika connection object
Expand All @@ -359,33 +360,36 @@ def emit_mq_message(cls,
(defaults to direct)
:param expiration: mq message expiration time in millis
(defaults to 1 second)
:param override_message_id: if True, generate a unique `message_id`
:raises ValueError: invalid request data provided
:returns message_id: id of the sent message
"""
if request_data and len(request_data) > 0 and isinstance(request_data,
dict):
message_id = cls.create_unique_id()
request_data['message_id'] = message_id
with connection.channel() as channel:
if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))
return message_id
else:
raise ValueError(f'Invalid request data provided: {request_data}')
if not isinstance(request_data, dict):
raise TypeError(f"Expected dict and got {type(request_data)}")
if not request_data:
raise ValueError(f'No request data provided')

if override_message_id or not request_data.get('message_id'):
request_data['message_id'] = cls.create_unique_id()

with connection.channel() as channel:
if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))
LOG.debug(f"sent message: {request_data['message_id']}")
return request_data['message_id']

@classmethod
def publish_message(cls,
Expand Down

0 comments on commit f898591

Please sign in to comment.