Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#32085][prism] Fix session windowing. #32086

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com
kc := coders[kcID]
ec := coders[ecID]

data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark))
data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders))
if len(data[0]) == 0 {
panic("no data for GBK")
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt
}

// gbkBytes re-encodes gbk inputs in a gbk result.
func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte {
func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder) []byte {
// Pick how the timestamp of the aggregated output is computed.
var outputTime func(typex.Window, mtime.Time, mtime.Time) mtime.Time
switch ws.GetOutputTime() {
Expand Down Expand Up @@ -333,9 +333,8 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
kd := pullDecoder(kc, coders)
vd := pullDecoder(vc, coders)

// Right, need to get the key coder, and the element coder.
// Cus I'll need to pull out anything the runner knows how to deal with.
// And repeat.
// Aggregate by windows and keys, using the window coder and KV coders.
// We need to extract and split the key bytes from the element bytes.
for _, data := range toAggregate {
// Parse out each element's data, and repeat.
buf := bytes.NewBuffer(data)
Expand Down Expand Up @@ -388,34 +387,41 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
}
// Use a decreasing sort (latest to earliest) so we can correct
// the output timestamp to the new end of window immeadiately.
// TODO need to correct this if output time is different.
sort.Slice(ordered, func(i, j int) bool {
return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp()
})

cur := ordered[0]
sessionData := windows[cur]
delete(windows, cur)
for _, iw := range ordered[1:] {
// If they overlap, then we merge the data.
// Check if the gap between windows is less than the gapSize.
// If not, this window is done, and we start a next window.
if iw.End+gapSize < cur.Start {
// Start a new session.
// Store current data with the current window.
windows[cur] = sessionData
// Use the incoming window instead, and clear it from the map.
cur = iw
sessionData = windows[iw]
delete(windows, cur)
// There's nothing to merge, since we've just started with this windowed data.
continue
}
// Extend the session
// Extend the session with the incoming window, and merge the the incoming window's data.
cur.Start = iw.Start
toMerge := windows[iw]
delete(windows, iw)
for k, kt := range toMerge {
skt := sessionData[k]
// Ensure the output time matches the given function.
skt.time = outputTime(cur, kt.time, skt.time)
skt.key = kt.key
skt.w = cur
skt.values = append(skt.values, kt.values...)
sessionData[k] = skt
}
}
windows[cur] = sessionData
}
// Everything's aggregated!
// Time to turn things into a windowed KV<K, Iterable<V>>
Expand Down
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import window
from apache_beam.transforms.sql import SqlTransform
from apache_beam.utils import timestamp

# Run as
#
Expand Down Expand Up @@ -178,6 +180,26 @@ def create_options(self):

return options

# Slightly more robust session window test:
# Validates that an inner grouping doesn't duplicate data either.
# Copied also because the timestamp in fn_runner_test.py isn't being
# inferred correctly as seconds for some reason, but as micros.
# The belabored specification is validating the timestamp type works at least.
# See https://github.com/apache/beam/issues/32085
def test_windowing(self):
with self.create_pipeline() as p:
res = (
p
| beam.Create([1, 2, 100, 101, 102, 123])
| beam.Map(
lambda t: window.TimestampedValue(
('k', t), timestamp.Timestamp.of(t).micros))
| beam.WindowInto(beam.transforms.window.Sessions(10))
| beam.GroupByKey()
| beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))

# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)
Expand Down
Loading