From 0535f7418da408d5dffc089dbf59bea206ead1c0 Mon Sep 17 00:00:00 2001
From: Rebecca Graber
Date: Thu, 4 May 2023 14:08:30 -0400
Subject: [PATCH] fixup!: tests pass

---
 CHANGELOG.rst                             |   5 +
 edx_event_bus_kafka/__init__.py           |   2 +-
 edx_event_bus_kafka/internal/consumer.py  | 106 +++++++-----
 edx_event_bus_kafka/internal/producer.py  |   1 +
 .../internal/tests/test_consumer.py       | 140 ++++++++++--------
 .../internal/tests/test_producer.py       |   2 +-
 .../internal/tests/test_utils.py          |   5 +-
 manage.py                                 |   0
 8 files changed, 161 insertions(+), 100 deletions(-)
 mode change 100644 => 100755 manage.py

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index f4807fc..85a9455 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -14,6 +14,11 @@ Change Log
 Unreleased
 **********
 
+[4.0.0] - 2023-05-04
+********************
+Changed
+=======
+* **BREAKING CHANGE**: consume_events no longer takes the signal as an argument. Consumers determine how to deserialize based on the signal type in the message header.
 * Switch from ``edx-sphinx-theme`` to ``sphinx-book-theme`` since the former is deprecated
 
 
diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py
index a6f8c79..1edcf92 100644
--- a/edx_event_bus_kafka/__init__.py
+++ b/edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
 from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
 from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer
 
-__version__ = '3.9.6'
+__version__ = '4.0.0'
diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py
index 5b92565..29673c0 100644
--- a/edx_event_bus_kafka/internal/consumer.py
+++ b/edx_event_bus_kafka/internal/consumer.py
@@ -117,19 +117,17 @@ def __init__(self, topic, group_id):
         self.consumer = self._create_consumer()
         self._shut_down_loop = False
 
-    # return type (Optional[DeserializingConsumer]) removed from signature to avoid error on import
+    # return type (Optional[Consumer]) removed from signature to avoid error on import
     def _create_consumer(self):
         """
-        Create a DeserializingConsumer for events of the given signal instance.
+        Create a new Consumer in the consumer group.
 
         Returns
             None if confluent_kafka is not available.
-            DeserializingConsumer if it is.
+            Consumer if it is.
         """
         consumer_config = load_common_settings()
 
-        # We do not deserialize the key because we don't need it for anything yet.
-        # Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on determining key schema.
         consumer_config.update({
             'group.id': self.group_id,
             # Turn off auto commit. Auto commit will commit offsets for the entire batch of messages received,
@@ -262,31 +260,10 @@ def _consume_indefinitely(self):
                     msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
                     if msg is not None:
                         with function_trace('_consume_indefinitely_consume_single_message'):
-                            # Before processing, make sure our db connection is still active
-                            _reconnect_to_db_if_needed()
-                            ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers())
-                            value = msg.value()
-                            event_type = get_message_header_values(msg.headers(), HEADER_EVENT_TYPE)[0]
-                            the_signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
-                            signal_deserializer = AvroSignalDeserializer(the_signal)
-
-                            schema_registry_client = get_schema_registry_client()
-
-
-                            def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
-                                return signal_deserializer.from_dict(event_data_dict)
-
-                            full_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
-                                                                 schema_registry_client=schema_registry_client,
-                                                                 from_dict=inner_from_dict)
-                            logger.info(f"{full_deserializer=}")
-                            new_value = full_deserializer(value, ctx)
-                            msg.set_value(new_value)
-
-                            self.emit_signals_from_message(msg, the_signal)
+                            self.consume_single_message(msg, run_context)
                             consecutive_errors = 0
+                    self._add_message_monitoring(run_context=run_context, message=msg)
 
-                    self._add_message_monitoring(run_context=run_context, message=msg)
                 except Exception as e:  # pylint: disable=broad-except
                     consecutive_errors += 1
                     self.record_event_consuming_error(run_context, e, msg)
@@ -309,6 +286,44 @@ def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argume
         finally:
             self.consumer.close()
 
+    def consume_single_message(self, msg, run_context):
+        """
+        Deserialize a single message's value and emit the corresponding signal.
+        """
+        # Before processing, make sure our db connection is still active
+        _reconnect_to_db_if_needed()
+
+        # determine the event type of the message and use it to create a deserializer
+        all_msg_headers = msg.headers()
+        event_type = self._determine_event_type(all_msg_headers)
+        signal = None
+        try:
+            signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
+        except KeyError as ke:
+            raise UnusableMessageError(f"Unrecognized event_type {event_type}, cannot determine signal") from ke
+        run_context['expected_signal'] = signal
+
+        ctx = SerializationContext(msg.topic(), MessageField.VALUE, all_msg_headers)
+        value = msg.value()
+        signal_deserializer = AvroSignalDeserializer(signal)
+        schema_registry_client = get_schema_registry_client()
+
+        def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
+            return signal_deserializer.from_dict(event_data_dict)
+
+        # We do not deserialize the key because we don't need it for anything yet.
+        # Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on
+        # determining key schema.
+        value_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
+                                              schema_registry_client=schema_registry_client,
+                                              from_dict=inner_from_dict)
+        deserialized_value = value_deserializer(value, ctx)
+
+        # set the value of the message to the new deserialized version
+        msg.set_value(deserialized_value)
+
+        self.emit_signals_from_deserialized_message(msg, signal)
+
     def consume_indefinitely(self, offset_timestamp=None):
         """
         Consume events from a topic in an infinite loop.
@@ -330,13 +345,14 @@ def consume_indefinitely(self, offset_timestamp=None):
             )
             self.reset_offsets_and_sleep_indefinitely(offset_timestamp)
 
-    @function_trace('emit_signals_from_message')
-    def emit_signals_from_message(self, msg, signal):
+    @function_trace('emit_signals_from_deserialized_message')
+    def emit_signals_from_deserialized_message(self, msg, signal):
         """
-        Determine the correct signal and send the event from the message.
+        Send the event data from an already-deserialized message to the given signal's receivers.
 
         Arguments:
-            msg (Message): Consumed message.
+            msg (Message): Consumed message with the value deserialized.
+            signal (OpenEdxPublicSignal): Signal to emit with the message's event data.
         """
         self._log_message_received(msg)
@@ -362,7 +378,7 @@ def emit_signals_from_message(self, msg, signal):
         # Raise an exception if any receivers errored out. This allows logging of the receivers
         # along with partition, offset, etc. in record_event_consuming_error. Hopefully the
         # receiver code is idempotent and we can just replay any messages that were involved.
-        self._check_receiver_results(send_results)
+        self._check_receiver_results(send_results, signal)
 
         # At the very end, log that a message was processed successfully.
         # Since we're single-threaded, no other information is needed;
@@ -371,7 +387,7 @@
         if AUDIT_LOGGING_ENABLED.is_enabled():
             logger.info('Message from Kafka processed successfully')
 
-    def _check_receiver_results(self, send_results: list):
+    def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
         """
         Raises exception if any of the receivers produced an exception.
 
@@ -399,7 +415,7 @@
             raise ReceiverError(
                 f"{len(error_descriptions)} receiver(s) out of {len(send_results)} "
                 "produced errors (stack trace elsewhere in logs) "
-                f"when handling signal {self.signal}: {', '.join(error_descriptions)}",
+                f"when handling signal {signal}: {', '.join(error_descriptions)}",
                 errors
             )
 
@@ -524,6 +540,21 @@ def _add_message_monitoring(self, run_context, message, error=None):
                 # Use this to fix any bugs in what should be benign monitoring code
                 set_custom_attribute('kafka_monitoring_error', repr(e))
 
+    def _determine_event_type(self, headers):
+        """
+        Determine the event type of a message from its headers, raising if missing or ambiguous.
+        """
+        event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
+        if len(event_types) == 0:
+            raise UnusableMessageError(
+                "Missing ce_type header on message, cannot determine signal"
+            )
+        if len(event_types) > 1:
+            raise UnusableMessageError(
+                "Multiple ce_type headers found on message, cannot determine signal"
+            )
+        return event_types[0]
+
     def _get_kafka_message_and_error(self, message, error):
         """
         Returns tuple of (kafka_message, kafka_error), if they can be found.
@@ -566,12 +597,11 @@ class ConsumeEventsCommand(BaseCommand):
     Management command for Kafka consumer workers in the event bus.
     """
     help = """
-    Consume messages of specified signal type from a Kafka topic and send their data to that signal.
+    Consume messages from a Kafka topic and emit the relevant signals.
 
     Example::
 
-        python3 manage.py cms consume_events -t user-login -g user-activity-service \
-            -s org.openedx.learning.auth.session.login.completed.v1
+        python3 manage.py cms consume_events -t user-login -g user-activity-service
 
     """
     def add_arguments(self, parser):
diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py
index efd3578..f42d60b 100644
--- a/edx_event_bus_kafka/internal/producer.py
+++ b/edx_event_bus_kafka/internal/producer.py
@@ -275,6 +275,7 @@ def send(
         key_serializer, value_serializer = get_serializers(signal, event_key_field)
         key_bytes = key_serializer(event_key, SerializationContext(full_topic, MessageField.KEY, headers))
         value_bytes = value_serializer(event_data, SerializationContext(full_topic, MessageField.VALUE, headers))
+        logger.debug(f"{value_bytes=}")
 
         self.producer.produce(
             full_topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=context.on_event_deliver,
diff --git a/edx_event_bus_kafka/internal/tests/test_consumer.py b/edx_event_bus_kafka/internal/tests/test_consumer.py
index 5adafb3..3a22b4a 100644
--- a/edx_event_bus_kafka/internal/tests/test_consumer.py
+++ b/edx_event_bus_kafka/internal/tests/test_consumer.py
@@ -66,12 +66,28 @@ def setUp(self):
                 )
             )
         }
+        # determined by manual testing
+        self.normal_event_data_bytes = b'\x00\x00\x01\x86\xb3\xf6\x01\x01\x0cfoobob\x1ebob@foo.example\x0eBob Foo'
         self.message_id = uuid1()
         self.message_id_bytes = str(self.message_id).encode('utf-8')
 
         self.signal_type_bytes = b'org.openedx.learning.auth.session.login.completed.v1'
         self.signal_type = self.signal_type_bytes.decode('utf-8')
         self.normal_message = FakeMessage(
+            topic='local-some-topic',
+            partition=2,
+            offset=12345,
+            headers=[
+                ('ce_id', self.message_id_bytes),
+                ('ce_type', self.signal_type_bytes),
+            ],
+            key=b'\x00\x00\x00\x00\x01\x0cfoobob',  # Avro, as observed in manual test
+            value=self.normal_event_data_bytes,
+            error=None,
+            timestamp=(TIMESTAMP_CREATE_TIME, 1675114920123),
+        )
+
+        self.deserialized_normal_message = FakeMessage(
             topic='local-some-topic',
             partition=2,
             offset=12345,
@@ -84,12 +100,14 @@ def setUp(self):
             error=None,
             timestamp=(TIMESTAMP_CREATE_TIME, 1675114920123),
         )
+
+
         self.mock_receiver = Mock()
         self.signal = SESSION_LOGIN_COMPLETED
         self.signal.connect(fake_receiver_returns_quietly)
         self.signal.connect(fake_receiver_raises_error)
         self.signal.connect(self.mock_receiver)
-        self.event_consumer = KafkaEventConsumer('some-topic', 'test_group_id', self.signal)
+        self.event_consumer = KafkaEventConsumer('some-topic', 'test_group_id')
 
     def tearDown(self):
         self.signal.disconnect(fake_receiver_returns_quietly)
@@ -163,7 +181,7 @@ def test_consume_loop(self, mock_sleep, mock_logger, mock_set_custom_attribute):
         def raise_exception():
             raise Exception("something broke")
 
-        # How the emit_signals_from_message() mock will behave on each successive call.
+        # How the emit_signals_from_deserialized_message() mock will behave on each successive call.
         mock_emit_side_effects = [
             lambda: None,  # accept and ignore a message
             raise_exception,
@@ -173,18 +191,20 @@ def raise_exception():
             self.event_consumer._shut_down  # pylint: disable=protected-access
         ]
 
-        with patch.object(
-            self.event_consumer, 'emit_signals_from_message',
-            side_effect=side_effects(mock_emit_side_effects),
-        ) as mock_emit:
-            mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
-            self.event_consumer.consumer = mock_consumer
-            self.event_consumer.consume_indefinitely()
+        with patch('edx_event_bus_kafka.internal.consumer.AvroDeserializer',
+                   return_value=lambda _x, _y: self.normal_event_data):
+            with patch.object(
+                self.event_consumer, 'emit_signals_from_deserialized_message',
+                side_effect=side_effects(mock_emit_side_effects),
+            ) as mock_emit:
+                mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
+                self.event_consumer.consumer = mock_consumer
+                self.event_consumer.consume_indefinitely()
 
         # Check that each of the mocked out methods got called as expected.
         mock_consumer.subscribe.assert_called_once_with(['local-some-topic'])
 
         # Check that emit was called the expected number of times
-        assert mock_emit.call_args_list == [call(self.normal_message)] * len(mock_emit_side_effects)
+        assert mock_emit.call_args_list == [call(self.normal_message, self.signal)] * len(mock_emit_side_effects)
 
         # Check that there was one error log message and that it contained all the right parts,
         # in some order.
@@ -232,16 +252,18 @@ def raise_exception():
 
         exception_count = 4
 
-        with patch.object(
-            self.event_consumer, 'emit_signals_from_message',
-            side_effect=side_effects([raise_exception] * exception_count)
-        ) as mock_emit:
-            mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
-            self.event_consumer.consumer = mock_consumer
-            with pytest.raises(Exception) as exc_info:
-                self.event_consumer.consume_indefinitely()
+        with patch('edx_event_bus_kafka.internal.consumer.AvroDeserializer',
+                   return_value=lambda _x, _y: self.normal_event_data):
+            with patch.object(
+                self.event_consumer, 'emit_signals_from_deserialized_message',
+                side_effect=side_effects([raise_exception] * exception_count)
+            ) as mock_emit:
+                mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
+                self.event_consumer.consumer = mock_consumer
+                with pytest.raises(Exception) as exc_info:
+                    self.event_consumer.consume_indefinitely()
 
-        assert mock_emit.call_args_list == [call(self.normal_message)] * exception_count
+        assert mock_emit.call_args_list == [call(self.normal_message, self.signal)] * exception_count
         assert exc_info.value.args == ("Too many consecutive errors, exiting (4 in a row)",)
 
     @override_settings(
@@ -261,13 +283,15 @@ def test_connection_reset(self, has_connection, is_usable, reconnect_expected, m
         mock_connection.connection = None
         mock_connection.is_usable.return_value = is_usable
 
-        with patch.object(
-            self.event_consumer, 'emit_signals_from_message',
-            side_effect=side_effects([self.event_consumer._shut_down])  # pylint: disable=protected-access
-        ):
-            mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
-            self.event_consumer.consumer = mock_consumer
-            self.event_consumer.consume_indefinitely()
+        with patch('edx_event_bus_kafka.internal.consumer.AvroDeserializer',
+                   return_value=lambda _x, _y: self.normal_event_data):
+            with patch.object(
+                self.event_consumer, 'emit_signals_from_deserialized_message',
+                side_effect=side_effects([self.event_consumer._shut_down])  # pylint: disable=protected-access
+            ):
+                mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
+                self.event_consumer.consumer = mock_consumer
+                self.event_consumer.consume_indefinitely()
 
         if reconnect_expected:
             mock_connection.connect.assert_called_once()
@@ -307,15 +331,17 @@ def raise_exception():
             self.event_consumer._shut_down,  # pylint: disable=protected-access
         ]
 
-        with patch.object(
-            self.event_consumer, 'emit_signals_from_message',
-            side_effect=side_effects(mock_emit_side_effects)
-        ) as mock_emit:
-            mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
-            self.event_consumer.consumer = mock_consumer
-            self.event_consumer.consume_indefinitely()  # exits normally
+        with patch('edx_event_bus_kafka.internal.consumer.AvroDeserializer',
+                   return_value=lambda _x, _y: self.normal_event_data):
+            with patch.object(
+                self.event_consumer, 'emit_signals_from_deserialized_message',
+                side_effect=side_effects(mock_emit_side_effects)
+            ) as mock_emit:
+                mock_consumer = Mock(**{'poll.return_value': self.normal_message}, autospec=True)
+                self.event_consumer.consumer = mock_consumer
+                self.event_consumer.consume_indefinitely()  # exits normally
 
-        assert mock_emit.call_args_list == [call(self.normal_message)] * len(mock_emit_side_effects)
+        assert mock_emit.call_args_list == [call(self.normal_message, self.signal)] * len(mock_emit_side_effects)
 
 TEST_FAILED_MESSAGE = FakeMessage(
     partition=7,
@@ -379,8 +405,6 @@ def poll_side_effect(*args, **kwargs):
         assert f"Error consuming event from Kafka: {repr(exception)} in context" in exc_log_msg
         assert "full_topic='local-some-topic'" in exc_log_msg
         assert "consumer_group='test_group_id'" in exc_log_msg
-        assert ("expected_signal="
-                ) in exc_log_msg
         if has_message:
             assert "-- event details" in exc_log_msg
         else:
@@ -413,11 +437,12 @@ def test_check_event_error(self):
         but we check it anyway as a safeguard. This test exercises that branch.
         """
         with pytest.raises(Exception) as exc_info:
-            self.event_consumer.emit_signals_from_message(
+            self.event_consumer.emit_signals_from_deserialized_message(
                 FakeMessage(
                     partition=2,
                     error=KafkaError(123, "done broke"),
-                )
+                ),
+                self.signal,
             )
 
         assert exc_info.value.args == (
@@ -435,7 +460,7 @@ def test_emit_success(self, audit_logging, mock_logger, mock_set_attribute):
         self.signal.disconnect(fake_receiver_raises_error)  # just successes for this one!
 
         with override_settings(EVENT_BUS_KAFKA_AUDIT_LOGGING_ENABLED=audit_logging):
-            self.event_consumer.emit_signals_from_message(self.normal_message)
+            self.event_consumer.emit_signals_from_deserialized_message(self.deserialized_normal_message, self.signal)
         self.assert_signal_sent_with(self.signal, self.normal_event_data)
         # Specifically, not called with 'kafka_logging_error'
         mock_set_attribute.assert_not_called()
@@ -458,9 +483,9 @@
     @patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
     def test_emit_success_tolerates_missing_timestamp(self, mock_logger, mock_set_attribute):
         self.signal.disconnect(fake_receiver_raises_error)  # just successes for this one!
-        self.normal_message._timestamp = (TIMESTAMP_NOT_AVAILABLE, None)  # pylint: disable=protected-access
+        self.deserialized_normal_message._timestamp = (TIMESTAMP_NOT_AVAILABLE, None)  # pylint: disable=protected-access
 
-        self.event_consumer.emit_signals_from_message(self.normal_message)
+        self.event_consumer.emit_signals_from_deserialized_message(self.deserialized_normal_message, self.signal)
         self.assert_signal_sent_with(self.signal, self.normal_event_data)
         # Specifically, not called with 'kafka_logging_error'
         mock_set_attribute.assert_not_called()
@@ -476,7 +501,7 @@ def test_emit_success_tolerates_missing_timestamp(self, mock_logger, mock_set_at
     @patch('django.dispatch.dispatcher.logger', autospec=True)
     def test_emit(self, mock_logger):
         with pytest.raises(ReceiverError) as exc_info:
-            self.event_consumer.emit_signals_from_message(self.normal_message)
+            self.event_consumer.emit_signals_from_deserialized_message(self.deserialized_normal_message, self.signal)
         self.assert_signal_sent_with(self.signal, self.normal_event_data)
         assert exc_info.value.args == (
             "1 receiver(s) out of 3 produced errors (stack trace elsewhere in logs) "
@@ -506,7 +531,7 @@ def test_malformed_receiver_errors(self):
             (lambda x:x, Exception("for lambda")),
             # This would actually raise an error inside send_robust(), but it will serve well enough for testing...
             ("not even a function", Exception("just plain bad")),
-        ])
+        ], self.signal)
         assert exc_info.value.args == (
             "2 receiver(s) out of 2 produced errors (stack trace elsewhere in logs) "
             "when handling signal
[...]
diff --git a/edx_event_bus_kafka/internal/tests/test_utils.py b/edx_event_bus_kafka/internal/tests/test_utils.py
[...]
     def key(self) -> Optional[bytes]:
         return self._key
 
     def value(self):
-        """Deserialized event value."""
+        """Event value (Avro bytes or JSON)."""
         return self._value
 
+    def set_value(self, new_value):
+        self._value = new_value
+
     def error(self):
         return self._error
diff --git a/manage.py b/manage.py
old mode 100644
new mode 100755
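
Below is a minimal, standalone sketch (not part of the patch) of the ce_type header dispatch that consume_single_message and _determine_event_type implement above. The UnusableMessageError class here is a local stand-in for the consumer's exception of the same name, and determine_event_type is a hypothetical helper: in the real code, the event type is passed to OpenEdxPublicSignal.get_signal_by_type(), and the resulting signal is used to build an AvroSignalDeserializer for the message value.

from typing import List, Optional, Tuple


class UnusableMessageError(Exception):
    """Local stand-in for edx_event_bus_kafka's exception of the same name."""


# Kafka delivers headers as (name, value-bytes) pairs, and a name may repeat.
Headers = List[Tuple[str, bytes]]


def determine_event_type(headers: Optional[Headers]) -> str:
    """Return the single ce_type header value, or raise UnusableMessageError."""
    values = [value for name, value in (headers or []) if name == 'ce_type']
    if len(values) == 0:
        raise UnusableMessageError("Missing ce_type header on message, cannot determine signal")
    if len(values) > 1:
        raise UnusableMessageError("Multiple ce_type headers found on message, cannot determine signal")
    return values[0].decode('utf-8')


if __name__ == '__main__':
    headers = [
        ('ce_id', b'11111111-2222-3333-4444-555555555555'),
        ('ce_type', b'org.openedx.learning.auth.session.login.completed.v1'),
    ]
    # In the consumer, this value feeds OpenEdxPublicSignal.get_signal_by_type(),
    # and that signal then selects the Avro deserializer for the payload.
    print(determine_event_type(headers))

Because the deserializer is now chosen per message from this header, a single consumer process can handle a topic carrying multiple event types, which appears to be the motivation for dropping the -s/--signal argument from consume_events.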