Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slow draining fix #24435

Closed
wants to merge 12 commits into from
Closed
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/transforms/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ def merge_accumulators(self, accumulators, *args, **kwargs):
(holds_comparables, result_heap),
comparable.value if holds_comparables else comparable)

assert result_heap is not None and holds_comparables is not None
return (holds_comparables, result_heap)

def compact(self, accumulator, *args, **kwargs):
Expand All @@ -537,8 +536,10 @@ def extract_output(self, accumulator, *args, **kwargs):
else:
assert not holds_comparables

assert len(heap) <= self._n
if not heap: return []

heap.sort(reverse=True)

return [
comparable.value if holds_comparables else comparable
for comparable in heap
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2474,9 +2474,13 @@ def typed(transform):
# Capture in closure (avoiding capturing self).
args, kwargs = self.args, self.kwargs

#TODO: We need to add support for multi-window inputs to CombineGlobally()
def inject_default(_, combined):
if combined:
assert len(combined) == 1
if len(combined) > 1:
raise ValueError(
"Input from multiple simultaneous windows"
"isn't currently supported for CombineGlobally()")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chatted briefly with @robertwb and my understanding is that CombineGlobally is expected support multiple windows, so this message may be confusing to users. We'll need to look further to understand the failure mode here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that, as defined, CombineGlobally is intended to return a single-element PCollection and I'm not sure how to interpret that in a multi-window context. My recollection is that attempting to implement an output with more than one element (e.g. "output = [1,2,3]"), as well as trying to nest that output in a list (e.g. "output = [[1,2,3]]"), both created broader issues for other parts of the execution.

As it relates to the issue at hand, CombineGlobally was only seeing multiple windows due to an issue with closing windows not triggering the transform correctly (see #24682) that has now been resolved. I also created #24683 to address this gap in CombineGlobally's functionality, since I felt it stretched beyond the scope of this issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#24682 was fixed for PeriodicImpulse, I think #24438 was reproducibile w/ a PubSub source as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it can be repro'd w/ PubSub, that's also an issue that should be addressed. Do we need to do that to close the initial issue, or can that be its own issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion the issue is the same, but I am not convinced we can close either at this time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we agree that this PR is at least a step in the right direction for now? I only set it to "address" #24438, so approving this PR won't close that issue.

Copy link
Contributor

@tvalentyn tvalentyn Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for slow response here.

My understanding is that, as defined, CombineGlobally is intended to return a single-element PCollection and I'm not sure how to interpret that in a multi-window context.

My understanding is that CombineGlobally should work for a multi-window context and result in combining down to 1 element per each key+window pair. If you are interested in looking further into this problem, you can reproduce this pipeline in Java SDK, which presumably doesn't have this bug in examine the behavior, and/or get a second opinion. That being said, in my opinion the "Input from multiple simultaneous windows isn't currently supported for CombineGlobally()" may discourage users from using CombineGlobally but it's something that we should fix instead.

return combined[0]
else:
try:
Expand Down