slow draining fix #24435

Closed
wants to merge 12 commits into from
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/transforms/combiners.py
@@ -517,7 +517,6 @@ def merge_accumulators(self, accumulators, *args, **kwargs):
(holds_comparables, result_heap),
comparable.value if holds_comparables else comparable)

- assert result_heap is not None and holds_comparables is not None
return (holds_comparables, result_heap)

def compact(self, accumulator, *args, **kwargs):
@@ -537,8 +536,9 @@ def extract_output(self, accumulator, *args, **kwargs):
else:
assert not holds_comparables

- assert len(heap) <= self._n
- heap.sort(reverse=True)
+ if heap:
Contributor Author

Added a check for empty values to handle the cases that would have triggered the two assertions removed from this file.
This was the only error that emerged from removing the assertions. It seems the assertions were meant to guard against empty accumulators being passed at lines 501 and 531; apart from that case, the existing logic handles it fine.
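
For reference, a minimal plain-Python sketch of how an `if heap:` guard behaves. The helper name `sorted_descending` is hypothetical, and treating `None` as "no elements" is an assumption of this sketch, not something the PR or Beam is known to do.

```python
def sorted_descending(heap):
    """Hypothetical helper, not the Beam implementation."""
    if heap:  # false for both None and an empty list
        heap.sort(reverse=True)
    # Treating None as "no elements" is an assumption of this sketch.
    return list(heap or [])


print(sorted_descending([1, 3, 2]))  # [3, 2, 1]
print(sorted_descending([]))         # []
print(sorted_descending(None))       # []
```

The guard only skips the sort; any later code that iterates over the heap still needs a value it can iterate.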

+   heap.sort(reverse=True)

return [
comparable.value if holds_comparables else comparable
for comparable in heap
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
@@ -2475,7 +2475,7 @@ def typed(transform):

def inject_default(_, combined):
if combined:
- assert len(combined) == 1
+ assert len(combined) >= 1
Contributor Author

In this case we've just relaxed the assertion to allow "combined" to hold more than one element. This doesn't raise an error, but we do lose some information when we return only combined[0] one line later, though I had some trouble understanding how significant that loss is.
If the lost information doesn't matter, or if there's a better way to deal with it, I wonder whether we need the assertion at all, since the pipeline would just fail on the next line anyway.
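
As a rough illustration of the information that could be lost, the plain-Python sketch below models a "combined" holding two partial sums that were never merged. The values and the helper name `take_first` are made up for illustration; this is not Beam code.

```python
def take_first(combined):
    """Hypothetical model of the relaxed check followed by combined[0]."""
    assert len(combined) >= 1
    return combined[0]


partial_sums = [3, 4]  # two partial results that were never merged

print(take_first(partial_sums))  # 3
print(sum(partial_sums))         # 7, the fully combined value
```

Under this model the relaxed assertion passes, but the second partial result is dropped without any error.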

Contributor

The intent of the inject_default function is to add a default element to the combined result when that result is empty and a default value is well defined as combine_fn.apply([], *args, **kwargs).

Users can disable default values by calling .without_defaults() when they apply a combiner.

The empty output is replaced with the default value at the last step of combining. At this stage, the PCollection being combined should have been reduced to one element, or be empty. A default value is not needed when the result of combining is NOT empty; in that case we should take the reduced result. However, since this step runs after the prior combining, we expect that result to be a single value, so the assertion checks that prior combining has reduced the PCollection to one element. If we somehow have more than one element, the result may not be fully combined, and we should understand how that happened.
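
To make the described behaviour concrete, here is a minimal plain-Python sketch of that last step. It does not use Beam: "combined" is modelled as a plain list, and `apply_combine` is a hypothetical stand-in for combine_fn.apply, so the real function in core.py may differ in its details.

```python
def inject_default(combined, apply_combine):
    """Return the single combined value, or a default for empty input.

    `combined` is a plain list standing in for the result of prior combining;
    `apply_combine` is a hypothetical stand-in for combine_fn.apply.
    """
    if combined:
        # Prior combining should have left exactly one element.
        assert len(combined) == 1, "result was not fully combined"
        return combined[0]
    # Empty result: the default is whatever the combiner yields for no input.
    return apply_combine([])


print(inject_default([42], sum))  # 42
print(inject_default([], sum))    # 0
```

With the strict check, a list such as [3, 4] raises an AssertionError instead of returning a partial result, which is the signal the assertion is meant to provide.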

return combined[0]
else:
try: