Change FnApiDoFnRunner to skip trySplit checkpoint requests if not draining and nothing has yet been claimed by the tracker. #32044
Run Java PreCommit
@m-trieu could you PTAL or assign a different reviewer?
I've asked @robertwb to take a look, as he is familiar with FnAPI splitting.
R: @kennknowles
LGTM to the extent I understand it, but this is a lot of code and I have not delved into it previously.
```diff
@@ -327,6 +328,11 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
   * otherwise.
   */
  private RestrictionTracker<RestrictionT, PositionT> currentTracker;
  /**
   * If non-null, set to true after currentTracker has had a tryClaim issued on it. Used to ignore
```
If it can be null, mark it `@Nullable` and document the meaning of a null value. (It looks to me like null just means "in between calls, or during calls where it is not used", but it is hard to ascertain the design intent of this class, TBH.)
Done. I was being consistent with the existing currentTracker, but I agree it is confusing and we might as well improve it a little.
Totally makes sense. I might do a pass on this file to narrow the scope of the suppressions. It seems like too important a file to leave vulnerable. Thanks!
```java
// We are requesting a checkpoint but have not yet progressed on the restriction, skip
// request.
if (fractionOfRemainder == 0
    && currentTrackerClaimed != null
```
Given it is not marked `volatile`, those could be out of date, right? Based on this and your use of `lazySet`, that seems to align with your intent, but I don't have a holistic understanding of this code.
I think the assignment of the `AtomicBoolean` being consulted is protected by the happens-before ordering provided by `splitLock`. This is the same as the synchronization of `currentTracker`.
`AtomicBoolean` is used because observing the restriction tracker can happen in the background, but the laziness is OK since the split would be retried and the real value should eventually be observed.
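A minimal sketch of the pattern described above, assuming the flag reference is only reassigned while holding a lock and that the processing thread marks the claim with `lazySet`. `ClaimTrackingRunner`, `startElement`, `recordClaim`, `finishElement` and `shouldSkipCheckpoint` are hypothetical names for illustration, not `FnApiDoFnRunner` members.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the synchronization discussed above; not Beam code. */
final class ClaimTrackingRunner {
  private final Object splitLock = new Object();

  // Null when no element is being processed; otherwise becomes true once
  // tryClaim has been issued. The reference is only reassigned under splitLock,
  // so a split thread that also takes splitLock sees the current object.
  private AtomicBoolean currentTrackerClaimed;

  /** Processing thread: starts an element and returns the flag its tracker should set. */
  AtomicBoolean startElement() {
    AtomicBoolean claimed = new AtomicBoolean(false);
    synchronized (splitLock) {
      currentTrackerClaimed = claimed;
    }
    return claimed;
  }

  /**
   * Processing thread: records that tryClaim was issued. lazySet only guarantees the
   * write becomes visible eventually, so a split thread may briefly still read false.
   */
  static void recordClaim(AtomicBoolean claimed) {
    claimed.lazySet(true);
  }

  /** Processing thread: clears the flag once the element is done. */
  void finishElement() {
    synchronized (splitLock) {
      currentTrackerClaimed = null;
    }
  }

  /** Split thread: true if a checkpoint request should be ignored for now. */
  boolean shouldSkipCheckpoint() {
    synchronized (splitLock) {
      AtomicBoolean claimed = currentTrackerClaimed;
      // A stale false only skips this request; the runner is expected to retry.
      return claimed != null && !claimed.get();
    }
  }
}
```

The trade-off in this sketch is that a stale read costs one skipped checkpoint rather than correctness, which is why the cheaper `lazySet` is acceptable.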
Running some internal tests, but this LGTM (and is approved; I'm not sure how much the testing will exercise it anyhow).
Internal tests identified an issue when processElement didn't issue a tryClaim. Fixed, and added a test that fails without the fix.
This ensures that an SDF makes progress if the runner repeatedly attempts to split the restriction before work has been claimed.
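A simplified sketch of the skip condition, assuming the truncated part of the condition under review also requires that nothing has been claimed yet and that the pipeline is not draining, as the PR title states. `CheckpointGuard`, `shouldSkipSplit` and the `draining` parameter are hypothetical; the real check lives inside `FnApiDoFnRunner`'s split handling.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified version of the skip condition; not a Beam API. */
final class CheckpointGuard {
  private CheckpointGuard() {}

  /**
   * Returns true if a split request should be ignored.
   *
   * @param fractionOfRemainder 0 means the runner is requesting a checkpoint
   * @param draining assumed flag, true while the pipeline is draining
   * @param currentTrackerClaimed null if no restriction is being processed,
   *     otherwise set to true once tryClaim has been issued on the tracker
   */
  static boolean shouldSkipSplit(
      double fractionOfRemainder, boolean draining, AtomicBoolean currentTrackerClaimed) {
    return fractionOfRemainder == 0
        && !draining
        && currentTrackerClaimed != null
        && !currentTrackerClaimed.get();
  }
}
```

Only checkpoint requests (fraction 0) are affected here; a request for a non-zero fraction of the remainder, or any request while draining, falls through to the normal split path.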
This affects the Dataflow Runner, which by default attempts to split in streaming pipelines after 5 seconds. If the SDF takes more than 5 seconds to read the first element and attempt to claim it, the split occurs first, checkpointing the same restriction, and the element is dropped. On retry, it resumes from the same restriction, and this may repeat.
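A deterministic toy model of the retry loop described above, assuming an offset-range restriction where one offset is claimed per successful attempt. `CheckpointRetryModel` and `claimedAfter` are hypothetical; this does not simulate Dataflow or Beam, it only illustrates why the unguarded case makes no progress.

```java
/**
 * Hypothetical model of the scenario above; not Beam or Dataflow code. Each attempt
 * processes [position, end). The first element becomes claimable after firstReadMillis,
 * and the runner requests a checkpoint after splitAfterMillis.
 */
final class CheckpointRetryModel {
  private CheckpointRetryModel() {}

  /** Returns how many offsets were claimed after up to maxAttempts attempts. */
  static long claimedAfter(
      long start,
      long end,
      long firstReadMillis,
      long splitAfterMillis,
      boolean skipUnclaimedCheckpoints,
      int maxAttempts) {
    long position = start;
    for (int attempt = 0; attempt < maxAttempts && position < end; attempt++) {
      boolean checkpointBeforeFirstClaim = splitAfterMillis < firstReadMillis;
      if (checkpointBeforeFirstClaim && !skipUnclaimedCheckpoints) {
        // In this model the checkpoint leaves an empty primary and a residual equal
        // to [position, end). The later tryClaim fails, the element is dropped,
        // and the retry starts again from the same position.
        continue;
      }
      // The first element is claimed (an early request, if any, was skipped),
      // so the attempt makes progress; the model advances one offset.
      position++;
    }
    return position - start;
  }
}
```

With a 5-second split delay and a 6-second first read, the unguarded run claims nothing regardless of how many retries are allowed, while the guarded run finishes the range.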
This matches the behavior of the core Java SDF invoker here, which didn't start the split thread until a tryClaim was observed. Having the SDK ignore such split requests seems better than requiring the runner to time the split better, as the SDF may be making progress but just not returning any data to the runner yet.
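A sketch of the alternative mentioned above, where the checkpoint timer is only started once the first tryClaim is observed. `DeferredCheckpointTimer` and `onTryClaim` are hypothetical names; this is not the core invoker's actual code, only an illustration of the idea using standard `java.util.concurrent` scheduling.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: schedule the checkpoint only after the first claim. */
final class DeferredCheckpointTimer {
  private final ScheduledExecutorService executor;
  private final long delayMillis;
  private final Runnable checkpoint;
  private final AtomicBoolean started = new AtomicBoolean(false);

  DeferredCheckpointTimer(
      ScheduledExecutorService executor, long delayMillis, Runnable checkpoint) {
    this.executor = executor;
    this.delayMillis = delayMillis;
    this.checkpoint = checkpoint;
  }

  /** Called on every tryClaim; only the first call schedules the checkpoint. */
  void onTryClaim() {
    // compareAndSet ensures the timer is scheduled at most once.
    if (started.compareAndSet(false, true)) {
      executor.schedule(checkpoint, delayMillis, TimeUnit.MILLISECONDS);
    }
  }

  /** True once the checkpoint has been scheduled. */
  boolean isStarted() {
    return started.get();
  }
}
```

In the FnAPI setting this PR addresses, split requests originate from the runner, so the SDK cannot defer the timer this way; declining premature checkpoint requests achieves a similar effect on that path.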