Add Kafka DLQ Subscriber Preview functionality (GSI-1127) #136
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
🚧 Under Construction! 🚧
KafkaConfig
kafka_preview_limit
, which is 1 by default.ExtractedEventInfo
type_
assignment used to fall back to the header value, but didn't remove it. The result is that the "type" would be found both in the top-leveltype_
variable as well as inheaders["type"]
. Now, the value is popped.KafkaDLQSubscriber
preview
method that return at most N=kafka_preview_limit
events from the configured DLQ topic.run(ignore=True)
into a new method,ignore()
.run(ignore=False)
toprocess()
.InboundProviderBase
because this is actually a specialized class that has little overlap with the normal subscriber provider class.__init__
function now takes the whole config instance and internally breaks out the values it needs (topics/preview limit)The preview operation uses
AIOKafkaConsumer.getmany
withmax_records
set to the configured preview limit with an arbitrary but reasonable timeout. The point of the timeout is to retrieve the records without hanging forever, since we can't know if or how many records will be available to fetch.