Commit

fixup!: tests pass

Rebecca Graber committed May 4, 2023
1 parent a34a4f1 commit 0535f74
Showing 8 changed files with 154 additions and 99 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,11 @@ Change Log
Unreleased
**********

[4.0.0] - 2023-05-04
********************
Changed
=======
* **BREAKING CHANGE**: ``consume_events`` no longer takes the signal as an argument. The consumer now determines how to deserialize each message from the signal type in its ``ce_type`` header; see the sketch below.
* Switch from ``edx-sphinx-theme`` to ``sphinx-book-theme`` since the former is
deprecated

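A rough sketch of the new per-message flow, using names from the consumer code shown further down (this snippet is illustrative, not part of the changelog)::

    # Resolve the signal from the ce_type message header...
    event_type = get_message_header_values(msg.headers(), HEADER_EVENT_TYPE)[0]
    signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
    # ...then deserialize against that signal's schema (event_data_dict stands
    # for the Avro-decoded message body).
    event_data = AvroSignalDeserializer(signal).from_dict(event_data_dict)
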
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.9.6'
__version__ = '4.0.0'
98 changes: 61 additions & 37 deletions edx_event_bus_kafka/internal/consumer.py
@@ -117,19 +117,17 @@ def __init__(self, topic, group_id):
self.consumer = self._create_consumer()
self._shut_down_loop = False

# return type (Optional[DeserializingConsumer]) removed from signature to avoid error on import
# return type (Optional[Consumer]) removed from signature to avoid error on import
def _create_consumer(self):
"""
Create a DeserializingConsumer for events of the given signal instance.
Create a new Consumer in the consumer group.
Returns
None if confluent_kafka is not available.
DeserializingConsumer if it is.
Consumer if it is.
"""
consumer_config = load_common_settings()

# We do not deserialize the key because we don't need it for anything yet.
# Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on determining key schema.
consumer_config.update({
'group.id': self.group_id,
# Turn off auto commit. Auto commit will commit offsets for the entire batch of messages received,
@@ -262,31 +260,10 @@ def _consume_indefinitely(self):
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers())
value = msg.value()
event_type = get_message_header_values(msg.headers(), HEADER_EVENT_TYPE)[0]
the_signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
signal_deserializer = AvroSignalDeserializer(the_signal)

schema_registry_client = get_schema_registry_client()


def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
return signal_deserializer.from_dict(event_data_dict)

full_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
schema_registry_client=schema_registry_client,
from_dict=inner_from_dict)
logger.info(f"{full_deserializer=}")
new_value = full_deserializer(value, ctx)
msg.set_value(new_value)

self.emit_signals_from_message(msg, the_signal)
self.consume_single_message(msg, run_context)
consecutive_errors = 0
self._add_message_monitoring(run_context=run_context, message=msg)

self._add_message_monitoring(run_context=run_context, message=msg)
except Exception as e: # pylint: disable=broad-except
consecutive_errors += 1
self.record_event_consuming_error(run_context, e, msg)
@@ -309,6 +286,42 @@ def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
finally:
self.consumer.close()

def consume_single_message(self, msg, run_context):
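"""
Deserialize the value of a single message and emit the corresponding signal.
Arguments:
msg (Message): Raw consumed message.
run_context (dict): Context for logging and monitoring; the resolved signal
is added under 'expected_signal'.
"""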
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()

# determine the event type of the message and use it to create a deserializer
all_msg_headers = msg.headers()
event_type = self._determine_event_type(all_msg_headers)
signal = None
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as ke:
raise UnusableMessageError(f"Unrecognized event_type {event_type}, cannot determine signal") from ke
run_context['expected_signal'] = signal

ctx = SerializationContext(msg.topic(), MessageField.VALUE, all_msg_headers)
value = msg.value()
signal_deserializer = AvroSignalDeserializer(signal)
schema_registry_client = get_schema_registry_client()

def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
return signal_deserializer.from_dict(event_data_dict)

# We do not deserialize the key because we don't need it for anything yet.
# Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on
# determining key schema.
value_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
schema_registry_client=schema_registry_client,
from_dict=inner_from_dict)
deserialized_value = value_deserializer(value, ctx)

# set the value of the message to the new deserialized version
msg.set_value(deserialized_value)

self.emit_signals_from_deserialized_message(msg, signal)

def consume_indefinitely(self, offset_timestamp=None):
"""
Consume events from a topic in an infinite loop.
@@ -330,13 +343,13 @@ def consume_indefinitely(self, offset_timestamp=None):
)
self.reset_offsets_and_sleep_indefinitely(offset_timestamp)

@function_trace('emit_signals_from_message')
def emit_signals_from_message(self, msg, signal):
@function_trace('emit_signals_from_deserialized_message')
def emit_signals_from_deserialized_message(self, msg, signal):
"""
Emit the given signal with the event data from the deserialized message.
Arguments:
msg (Message): Consumed message.
msg (Message): Consumed message with the value already deserialized.
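signal (OpenEdxPublicSignal): Signal to emit, as resolved from the message's ce_type header.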
"""
self._log_message_received(msg)

@@ -362,7 +375,7 @@ def emit_signals_from_message(self, msg, signal):
# Raise an exception if any receivers errored out. This allows logging of the receivers
# along with partition, offset, etc. in record_event_consuming_error. Hopefully the
# receiver code is idempotent and we can just replay any messages that were involved.
self._check_receiver_results(send_results)
self._check_receiver_results(send_results, signal)

# At the very end, log that a message was processed successfully.
# Since we're single-threaded, no other information is needed;
@@ -371,7 +384,7 @@ def emit_signals_from_message(self, msg, signal):
if AUDIT_LOGGING_ENABLED.is_enabled():
logger.info('Message from Kafka processed successfully')

def _check_receiver_results(self, send_results: list):
def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
"""
Raises exception if any of the receivers produced an exception.
@@ -399,7 +412,7 @@ def _check_receiver_results(self, send_results: list):
raise ReceiverError(
f"{len(error_descriptions)} receiver(s) out of {len(send_results)} "
"produced errors (stack trace elsewhere in logs) "
f"when handling signal {self.signal}: {', '.join(error_descriptions)}",
f"when handling signal {signal}: {', '.join(error_descriptions)}",
errors
)

@@ -524,6 +537,18 @@ def _add_message_monitoring(self, run_context, message, error=None):
# Use this to fix any bugs in what should be benign monitoring code
set_custom_attribute('kafka_monitoring_error', repr(e))

def _determine_event_type(self, headers):
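"""
Return the event type from the message's ce_type header.
Raises UnusableMessageError if the header is missing or occurs more than once.
"""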
event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
"Missing ce_type header on message, cannot determine signal"
)
if len(event_types) > 1:
raise UnusableMessageError(
"Multiple ce_type headers found on message, cannot determine signal"
)
return event_types[0]

def _get_kafka_message_and_error(self, message, error):
"""
Returns tuple of (kafka_message, kafka_error), if they can be found.
@@ -566,12 +591,11 @@ class ConsumeEventsCommand(BaseCommand):
Management command for Kafka consumer workers in the event bus.
"""
help = """
Consume messages of specified signal type from a Kafka topic and send their data to that signal.
Consume messages from a Kafka topic and emit the relevant signals.
Example::
python3 manage.py cms consume_events -t user-login -g user-activity-service \
-s org.openedx.learning.auth.session.login.completed.v1
python3 manage.py cms consume_events -t user-login -g user-activity-service
"""

def add_arguments(self, parser):
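For illustration (not part of this diff): with the new API, a worker constructs the consumer from just a topic and group id. No signal is passed; each message's signal is resolved from its headers.

    from edx_event_bus_kafka import KafkaEventConsumer

    consumer = KafkaEventConsumer(topic='user-login', group_id='user-activity-service')
    consumer.consume_indefinitely()
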
1 change: 1 addition & 0 deletions edx_event_bus_kafka/internal/producer.py
@@ -275,6 +275,7 @@ def send(
key_serializer, value_serializer = get_serializers(signal, event_key_field)
key_bytes = key_serializer(event_key, SerializationContext(full_topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(full_topic, MessageField.VALUE, headers))
logger.info(f"{value_bytes=}")
self.producer.produce(
full_topic, key=key_bytes, value=value_bytes, headers=headers,
on_delivery=context.on_event_deliver,
