Add ErrorHandler DLQ API to Python #31856
R: @johnjcasey
While @johnjcasey's review is pending, I also went through this once; mostly LGTM.
a fluent manner, disaggregating the error processing specification from
the main processing chain.

They is typically used as follows::
typo: They
Done.
Thanks. Any other thoughts on this, @johnjcasey?
Another ping on this.
It would be nice to get this in by the next release. For past contributors/reviewers on this topic, maybe @bzablocki (who reviewed #27145) could take another look if needed?
Small comments, LGTM overall
with beam.Pipeline() as p:
  pcoll = p | beam.Create(['a', 'bb', 'cccc'])
  with error_handling.ErrorHandler(
      beam.Map(lambda x: "error: %s" % x[0])) as error_handler:
Why do we access the first element here (Map(lambda x: "error: %s" % x[0])), while in the test above we just access the entire element (Map(lambda x: "error: %s" % x))?
PTransformWithErrors.with_error_handling() returns a PCollection of elements as errors, whereas the standard ParDo(...).with_error_handling() returns the Python equivalent of bad records that attach the bad elements to the exception thrown.
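To make the difference concrete, here is a minimal plain-Python sketch (no Beam required) of the two error shapes described above. Every name in it (fails_on_long, run_with_bad_records, run_with_raw_errors) is hypothetical and only models the behavior; the assumption is that a bad record is a pair whose first item is the failing element and whose second item carries the exception details.

```python
# Plain-Python model of the two error-record shapes; these are NOT Beam APIs.
# All function names are hypothetical and exist only for illustration.


def fails_on_long(x):
  """Title-cases short strings and raises on strings longer than two chars."""
  if len(x) > 2:
    raise ValueError("too long: %s" % x)
  return x.title()


def run_with_bad_records(fn, elements):
  """Models ParDo-style handling: each error is (element, exception info)."""
  good, bad = [], []
  for element in elements:
    try:
      good.append(fn(element))
    except Exception as exn:  # broad catch is deliberate in this model
      bad.append((element, (type(exn), repr(exn))))
  return good, bad


def run_with_raw_errors(fn, elements):
  """Models a custom transform that emits the failing element itself."""
  good, bad = [], []
  for element in elements:
    try:
      good.append(fn(element))
    except Exception:
      bad.append(element)
  return good, bad


elements = ['a', 'bb', 'cccc']
good, bad_records = run_with_bad_records(fails_on_long, elements)
_, raw_errors = run_with_raw_errors(fails_on_long, elements)

# With bad records, the element is the first item of the pair, hence x[0].
from_records = ["error: %s" % x[0] for x in bad_records]
# With raw error elements, the element is used directly, hence x.
from_raw = ["error: %s" % x for x in raw_errors]
```

Under that assumption both formatting lambdas end up with the same message for the same failing input; only the shape of what reaches the error handler differs.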
    assert_that(result, equal_to(['A', 'Bb']), label='CheckGood')
    assert_that(error_pcoll, equal_to(['cccc']), label='CheckBad')

  def test_error_on_collecting_error_handler_without_output_retrieval(self):
Shall we also add a unit test with the CollectingErrorHandler that is closed but not consumed?
This is closed (due to the context) but not consumed.
        timeout,
        error_handler)

  def with_error_handler(self, error_handler, **exception_handling_kwargs):
Could we also have a unit test that would show how the **exception_handling_kwargs can be used?
Good call. Done.
Co-authored-by: Bartosz Zablocki <[email protected]>
Thanks for the review!
It recently came to my attention that this was only added to Java.