From 6972e0cbbfcda09a56c06883781f1b37b6c0932f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:53:38 -0700 Subject: [PATCH 1/3] [#32085][prism] Fix session windowing. --- .../runners/prism/internal/handlerunner.go | 24 ++++++++++++------- .../runners/portability/prism_runner_test.py | 19 +++++++++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index a1eeeba02c4b..eecebde3d693 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -244,7 +244,7 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com kc := coders[kcID] ec := coders[ecID] - data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark)) + data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders)) if len(data[0]) == 0 { panic("no data for GBK") } @@ -290,7 +290,7 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt } // gbkBytes re-encodes gbk inputs in a gbk result. -func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte { +func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder) []byte { // Pick how the timestamp of the aggregated output is computed. var outputTime func(typex.Window, mtime.Time, mtime.Time) mtime.Time switch ws.GetOutputTime() { @@ -333,9 +333,8 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat kd := pullDecoder(kc, coders) vd := pullDecoder(vc, coders) - // Right, need to get the key coder, and the element coder. - // Cus I'll need to pull out anything the runner knows how to deal with. - // And repeat. + // Aggregate by windows and keys, using the window coder and KV coders. + // We need to extract and split the key bytes from the element bytes. for _, data := range toAggregate { // Parse out each element's data, and repeat. buf := bytes.NewBuffer(data) @@ -388,34 +387,41 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat } // Use a decreasing sort (latest to earliest) so we can correct // the output timestamp to the new end of window immeadiately. - // TODO need to correct this if output time is different. sort.Slice(ordered, func(i, j int) bool { return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp() }) cur := ordered[0] sessionData := windows[cur] + delete(windows, cur) for _, iw := range ordered[1:] { - // If they overlap, then we merge the data. + // Check if the gap between windows is less than the gapSize. + // If not, this window is done, and we start a next window. if iw.End+gapSize < cur.Start { - // Start a new session. + // Store current data with the current window. windows[cur] = sessionData + // Use the incoming window instead, and clear it from the map. cur = iw sessionData = windows[iw] + delete(windows, cur) + // There's nothing to merge, since we've just started with this windowed data. continue } - // Extend the session + // Extend the session with the incoming window, and merge the the incoming window's data. cur.Start = iw.Start toMerge := windows[iw] delete(windows, iw) for k, kt := range toMerge { skt := sessionData[k] + // Ensure the output time matches the given function. + skt.time = outputTime(cur, kt.time, skt.time) skt.key = kt.key skt.w = cur skt.values = append(skt.values, kt.values...) sessionData[k] = skt } } + windows[cur] = sessionData } // Everything's aggregated! // Time to turn things into a windowed KV> diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index f1ccf66a2289..2ae600887a68 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -40,7 +40,9 @@ from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import window from apache_beam.transforms.sql import SqlTransform +from apache_beam.utils import timestamp # Run as # @@ -178,6 +180,23 @@ def create_options(self): return options + # Slightly more robust session window test: + # Validates that an inner grouping doesn't duplicate data either. + # Copied also because the timestamp in fn_runner_test.py isn't being + # inferred correctly as seconds for some reason, but as micros. + # The belabored specification is validating the timestamp type works at least. + # See https://github.com/apache/beam/issues/32085 + def test_windowing(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create([1, 2, 100, 101, 102, 123]) + | beam.Map(lambda t: window.TimestampedValue(('k', t), timestamp.Timestamp.of(t).micros)) + | beam.WindowInto(beam.transforms.window.Sessions(10)) + | beam.GroupByKey() + | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) + assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) + # Can't read host files from within docker, read a "local" file there. def test_read(self): print('name:', __name__) From 871c154c98bb1449b000ed1f2a390878eec57a65 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 6 Aug 2024 09:34:13 -0700 Subject: [PATCH 2/3] pylint fmt --- .../runners/portability/prism_runner_test.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 2ae600887a68..75b9b5fa667f 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -191,11 +191,14 @@ def test_windowing(self): res = ( p | beam.Create([1, 2, 100, 101, 102, 123]) - | beam.Map(lambda t: window.TimestampedValue(('k', t), timestamp.Timestamp.of(t).micros)) + | beam.Map( + lambda t: window.TimestampedValue( + ('k', t), timestamp.Timestamp.of(t).micros)) | beam.WindowInto(beam.transforms.window.Sessions(10)) | beam.GroupByKey() | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) - assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) + assert_that( + res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) # Can't read host files from within docker, read a "local" file there. def test_read(self): @@ -239,8 +242,8 @@ def test_expand_kafka_read(self): timestamp_policy=ReadFromKafka.create_time_policy, expansion_service=self.get_expansion_service())) self.assertTrue( - 'No resolvable bootstrap urls given in bootstrap.servers' in str( - ctx.exception), + 'No resolvable bootstrap urls given in bootstrap.servers' + in str(ctx.exception), 'Expected to fail due to invalid bootstrap.servers, but ' 'failed due to:\n%s' % str(ctx.exception)) From 2c7f3c2905a8fa9d61c21565ef3f800ccf08aa94 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:32:00 -0700 Subject: [PATCH 3/3] Formatter Config difference. --- .../apache_beam/runners/portability/prism_runner_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 75b9b5fa667f..324fe5a17b54 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -242,8 +242,8 @@ def test_expand_kafka_read(self): timestamp_policy=ReadFromKafka.create_time_policy, expansion_service=self.get_expansion_service())) self.assertTrue( - 'No resolvable bootstrap urls given in bootstrap.servers' - in str(ctx.exception), + 'No resolvable bootstrap urls given in bootstrap.servers' in str( + ctx.exception), 'Expected to fail due to invalid bootstrap.servers, but ' 'failed due to:\n%s' % str(ctx.exception))