Support poisoning instruction ids to prevent the FnApi data stream from blocking on failed instructions #32857
base: master
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
Updated from caf0eaa to 974a9b4.
…rom blocking on failed instructions
Updated from 974a9b4 to fa7cde6.
Test failures appear to be unrelated flakiness.
assign set of reviewers
Run Java PreCommit
Assigning reviewers: R: @robertwb for label java.
If you would like to opt out of this review, comment … Available commands: …
The PR bot will only process comments in the main thread (not review comments).
@Override
public void poisonInstructionId(String instructionId) {
  LOG.debug("Poisoning instruction {}", instructionId);
  for (BeamFnDataGrpcMultiplexer client : multiplexerCache.values()) {
One thing this doesn't yet handle is poisoning an instruction for an endpoint that has not been observed. That can only happen at the beginning of processing, though.
I started refactoring bundle-processor creation to determine the API endpoints before any exception could be thrown, but the change was getting large, so I would prefer to do it as a follow-up. As-is, this already helps the cases where the API endpoint was created earlier.
+1 to getting this in first.
An alternative is for all these multiplexers to share the same poisoned id cache.
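A minimal sketch of the shared-cache alternative, assuming a single registry object owned by whatever creates the multiplexers. The names `SharedPoisonRegistry` and `EndpointMux` are hypothetical, not Beam classes. Because every multiplexer reads the same set, an id poisoned before its endpoint's multiplexer exists is still honored once that multiplexer is created, which would cover the not-yet-observed-endpoint case raised above:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: one poisoned-id set shared by every per-endpoint multiplexer, so an id
// poisoned before its endpoint's multiplexer exists is still seen once it is created.
// For brevity this set never shrinks; a real version would need expiry.
final class SharedPoisonRegistry {
  private final Set<String> poisonedIds = ConcurrentHashMap.newKeySet();
  private final Map<String, EndpointMux> muxByEndpoint = new ConcurrentHashMap<>();

  void poison(String instructionId) {
    poisonedIds.add(instructionId);
  }

  EndpointMux muxFor(String endpoint) {
    // New multiplexers reference the shared set instead of copying state from existing ones.
    return muxByEndpoint.computeIfAbsent(endpoint, e -> new EndpointMux(poisonedIds));
  }
}

// Hypothetical stand-in for a per-endpoint multiplexer.
final class EndpointMux {
  private final Set<String> sharedPoisonedIds;

  EndpointMux(Set<String> sharedPoisonedIds) {
    this.sharedPoisonedIds = sharedPoisonedIds;
  }

  // True if data for this instruction should be dropped rather than queued.
  boolean isPoisoned(String instructionId) {
    return sharedPoisonedIds.contains(instructionId);
  }
}
```

The trade-off is a single shared structure to bound (for example with the same kind of expire-after-write window the diff uses) instead of one cache per multiplexer.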
Thanks for hunting this down and fixing it.
@@ -64,7 +75,8 @@ public BeamFnDataGrpcMultiplexer(
    baseOutboundObserverFactory) {
  this.apiServiceDescriptor = apiServiceDescriptor;
  this.receivers = new ConcurrentHashMap<>();
  this.erroredInstructionIds = new ConcurrentHashMap<>();
  this.poisonedInstructionIds =
      CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(20)).build();
Should 20 minutes be pulled out to a top-level constant?
Done
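The diff builds the cache with Guava's `CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(20))`. The sketch below hoists the 20-minute window into a top-level constant, as suggested in review. It uses a JDK-only stand-in for the Guava cache so it runs without dependencies. The class and constant names are hypothetical, and the injectable clock exists only to make expiry testable:

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical JDK-only stand-in for the Guava expire-after-write cache used in the diff.
final class PoisonedInstructionIds {
  // Pulled out to a top-level constant, as suggested in review (name is illustrative).
  static final Duration POISONED_INSTRUCTION_ID_TTL = Duration.ofMinutes(20);

  private final Map<String, Long> expiryNanosById = new ConcurrentHashMap<>();
  private final LongSupplier nanoClock; // e.g. System::nanoTime outside of tests

  PoisonedInstructionIds(LongSupplier nanoClock) {
    this.nanoClock = nanoClock;
  }

  // Like a cache write: (re)starts the expiry window for this id.
  void poison(String instructionId) {
    long now = nanoClock.getAsLong();
    // Sweep expired entries on write so ids that are never read again do not accumulate.
    expiryNanosById.entrySet().removeIf(e -> now - e.getValue() >= 0);
    expiryNanosById.put(instructionId, now + POISONED_INSTRUCTION_ID_TTL.toNanos());
  }

  // Like cache.getIfPresent(id) != null: true only until the write expires.
  boolean isPoisoned(String instructionId) {
    Long expiry = expiryNanosById.get(instructionId);
    if (expiry == null) {
      return false;
    }
    if (nanoClock.getAsLong() - expiry >= 0) {
      // Lazily evict the stale entry, unless it was re-poisoned concurrently.
      expiryNanosById.remove(instructionId, expiry);
      return false;
    }
    return true;
  }
}
```

With the Guava cache from the diff, the same constant would simply be passed to `expireAfterWrite`, and Guava handles the eviction bookkeeping itself.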
    instructionId,
    (unused) -> {
      if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
        return null;
Why do we have to special-case this instead of always returning a CompletableFuture (even if it won't be used), as before? Or is it for better error messaging?
Returning null from the computeIfAbsent mapping function avoids a memory leak in long-running pipelines that process many instruction ids. If the instruction is poisoned, it may never be unregistered, so an entry added here would stay in the map forever.
I removed this helper since I think it is clearer if the two cases do their own handling.
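The memory-leak argument relies on documented `Map.computeIfAbsent` behavior: if the mapping function returns `null`, no mapping is recorded. A small sketch of that pattern, with `String` standing in for the receiver type and hypothetical names throughout:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: receivers keyed by instruction id, with a poisoned-id check inside
// computeIfAbsent. Returning null from the mapping function leaves the map unchanged.
final class ReceiverRegistry {
  final ConcurrentMap<String, CompletableFuture<String>> receivers = new ConcurrentHashMap<>();
  final Set<String> poisoned = ConcurrentHashMap.newKeySet();

  // Returns the future for this instruction, or null if the instruction is poisoned.
  CompletableFuture<String> receiverFuture(String instructionId) {
    return receivers.computeIfAbsent(
        instructionId,
        (unused) -> {
          if (poisoned.contains(instructionId)) {
            return null; // no entry is added, so nothing is left behind to leak
          }
          return new CompletableFuture<>();
        });
  }
}
```

After `receiverFuture(...)` is called for a poisoned id, the map still has no entry for it, so nothing depends on an unregister call that may never come.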
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
    receivers.remove(instructionId);
if (receiverFuture != null && !receiverFuture.isDone()) {
  throw new IllegalStateException("Unregistering consumer which was not registered.");
This error message doesn't seem to match the clauses of the if statement.
Added explanation comment.
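One reading of the unregister check, assuming (as the computeIfAbsent excerpt suggests) that inbound data can create an incomplete placeholder future and that registration completes it: a future that is present but not done was never completed by a registration. A hypothetical sketch carrying that kind of explanatory comment, with `String` standing in for the receiver type:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; String stands in for the receiver type.
final class ConsumerRegistry {
  final ConcurrentMap<String, CompletableFuture<String>> receivers = new ConcurrentHashMap<>();

  // Registration completes the (possibly pre-existing) future with the receiver.
  void register(String instructionId, String receiver) {
    receivers.computeIfAbsent(instructionId, unused -> new CompletableFuture<>()).complete(receiver);
  }

  // Inbound data for an unregistered instruction creates an incomplete placeholder future.
  CompletableFuture<String> awaitReceiver(String instructionId) {
    return receivers.computeIfAbsent(instructionId, unused -> new CompletableFuture<>());
  }

  void unregister(String instructionId) {
    CompletableFuture<String> receiverFuture = receivers.remove(instructionId);
    // A future that exists but is not done was created only by the data path while it
    // waited for a receiver; register() never completed it, so the consumer being
    // unregistered was never actually registered.
    if (receiverFuture != null && !receiverFuture.isDone()) {
      throw new IllegalStateException("Unregistering consumer which was not registered.");
    }
  }
}
```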
Codecov Report
All modified and coverable lines are covered by tests ✅

@@             Coverage Diff              @@
##             master   #32857      +/-   ##
============================================
+ Coverage     57.41%   58.95%   +1.53%
- Complexity     1475     3112    +1637
============================================
  Files           968     1131     +163
  Lines        154224   174787   +20563
  Branches       1076     3357    +2281
============================================
+ Hits          88546   103038   +14492
- Misses        63477    68398    +4921
- Partials       2201     3351    +1150

Flags with carried forward coverage won't be shown.
LGTM once tests are passing.
…iterable:
- remove reference to completed encoded input page from decoder once we have read it.
- re-read from cache after loading the next page to give eviction a chance to remove blocks
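The first bullet describes releasing a consumed page before moving on. A generic sketch of that idea for a paged iterator, assuming pages are loaded on demand by a hypothetical `pageLoader` that returns `null` when no pages remain. This is not Beam's actual iterable or decoder:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Hypothetical paged iterator: pageLoader.apply(i) returns page i, or null past the end.
final class PagedIterator<T> implements Iterator<T> {
  private final IntFunction<List<T>> pageLoader;
  private int nextPageIndex = 0;
  private List<T> currentPage; // null once a page has been fully consumed
  private int positionInPage = 0;
  private boolean exhausted = false;

  PagedIterator(IntFunction<List<T>> pageLoader) {
    this.pageLoader = pageLoader;
  }

  @Override
  public boolean hasNext() {
    while (!exhausted && (currentPage == null || positionInPage >= currentPage.size())) {
      // Drop the reference to the finished page *before* loading the next one, so the
      // finished page is no longer reachable from this iterator while the load runs.
      currentPage = null;
      positionInPage = 0;
      List<T> next = pageLoader.apply(nextPageIndex++);
      if (next == null) {
        exhausted = true;
      } else {
        currentPage = next;
      }
    }
    return !exhausted;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentPage.get(positionInPage++);
  }
}
```

The loop also skips empty pages, so callers only ever see real elements.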
Run Java PreCommit
Run Java_IOs_Direct PreCommit
Run Java_Pulsar_IO_Direct PreCommit
Previously, an instruction id could be observed on the control stream, but an exception could prevent a handler from ever being registered for it. The data stream would then block forever trying to multiplex elements to that instruction's queue.
Fixes #32714
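The failure mode described above can be illustrated with a simplified demultiplexer. This is a sketch under assumptions, not the Beam implementation. It assumes the data-stream thread blocks on a per-instruction `CompletableFuture` until a receiver is registered. A hypothetical `poison` method records the id and also completes any pending future with a receiver that discards elements, which wakes the blocked thread:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical demultiplexer (not Beam's BeamFnDataGrpcMultiplexer): routes each element
// arriving on the data stream to the receiver registered for its instruction id.
final class Demultiplexer {
  private final ConcurrentMap<String, CompletableFuture<Consumer<String>>> receivers =
      new ConcurrentHashMap<>();
  private final Set<String> poisoned = ConcurrentHashMap.newKeySet();

  // Control side: complete the (possibly pre-existing) future with the real receiver.
  void register(String id, Consumer<String> receiver) {
    receivers.computeIfAbsent(id, unused -> new CompletableFuture<>()).complete(receiver);
  }

  // Control side: called when an instruction fails, possibly before registering a receiver.
  void poison(String id) {
    poisoned.add(id);
    CompletableFuture<Consumer<String>> pending = receivers.remove(id);
    if (pending != null) {
      // Wake a data-stream thread that may be blocked on this id; its element is dropped.
      pending.complete(element -> {});
    }
  }

  // Data-stream side: waits for a receiver unless the instruction has been poisoned.
  void onData(String id, String element) {
    CompletableFuture<Consumer<String>> future =
        receivers.computeIfAbsent(
            id,
            unused -> {
              if (poisoned.contains(id)) {
                return null; // no entry is created for a poisoned id
              }
              return new CompletableFuture<>();
            });
    if (future == null) {
      return; // drop the element instead of blocking forever
    }
    future.join().accept(element);
  }
}
```

Returning `null` from `computeIfAbsent` for poisoned ids mirrors the diff's approach of not inserting map entries for poisoned instructions.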