Skip to content

Commit

Permalink
[#32121] Support timers in interval windows. (#32180)
Browse files Browse the repository at this point in the history
* [#32121] Support timers in interval windows.

* fix test

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Aug 15, 2024
1 parent d716f31 commit c197e4f
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 55 deletions.
12 changes: 9 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -80,19 +81,24 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str
return wvcID, nil
}

// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn.
func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
// makeWindowCoders categorizes and provides the encoder, decoder pair for the type of window.
func makeWindowCoders(wc *pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) {
var cwc *coder.WindowCoder
var winCoder engine.WinCoderType
switch wc.GetSpec().GetUrn() {
case urns.CoderGlobalWindow:
winCoder = engine.WinGlobal
cwc = coder.NewGlobalWindow()
case urns.CoderIntervalWindow:
winCoder = engine.WinInterval
cwc = coder.NewIntervalWindow()
default:
// TODO(https://github.com/apache/beam/issues/31921): Support custom windowfns instead of panicking here.
winCoder = engine.WinCustom
slog.LogAttrs(context.TODO(), slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn()))
panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc)))
}
return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
return winCoder, exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
}

// lpUnknownCoders takes a coder, and populates coders with any new coders
Expand Down
16 changes: 11 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/coders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
Expand Down Expand Up @@ -91,22 +92,27 @@ func Test_makeWindowedValueCoder(t *testing.T) {

func Test_makeWindowCoders(t *testing.T) {
tests := []struct {
urn string
window typex.Window
urn string
window typex.Window
coderType engine.WinCoderType
}{
{urns.CoderGlobalWindow, window.GlobalWindow{}},
{urns.CoderGlobalWindow, window.GlobalWindow{}, engine.WinGlobal},
{urns.CoderIntervalWindow, window.IntervalWindow{
Start: mtime.MinTimestamp,
End: mtime.MaxTimestamp,
}},
}, engine.WinInterval},
}
for _, test := range tests {
undertest := &pipepb.Coder{
Spec: &pipepb.FunctionSpec{
Urn: test.urn,
},
}
dec, enc := makeWindowCoders(undertest)
gotCoderType, dec, enc := makeWindowCoders(undertest)

if got, want := gotCoderType, test.coderType; got != want {
t.Errorf("makeWindowCoders returned different coder type: got %v, want %v", got, want)
}

// Validate we're getting a round trip coder.
var buf bytes.Buffer
Expand Down
34 changes: 28 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,35 @@ type elements struct {
}

type PColInfo struct {
GlobalID string
WDec exec.WindowDecoder
WEnc exec.WindowEncoder
EDec func(io.Reader) []byte
KeyDec func(io.Reader) []byte
GlobalID string
WindowCoder WinCoderType
WDec exec.WindowDecoder
WEnc exec.WindowEncoder
EDec func(io.Reader) []byte
KeyDec func(io.Reader) []byte
}

// WinCoderType indicates what kind of coder
// the window is using. There are only 3
// valid single window encodings.
//
// - Global (for Global windows)
// - Interval (for fixed, sliding, and session windows)
// - Custom (for custom user windows)
//
// TODO: Handle custom variants with built in "known" coders, and length prefixed ones as separate cases.
// As a rule we don't care about the bytes, but we do need to be able to get to the next element.
type WinCoderType int

const (
// WinGlobal indicates the window is empty coded, with 0 bytes.
WinGlobal WinCoderType = iota
// WinInterval indicates the window is interval coded with the end event time timestamp followed by the duration in milliseconds
WinInterval
// WinCustom indicates the window customm coded with end event time timestamp followed by a custom coder.
WinCustom
)

// ToData recodes the elements with their approprate windowed value header.
func (es elements) ToData(info PColInfo) [][]byte {
var ret [][]byte
Expand Down Expand Up @@ -870,7 +892,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
keyToTimers := map[timerKey]element{}
for _, t := range timers {
// TODO: Call in a for:range loop when Beam's minimum Go version hits 1.23.0
iter := decodeTimerIter(inputInfo.KeyDec, true, t)
iter := decodeTimerIter(inputInfo.KeyDec, inputInfo.WindowCoder, t)
iter(func(ret timerRet) bool {
for _, e := range ret.elms {
keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
Expand Down
68 changes: 58 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,31 @@ type timerRet struct {
// If the timer has been cleared, no elements will be returned. Any existing timers
// for the tag *must* be cleared from the pending queue. The windows associated with
// the clear are provided to be able to delete pending timers.
func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(func(timerRet) bool) {
func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw []byte) func(func(timerRet) bool) {

var singleWindowExtractor func(*decoder) typex.Window
switch winCoder {
case WinGlobal:
singleWindowExtractor = func(*decoder) typex.Window {
return window.GlobalWindow{}
}
case WinInterval:
singleWindowExtractor = func(d *decoder) typex.Window {
return d.IntervalWindow()
}
case WinCustom:
// Default to a length prefixed window coder here until we have different information.
// Everything else is either:: variable, 1, 4, or 8 bytes long
// KVs (especially nested ones, could occur but are unlikely, and it would be
// easier for Prism to force such coders to be length prefixed.
singleWindowExtractor = func(d *decoder) typex.Window {
return d.CustomWindowLengthPrefixed()
}
default:
// Unsupported
panic(fmt.Sprintf("unsupported WindowCoder Type: %v", winCoder))
}

return func(yield func(timerRet) bool) {
for len(raw) > 0 {
keyBytes := keyDec(bytes.NewBuffer(raw))
Expand All @@ -55,15 +79,8 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw [

var ws []typex.Window
numWin := d.Fixed32()
if usesGlobalWindow {
for i := 0; i < int(numWin); i++ {
ws = append(ws, window.GlobalWindow{})
}
} else {
// Assume interval windows here, since we don't understand custom windows yet.
for i := 0; i < int(numWin); i++ {
ws = append(ws, d.IntervalWindow())
}
for i := 0; i < int(numWin); i++ {
ws = append(ws, singleWindowExtractor(&d))
}

clear := d.Bool()
Expand Down Expand Up @@ -149,6 +166,37 @@ func (d *decoder) IntervalWindow() window.IntervalWindow {
}
}

// CustomWindowLengthPrefixed assumes the custom window coder is a variable, length prefixed type
// such as string, bytes, or a length prefix wrapped coder.
func (d *decoder) CustomWindowLengthPrefixed() customWindow {
end := d.Timestamp()

customStart := d.cursor
l := d.Varint()
endCursor := d.cursor + int(l)
d.cursor = endCursor
return customWindow{
End: end,
Custom: d.raw[customStart:endCursor],
}
}

type customWindow struct {
End typex.EventTime
Custom []byte // The custom portion of the window, ignored by the runner
}

func (w customWindow) MaxTimestamp() typex.EventTime {
return w.End
}

func (w customWindow) Equals(o typex.Window) bool {
if c, ok := o.(customWindow); ok {
return w.End == c.End && bytes.Equal(w.Custom, c.Custom)
}
return false
}

func (d *decoder) Byte() byte {
defer func() {
d.cursor += 1
Expand Down
39 changes: 21 additions & 18 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,31 +187,33 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic

col := comps.GetPcollections()[onlyOut]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}
stage.OutputsToCoders[onlyOut] = engine.PColInfo{
GlobalID: onlyOut,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: onlyOut,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}

// There's either 0, 1 or many inputs, but they should be all the same
// so break after the first one.
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)
stage.inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
GlobalID: global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}
break
}
Expand All @@ -222,18 +224,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}
stage.inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}
}
em.StageAggregates(stage.ID)
Expand Down Expand Up @@ -378,7 +381,7 @@ func extractKVCoderID(coldCId string, coders map[string]*pipepb.Coder) (string,
return "", false
}

func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) {
ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
}

wDec, wEnc := makeWindowCoders(wc)
_, wDec, wEnc := makeWindowCoders(wc)

type keyTime struct {
key []byte
Expand Down
26 changes: 14 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,15 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
kd = collectionPullDecoder(kcid, coders, comps)
}

wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)
sink2Col[sinkID] = o.Global
col2Coders[o.Global] = engine.PColInfo{
GlobalID: o.Global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: o.Global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}
transforms[sinkID] = sinkTransform(sinkID, portFor(wOutCid, wk), o.Global)
}
Expand Down Expand Up @@ -493,19 +494,20 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w\n%v", stg.ID, stg.primaryInput, prototext.Format(col), err, stg.transforms)
}
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}

inputInfo := engine.PColInfo{
GlobalID: stg.primaryInput,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: stg.primaryInput,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}

stg.inputTransformID = stg.ID + "_source"
Expand Down

0 comments on commit c197e4f

Please sign in to comment.