From 7adcd2dcd2341ce19fce382c32380787507f63c7 Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 7 Jul 2023 12:56:29 -0400 Subject: [PATCH] Fix CombineGlobally with GlobalWindows (#26922) Adds a better error message when this is used in an unsupported way as well as more testing. --- .../apache_beam/transforms/combiners_test.py | 44 +++++++++++++++++++ sdks/python/apache_beam/transforms/core.py | 14 ++++++ 2 files changed, 58 insertions(+) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 385b3332e0c4..a8979239f831 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -20,6 +20,7 @@ import itertools import random +import time import unittest import hamcrest as hc @@ -44,6 +45,7 @@ from apache_beam.transforms.core import Map from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher +from apache_beam.transforms.periodicsequence import PeriodicImpulse from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.trigger import AfterAll from apache_beam.transforms.trigger import AfterCount @@ -977,5 +979,47 @@ def test_combiner_latest(self): label='assert per window') +class CombineGloballyTest(unittest.TestCase): + def test_combine_globally_for_unbounded_source_with_default(self): + # this error is logged since the below combination is ill-defined. + with self.assertLogs() as captured_logs: + with TestPipeline() as p: + _ = ( + p + | PeriodicImpulse( + start_timestamp=time.time(), + stop_timestamp=time.time() + 4, + fire_interval=1, + apply_windowing=False, + ) + | beam.Map(lambda x: ('c', 1)) + | beam.WindowInto( + window.GlobalWindows(), + trigger=trigger.Repeatedly(trigger.AfterCount(2)), + accumulation_mode=trigger.AccumulationMode.DISCARDING, + ) + | beam.combiners.Count.Globally()) + self.assertIn('unbounded collections', '\n'.join(captured_logs.output)) + + def test_combine_globally_for_unbounded_source_without_defaults(self): + # this is the supported case + with TestPipeline() as p: + _ = ( + p + | PeriodicImpulse( + start_timestamp=time.time(), + stop_timestamp=time.time() + 4, + fire_interval=1, + apply_windowing=False, + ) + | beam.Map(lambda x: 1) + | beam.WindowInto( + window.GlobalWindows(), + trigger=trigger.Repeatedly(trigger.AfterCount(2)), + accumulation_mode=trigger.AccumulationMode.DISCARDING, + ) + | beam.CombineGlobally(sum).without_defaults()) + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 6fd8cd2e03ba..66ac8fbad967 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2665,6 +2665,15 @@ def add_input_types(transform): "or CombineGlobally().as_singleton_view() to get the default " "output of the CombineFn if the input PCollection is empty.") + # log the error for this ill-defined streaming case now + if not pcoll.is_bounded and not pcoll.windowing.is_default(): + _LOGGER.error( + "When combining elements in unbounded collections with " + "the non-default windowing strategy, you must explicitly " + "specify how to define the combined result of an empty window. " + "Please use CombineGlobally().without_defaults() to output " + "an empty PCollection if the input PCollection is empty.") + def typed(transform): # TODO(robertwb): We should infer this. if combined.element_type: @@ -2676,6 +2685,11 @@ def typed(transform): def inject_default(_, combined): if combined: + if len(combined) > 1: + _LOGGER.error( + "Multiple combined values unexpectedly provided" + " for a global combine: %s", + combined) assert len(combined) == 1 return combined[0] else: