[#29917][prism] Initial TestStream support #30072

Merged: 11 commits, Feb 16, 2024
99 changes: 73 additions & 26 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -166,6 +166,8 @@ type ElementManager struct {

livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.

testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
}

func (em *ElementManager) addPending(v int) {
@@ -223,6 +225,15 @@ func (em *ElementManager) StageStateful(ID string) {
em.stages[ID].stateful = true
}

// AddTestStream returns a builder interface that the execution layer uses to construct
// the test stream from its protos.
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
impl := &testStreamImpl{em: em}
impl.initHandler(id)
impl.TagsToPCollections(tagToPCol)
return impl
}
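
As a usage sketch: this is roughly how the execution layer could drive the returned builder when translating a TestStream transform. Only AddTestStream's signature and TagsToPCollections come from this diff; the event-adding methods, the TestStreamElement type, and the example IDs below are assumptions for illustration, not the merged interface.

package example

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
)

// wireTestStream is a hypothetical caller. The engine package is internal to
// prism, so this only illustrates the shape of the calls, not importable API.
func wireTestStream(em *engine.ElementManager, elems []engine.TestStreamElement) {
	tsb := em.AddTestStream("e_teststream", map[string]string{
		"out0": "n1", // local output tag -> global PCollection ID (example values)
	})
	// Assumed methods: queue events decoded from the TestStream payload proto.
	tsb.AddElementEvent("out0", elems)                // emit elements on the tag
	tsb.AddWatermarkEvent("out0", mtime.MaxTimestamp) // advance that tag's watermark
}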

// Impulse marks and initializes the given stage as an impulse, which
// is a root transform that starts processing.
func (em *ElementManager) Impulse(stageID string) {
@@ -319,37 +330,72 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
em.refreshCond.L.Lock()
}
}
if len(em.inprogressBundles) == 0 && len(em.watermarkRefreshes) == 0 {
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
if v > 0 {
var stageState []string
ids := maps.Keys(em.stages)
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
} else if len(em.inprogressBundles) == 0 {
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
}
em.refreshCond.L.Unlock()
em.checkForQuiescence(advanced)
}
}()
return runStageCh
}

// checkForQuiescence checks whether this element manager can no longer do any pending work or make progress.
//
// Quiescence can happen if there are no in-progress bundles and no further watermark refreshes, which
// are the only way to access new pending elements. If there are also no pending elements, the pipeline
// terminates successfully.
//
// Otherwise, take appropriate action to make progress, such as executing the next TestStream event, and
// failing that, produce information for debugging why the pipeline is stuck.
//
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) checkForQuiescence(advanced set[string]) {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
return
}
// The job has quiesced!

// There are no further incoming watermark changes; see if there are test stream events for this job.
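// Note: when the pipeline has no TestStream, testStreamHandler is nil; NextEvent
// is assumed to be nil-receiver safe and to return nil in that case.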
nextEvent := em.testStreamHandler.NextEvent()
if nextEvent != nil {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
return
}

v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will terminate successfully.
return
}
// The job is officially stuck. Fail fast and produce debugging information.
// Jobs must never get stuck, so this indicates a bug in prism to be investigated.

slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
var stageState []string
ids := maps.Keys(em.stages)
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
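
Because checkForQuiescence takes over the unlock via defer, the Bundles goroutine must invoke it while still holding refreshCond.L. A simplified sketch of that calling pattern, assuming a refreshWatermarks helper (name not shown in this diff) and eliding the condition-variable wait, bundle scheduling, and loop exit:

// Simplified sketch of the caller inside Bundles: the lock is held when
// checkForQuiescence is invoked, and its deferred Unlock releases it.
go func() {
	for {
		em.refreshCond.L.Lock()
		// ... wait on em.refreshCond and start any ready bundles ...
		advanced := em.refreshWatermarks() // assumed helper: advances watermarks, returns refreshed stages
		em.checkForQuiescence(advanced)    // releases refreshCond.L via its defer
	}
}()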

// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
// the PCollection's coders.
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
@@ -429,6 +475,7 @@ const (
BlockTimer // BlockTimer represents timers for the bundle.
)

// Block represents a contiguous set of data or timers for the same destination.
type Block struct {
Kind BlockKind
Bytes [][]byte
47 changes: 47 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
@@ -169,3 +169,50 @@ func TestElementManagerCoverage(t *testing.T) {
})
}
}

func TestTestStream(t *testing.T) {
initRunner(t)

tests := []struct {
pipeline func(s beam.Scope)
}{
{pipeline: primitives.TestStreamBoolSequence},
{pipeline: primitives.TestStreamByteSliceSequence},
{pipeline: primitives.TestStreamFloat64Sequence},
{pipeline: primitives.TestStreamInt64Sequence},
{pipeline: primitives.TestStreamInt16Sequence},
{pipeline: primitives.TestStreamStrings},
{pipeline: primitives.TestStreamTwoBoolSequences},
{pipeline: primitives.TestStreamTwoFloat64Sequences},
{pipeline: primitives.TestStreamTwoInt64Sequences},
{pipeline: primitives.TestStreamTwoUserTypeSequences},
}

configs := []struct {
name string
OneElementPerKey, OneKeyPerBundle bool
}{
{"Greedy", false, false},
{"AllElementsPerKey", false, true},
{"OneElementPerKey", true, false},
{"OneElementPerBundle", true, true},
}
for _, config := range configs {
for _, test := range tests {
t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) {
t.Cleanup(func() {
engine.OneElementPerKey = false
engine.OneKeyPerBundle = false
})
engine.OneElementPerKey = config.OneElementPerKey
engine.OneKeyPerBundle = config.OneKeyPerBundle
p, s := beam.NewPipelineWithRoot()
test.pipeline(s)
_, err := executeWithT(context.Background(), t, p)
if err != nil {
t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err)
}
})
}
}
}