feat: multiple events per topic
Rebecca Graber committed May 4, 2023
1 parent f6471f0 commit a34a4f1
Showing 2 changed files with 28 additions and 43 deletions.
68 changes: 25 additions & 43 deletions edx_event_bus_kafka/internal/consumer.py
@@ -29,9 +29,10 @@
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:
     import confluent_kafka
-    from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, DeserializingConsumer
+    from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, DeserializingConsumer, Consumer
     from confluent_kafka.error import KafkaError
     from confluent_kafka.schema_registry.avro import AvroDeserializer
+    from confluent_kafka.serialization import SerializationContext, MessageField
 except ImportError:  # pragma: no cover
     confluent_kafka = None

@@ -107,13 +108,12 @@ class KafkaEventConsumer:
     Can also consume messages indefinitely off the queue.
     """

-    def __init__(self, topic, group_id, signal):
+    def __init__(self, topic, group_id):
         if confluent_kafka is None:  # pragma: no cover
             raise Exception('Library confluent-kafka not available. Cannot create event consumer.')

         self.topic = topic
         self.group_id = group_id
-        self.signal = signal
         self.consumer = self._create_consumer()
         self._shut_down_loop = False

@@ -126,30 +126,19 @@ def _create_consumer(self):
         None if confluent_kafka is not available.
         DeserializingConsumer if it is.
         """
-
-        schema_registry_client = get_schema_registry_client()
-
-        signal_deserializer = AvroSignalDeserializer(self.signal)
-
-        def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
-            return signal_deserializer.from_dict(event_data_dict)
-
         consumer_config = load_common_settings()

         # We do not deserialize the key because we don't need it for anything yet.
         # Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on determining key schema.
         consumer_config.update({
             'group.id': self.group_id,
-            'value.deserializer': AvroDeserializer(schema_str=signal_deserializer.schema_string(),
-                                                   schema_registry_client=schema_registry_client,
-                                                   from_dict=inner_from_dict),
             # Turn off auto commit. Auto commit will commit offsets for the entire batch of messages received,
             # potentially resulting in data loss if some of those messages are not fully processed. See
             # https://newrelic.com/blog/best-practices/kafka-consumer-config-auto-commit-data-loss
             'enable.auto.commit': False,
         })

-        return DeserializingConsumer(consumer_config)
+        return Consumer(consumer_config)

     def _shut_down(self):
         """
@@ -248,7 +237,6 @@ def _consume_indefinitely(self):
         run_context = {
             'full_topic': full_topic,
             'consumer_group': self.group_id,
-            'expected_signal': self.signal,
         }
         self.consumer.subscribe([full_topic])
         logger.info(f"Running consumer for {run_context!r}")
@@ -276,8 +264,26 @@ def _consume_indefinitely(self):
                 with function_trace('_consume_indefinitely_consume_single_message'):
                     # Before processing, make sure our db connection is still active
                     _reconnect_to_db_if_needed()
-                    self.emit_signals_from_message(msg)
+                    ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers())
+                    value = msg.value()
+                    event_type = get_message_header_values(msg.headers(), HEADER_EVENT_TYPE)[0]
+                    the_signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
+                    signal_deserializer = AvroSignalDeserializer(the_signal)
+
+                    schema_registry_client = get_schema_registry_client()
+
+                    def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
+                        return signal_deserializer.from_dict(event_data_dict)
+
+                    full_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
+                                                         schema_registry_client=schema_registry_client,
+                                                         from_dict=inner_from_dict)
+                    logger.info(f"{full_deserializer=}")
+                    new_value = full_deserializer(value, ctx)
+                    msg.set_value(new_value)
+
+                    self.emit_signals_from_message(msg, the_signal)
                     consecutive_errors = 0

                     self._add_message_monitoring(run_context=run_context, message=msg)
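This hunk builds a fresh AvroDeserializer (and fetches the schema registry client) for every message. A possible refinement, shown here only as a hedged sketch rather than anything in this commit, would cache one deserializer per ce_type; the cache dict and helper below are hypothetical, while the class and helper names mirror the code above:

    _deserializers = {}  # hypothetical cache: event_type -> AvroDeserializer

    def _deserializer_for(event_type):
        # Build once, then reuse, the Avro deserializer for a given ce_type.
        if event_type not in _deserializers:
            signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
            signal_deserializer = AvroSignalDeserializer(signal)

            def from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
                return signal_deserializer.from_dict(event_data_dict)

            _deserializers[event_type] = AvroDeserializer(
                schema_str=signal_deserializer.schema_string(),
                schema_registry_client=get_schema_registry_client(),
                from_dict=from_dict,
            )
        return _deserializers[event_type]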
@@ -325,7 +331,7 @@ def consume_indefinitely(self, offset_timestamp=None):
             self.reset_offsets_and_sleep_indefinitely(offset_timestamp)

     @function_trace('emit_signals_from_message')
-    def emit_signals_from_message(self, msg):
+    def emit_signals_from_message(self, msg, signal):
         """
         Determine the correct signal and send the event from the message.
@@ -345,29 +351,13 @@ def emit_signals_from_message(self, msg):

         headers = msg.headers() or []  # treat None as []

-        event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
-        if len(event_types) == 0:
-            raise UnusableMessageError(
-                "Missing ce_type header on message, cannot determine signal"
-            )
-        if len(event_types) > 1:
-            raise UnusableMessageError(
-                "Multiple ce_type headers found on message, cannot determine signal"
-            )
-        event_type = event_types[0]
-
-        if event_type != self.signal.event_type:
-            raise UnusableMessageError(
-                f"Signal types do not match. Expected {self.signal.event_type}. "
-                f"Received message of type {event_type}."
-            )
         try:
             event_metadata = _get_metadata_from_headers(headers)
         except Exception as e:
             raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e

         with function_trace('emit_signals_from_message_send_event_with_custom_metadata'):
-            send_results = self.signal.send_event_with_custom_metadata(event_metadata, **msg.value())
+            send_results = signal.send_event_with_custom_metadata(event_metadata, **msg.value())

         # Raise an exception if any receivers errored out. This allows logging of the receivers
         # along with partition, offset, etc. in record_event_consuming_error. Hopefully the
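With those ce_type checks removed, the caller resolves the signal before invoking this method. For illustration only (the header value is made up, though it follows the openedx-events event-type naming; HEADER_EVENT_TYPE is the ce_type header key used above), the lookup now performed upstream in the consume loop looks like:

    headers = [('ce_type', b'org.openedx.learning.auth.session.login.completed.v1')]
    event_type = get_message_header_values(headers, HEADER_EVENT_TYPE)[0]
    signal = OpenEdxPublicSignal.get_signal_by_type(event_type)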
@@ -599,12 +589,6 @@ def add_arguments(self, parser):
             required=True,
             help='Consumer group id'
         )
-        parser.add_argument(
-            '-s', '--signal',
-            nargs=1,
-            required=True,
-            help='Type of signal to emit from consumed messages.'
-        )
         parser.add_argument(
             '-o', '--offset_time',
             nargs=1,
@@ -628,7 +612,6 @@ def handle(self, *args, **options):

         try:
             load_all_signals()
-            signal = OpenEdxPublicSignal.get_signal_by_type(options['signal'][0])
             if options['offset_time'] and options['offset_time'][0] is not None:
                 try:
                     offset_timestamp = datetime.fromisoformat(options['offset_time'][0])
@@ -641,7 +624,6 @@ def handle(self, *args, **options):
             event_consumer = KafkaEventConsumer(
                 topic=options['topic'][0],
                 group_id=options['group_id'][0],
-                signal=signal,
             )
             if offset_timestamp is None:
                 event_consumer.consume_indefinitely()
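Taken together, a consumer is now constructed from a topic and group alone, and each message's signal is resolved from its ce_type header; the management command likewise drops its -s/--signal flag. A hypothetical direct usage after this change (topic and group names invented):

    event_consumer = KafkaEventConsumer(topic='user-activity', group_id='analytics-workers')
    event_consumer.consume_indefinitely()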
3 changes: 3 additions & 0 deletions edx_event_bus_kafka/internal/producer.py
@@ -32,6 +32,7 @@
     import confluent_kafka
     from confluent_kafka import Producer
     from confluent_kafka.schema_registry.avro import AvroSerializer
+    from confluent_kafka.schema_registry import topic_record_subject_name_strategy
     from confluent_kafka.serialization import MessageField, SerializationContext
 except ImportError:  # pragma: no cover
     confluent_kafka = None
@@ -165,11 +166,13 @@ def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
         schema_str=extract_key_schema(signal_serializer, event_key_field),
         schema_registry_client=client,
         to_dict=inner_to_dict,
+        conf={'subject.name.strategy': topic_record_subject_name_strategy}
     )
     value_serializer = AvroSerializer(
         schema_str=signal_serializer.schema_string(),
         schema_registry_client=client,
         to_dict=inner_to_dict,
+        conf={'subject.name.strategy': topic_record_subject_name_strategy}
     )

     return key_serializer, value_serializer
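The conf additions switch schema-registry subject naming from the default topic strategy ('<topic>-value') to topic-record, which appends the record's fully qualified name; this is what lets several event types share one topic without tripping schema-compatibility checks against each other. A small sketch of the effect, based on Confluent's documented naming strategies, with an invented topic and record name:

    from confluent_kafka.schema_registry import topic_record_subject_name_strategy
    from confluent_kafka.serialization import MessageField, SerializationContext

    ctx = SerializationContext('user-activity', MessageField.VALUE)
    # Subject becomes '<topic>-<record name>' rather than '<topic>-value':
    subject = topic_record_subject_name_strategy(ctx, 'org.openedx.UserActivityRecord')
    print(subject)  # user-activity-org.openedx.UserActivityRecord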
