Invoke teardown when DoFn throws in portable runners #32522
However, the job did succeed. The worker log shows
it took an extended period of time to match the file source (that is, to actually start processing data). Intermittent UW issue?
// going to be re-used.
for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) {
  try {
    teardownFunction.run();
If we throw the exception above during one of the start functions, we may not have called all of them. That could be confusing, since finish could then be called without a corresponding start. If we knew which start and teardown functions corresponded (i.e. changed to a single list with both matched up), we could remember the index we reached in the start functions and call teardown only on the right ones.
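A rough sketch of the "single list with both matched up" idea, not taken from the harness: `PairedLifecycleSketch`, `Step`, `Pair`, `register`, `startAll`, and `teardownStarted` are all hypothetical names. It only illustrates remembering how far the start functions got, so that teardown runs in reverse on exactly the entries that started.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names exist in the Beam SDK harness.
final class PairedLifecycleSketch {
  /** A runnable that may throw a checked exception. */
  interface Step {
    void run() throws Exception;
  }

  /** One start function matched with the teardown function that undoes it. */
  static final class Pair {
    final Step start;
    final Step teardown;

    Pair(Step start, Step teardown) {
      this.start = start;
      this.teardown = teardown;
    }
  }

  private final List<Pair> pairs = new ArrayList<>();
  private int started = 0; // number of start functions that completed

  void register(Step start, Step teardown) {
    pairs.add(new Pair(start, teardown));
  }

  /** Runs start functions in order, stopping at the first one that throws. */
  void startAll() throws Exception {
    for (Pair p : pairs) {
      p.start.run();
      started++;
    }
  }

  /** Tears down, in reverse order, only the entries whose start completed. */
  void teardownStarted() {
    for (int i = started - 1; i >= 0; i--) {
      try {
        pairs.get(i).teardown.run();
      } catch (Exception e) {
        // A failing teardown should not stop the remaining ones from running.
        System.err.println("teardown " + i + " failed: " + e);
      }
    }
    started = 0;
  }
}
```

With three registered pairs where the second start throws, `teardownStarted()` would run only the first pair's teardown.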
The naming here was confusing. startFunction is for startBundle. tearDown corresponds to setup in terms of DoFns, where setup was called as early as:
org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:505)
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:195)
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:163)
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:307)
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:261)
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:261)
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:261)
org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:861)
org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:511)
org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:972)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:507)
That is when the BundleProcessor gets created. Therefore, by the time bundleProcessor.discard is called, the DoFn's @Setup has already been called.
If the question is about an exception thrown in setupFunction, then tearDown is invoked earlier, at
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
Line 56 in c3be9f0
doFnInvoker.invokeTeardown();
and createBundleProcessor won't complete, so the discard() method won't be invoked.
The DoFnInvoker link, where teardown is called if there is an exception during startup, covers a single DoFn.
If we have fused DoFnA->DoFnB, we start in reverse order. If A throws an exception in setup, tearDown is invoked on DoFnA. Do we call teardown on B, which already had startup invoked? If not, we probably need a try/catch in createBundleProcessor to invoke the teardown functions added so far.
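To make the try/catch suggestion concrete, here is a minimal standalone sketch. It does not use the real `createBundleProcessor`; `SetupRollbackSketch`, `Lifecycle`, and `setupAll` are hypothetical names. It assumes, as described in this thread, that the DoFn whose setup throws is already torn down elsewhere, so only the stages that set up successfully need a rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the harness's actual createBundleProcessor logic.
final class SetupRollbackSketch {
  /** Stand-in for a DoFn's setup/teardown pair. */
  interface Lifecycle {
    void setup() throws Exception;

    void teardown() throws Exception;
  }

  /**
   * Sets up each stage in the given order. If one setup throws, tears down the
   * stages whose setup already succeeded (most recent first), then rethrows the
   * original failure. The failing stage is not torn down here, on the assumption
   * that its own invoker already did so.
   */
  static void setupAll(List<Lifecycle> stages) throws Exception {
    List<Lifecycle> ready = new ArrayList<>();
    try {
      for (Lifecycle stage : stages) {
        stage.setup();
        ready.add(stage);
      }
    } catch (Exception setupFailure) {
      for (int i = ready.size() - 1; i >= 0; i--) {
        try {
          ready.get(i).teardown();
        } catch (Exception teardownFailure) {
          // Keep the setup failure as the primary error.
          setupFailure.addSuppressed(teardownFailure);
        }
      }
      throw setupFailure;
    }
  }
}
```

For the fused DoFnA->DoFnB case, passing the stages as `[B, A]` (reverse order) and having A throw in setup would result in B's teardown running before the exception propagates.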
I tested this, and indeed teardown on B isn't called. However, the legacy runner behaves the same way, except that it calls setup in sequence order (not reverse), which means that if B throws in setup, tearDown on A won't be called.
I'd prefer to track this as a separate issue for now.
Java PVR Spark Batch failed twice, each time on a different test. This is due to #30512. Neither failure is related to the ParDo lifecycle.
Reran the Dataflow example and the test passed. Looks like an intermittent issue (which caused the SDK harness to start working 12 minutes late).
PTAL @scwhittle @robertwb thanks!
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
} catch (Throwable e) {
  LOG.warn(
      "Exceptions are thrown from DoFn.teardown method when trying to discard "
          + "ProcessBundleHandler",
Adjusted this to a warning and reworded it. Originally it was an error, which could cause confusion about the root cause in a failed pipeline (the cause is an upstream error in processElement, etc., not tearDown throwing).
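For illustration only, the discard-time behavior discussed here (run teardown functions in reverse, log failures at warning level, keep going) could look roughly like the sketch below. It uses `java.util.logging` so that it is self-contained, whereas the diff above calls `LOG.warn`; `DiscardTeardownSketch`, `runTeardowns`, and the returned failure count are assumptions made for the example, not the PR's code.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; the real change lives in ProcessBundleHandler.
final class DiscardTeardownSketch {
  private static final Logger LOG = Logger.getLogger(DiscardTeardownSketch.class.getName());

  /** Local stand-in for a runnable that may throw a checked exception. */
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  /** Runs teardown functions last-registered-first and returns how many of them threw. */
  static int runTeardowns(List<ThrowingRunnable> teardownFunctions) {
    int failures = 0;
    for (int i = teardownFunctions.size() - 1; i >= 0; i--) {
      try {
        teardownFunctions.get(i).run();
      } catch (Throwable e) {
        failures++;
        // Warning rather than error: the pipeline failure itself originates
        // upstream (for example in processElement), not in teardown.
        LOG.log(Level.WARNING, "Exception thrown from a teardown function during discard", e);
      }
    }
    return failures;
  }
}
```

Catching per function means one throwing teardown does not prevent the others from running.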
Reminder, please take a look at this PR: @kennknowles @damccorm
R: @scwhittle
Rebased onto latest HEAD to resolve a merge conflict (in CHANGES.md).
Looks like the Spark runner failure might be real, coming from a test that is no longer being filtered.
PostCommit Java PVR Spark Batch failed on two attempts, each with a different random test, due to #30512, which is unrelated to this PR. Merging for now.
Fix (partly) #31381