slow draining fix #24435
…ing runs smoothly
R: @tvalentyn
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
@@ -537,8 +536,9 @@ def extract_output(self, accumulator, *args, **kwargs):
    else:
      assert not holds_comparables

    assert len(heap) <= self._n
    heap.sort(reverse=True)
    if heap:
Added an emptiness check to handle empty values that would have tripped the two assertions removed from this file.
This was the only error to emerge from removing the assertions. It seems that empty accumulators being passed at lines 501 and 531 are what those assertions were meant to guard against; apart from this case, the existing logic handles them fine.
@@ -2475,7 +2475,7 @@ def typed(transform):

    def inject_default(_, combined):
      if combined:
-        assert len(combined) == 1
+        assert len(combined) >= 1
In this case we've just relaxed the assertion to allow "combined" to hold more than one element. This doesn't throw an error, but we do lose some information when we return only combined[0] one line later, though I had some trouble understanding the significance of that.
If the lost information doesn't matter, or if there's a better way to deal with it, I'm wondering whether we need the assertion at all, since the pipeline would just fail on the next line anyway.
The intent of the _inject_default function is to add a default element to the combined result if that result is empty and a default value can be well-defined as combine_fn.apply([], *args, **kwargs). Users can disable default values by using .without_defaults() when they apply a combiner.
The replacement of the empty output with the default value happens at the last step of combining. At this stage, the PCollection being combined should be reduced to 1 element, or be empty. A default value is not necessary when the result of combining is NOT empty; in that case we should take the reduced result. However, since this is evaluated after prior combining, we expect the result of prior combining to be a single value. Therefore, we have an assertion that checks that prior combining has reduced the PCollection to 1 element. If we somehow have more than 1 element, the result may not be fully combined, so we should understand how that happened.
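For illustration, the behavior described above can be sketched in plain Python without Beam. This is a minimal sketch, not the Beam implementation: make_inject_default and apply_combine_fn are hypothetical names, and apply_combine_fn merely stands in for combine_fn.apply.

```python
def make_inject_default(apply_combine_fn):
    """Build an inject_default-style function (hypothetical helper, not Beam API).

    apply_combine_fn is an assumed stand-in for combine_fn.apply: it takes a
    list of inputs and returns the combined value.
    """
    def inject_default(_, combined):
        if combined:
            # Prior combining should already have reduced the input to a
            # single element; more than one suggests an incomplete combine.
            assert len(combined) == 1, (
                "expected 1 combined element, got %d" % len(combined))
            return combined[0]
        # Empty result: fall back to the combiner applied to no input.
        return apply_combine_fn([])

    return inject_default


inject_sum_default = make_inject_default(sum)
non_empty_result = inject_sum_default(None, [42])  # reduced value is kept
empty_result = inject_sum_default(None, [])        # default is sum([])
```

With sum as the stand-in combiner, a non-empty input keeps its single reduced value and an empty input yields sum([]), which is 0. Passing two elements trips the assertion, which is the situation this thread is trying to explain.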
Codecov Report
@@ Coverage Diff @@
## master #24435 +/- ##
==========================================
+ Coverage 73.35% 73.38% +0.03%
==========================================
Files 719 719
Lines 97137 96904 -233
==========================================
- Hits 71251 71110 -141
+ Misses 24539 24471 -68
+ Partials 1347 1323 -24
Re: [Merge branch 'drain-fix' of github.com:BjornPrime/my-beam into drain-fix]: it looks like you may have cloned the Beam repo locally from your fork (github.com:BjornPrime/my-beam). Note that it's easier to clone Beam only from the main repo, create branches locally on that clone, then push those branches to your fork and create pull requests from the fork into the Beam master branch. In other words, you create the fork github.com:BjornPrime/my-beam once but never use the code from that fork during day-to-day development.
…riptive error message
if len(combined) > 1:
  raise ValueError(
      "Input from multiple simultaneous windows "
      "isn't currently supported for CombineGlobally()")
I chatted briefly with @robertwb, and my understanding is that CombineGlobally is expected to support multiple windows, so this message may be confusing to users. We'll need to look further to understand the failure mode here.
My understanding is that, as defined, CombineGlobally is intended to return a single-element PCollection and I'm not sure how to interpret that in a multi-window context. My recollection is that attempting to implement an output with more than one element (e.g. "output = [1,2,3]"), as well as trying to nest that output in a list (e.g. "output = [[1,2,3]]"), both created broader issues for other parts of the execution.
As it relates to the issue at hand, CombineGlobally was only seeing multiple windows due to an issue with closing windows not triggering the transform correctly (see #24682) that has now been resolved. I also created #24683 to address this gap in CombineGlobally's functionality, since I felt it stretched beyond the scope of this issue.
If it can be repro'd w/ PubSub, that's also an issue that should be addressed. Do we need to do that to close the initial issue, or can that be its own issue?
In my opinion the issue is the same, but I am not convinced we can close either at this time.
Do we agree that this PR is at least a step in the right direction for now? I only set it to "address" #24438, so approving this PR won't close that issue.
Apologies for slow response here.
> My understanding is that, as defined, CombineGlobally is intended to return a single-element PCollection and I'm not sure how to interpret that in a multi-window context.
My understanding is that CombineGlobally should work in a multi-window context and result in combining down to 1 element per key+window pair. If you are interested in looking further into this problem, you can reproduce this pipeline in the Java SDK, which presumably doesn't have this bug, to examine the behavior and/or get a second opinion. That being said, in my opinion the message "Input from multiple simultaneous windows isn't currently supported for CombineGlobally()" may discourage users from using CombineGlobally, when this is something that we should fix instead.
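The expected semantics described above (one combined element per window) can be illustrated without Beam. This is a minimal sketch in plain Python, assuming fixed-size windows and a sum combiner; combine_per_window and its parameters are hypothetical and not part of the Beam API.

```python
from collections import defaultdict


def combine_per_window(timestamped_values, window_size, combine=sum):
    """Combine values independently within each fixed window.

    timestamped_values: iterable of (timestamp, value) pairs.
    window_size: width of each fixed window, in the same units as timestamp.
    Returns a dict mapping window start -> single combined value.
    """
    by_window = defaultdict(list)
    for timestamp, value in timestamped_values:
        # Assign the element to the fixed window containing its timestamp.
        window_start = timestamp - (timestamp % window_size)
        by_window[window_start].append(value)
    # Each window is reduced to exactly one element.
    return {
        start: combine(values) for start, values in sorted(by_window.items())
    }


events = [(1, 10), (4, 20), (12, 5), (17, 7), (25, 1)]
per_window = combine_per_window(events, window_size=10)
```

Here three windows are live at once and each one yields its own single result, so seeing several elements across windows is not by itself a sign of an incomplete combine.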
Closing due to #24438 being closed.
Problematic assertions removed or altered, null checking added. Draining runs smoothly now.
Addresses #24438.