Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prism] Support Bundle Finalization #31912

Closed
Tracked by #29650
lostluck opened this issue Jul 17, 2024 · 1 comment · Fixed by #32425
Closed
Tracked by #29650

[Prism] Support Bundle Finalization #31912

lostluck opened this issue Jul 17, 2024 · 1 comment · Fixed by #32425
Assignees

Comments

@lostluck
Copy link
Contributor

lostluck commented Jul 17, 2024

Support the bundle finalization feature. This will enable certain tests in the Validates Runner Suites to pass.

SDKs can set a ParDo needs finalization at pipeline submission time, and require the feature is supported by the runner.

DoFns require finalization to allow state to be cleared in external services, since it's been "durably persisted" by the runner. While durability isn't true for Prism at this time, finalization remains a blocker even for small test cases of interesting DoFns.

Places to look for implementing this feature.

Pipeline Proto:

The requirement in the proto file:
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1720

The proto field in question for ParDoPayload
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L542

FnAPI:

ProcessBundleResponses set the following field if they need finalization after bundle persistence.

https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L434

The runner is expected to send the following InstructionRequest back to the SDK.
https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L156

Prism implementation tips.

Requirement filtering occurs here:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go#L44

The Execute method handles the ProcessBundle lifecycle.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L79

Data is persisted to the runner here:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L272

Bundle Related SDK callbacks are implemented here:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go#L208

Testing

There are both Python and Java Validates runner tests that require this feature, but the feature is implemented in the Go SDK, so it's possible to author a Go pipeline that exercises the feature that validates that the callback works.

It would likely be similar to the Separation Harness tests for Splittable DoFns, which turn up a small local server so the test only executes in LoopBack mode. https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/separate_test.go

It wouldn't need to be complicated, just validate that the finalized is called, in a way that can be validated outside of the Pipeline execution itself. Bonus points for having it validated in pipeline somehow. Only required to be validated in loopback mode execution though. That will show in execution coverage in the prism/internal package.

@damondouglas damondouglas changed the title [prism] Support Bundle Finalization [Prism] Support Bundle Finalization Aug 9, 2024
@damondouglas
Copy link
Contributor

damondouglas commented Aug 16, 2024

Q&A

Is all that is needed in the DoFn to add BundleFinalization to the ProcessElement?

Answer: All that's necessary for the Go SDK is to declare a BundleFinalizer parameter, in the appropriate order, and to actually register the closure for the BundleFinalizer.

In hindsight we'd want this to be not on ProcessElement, because for large bundles, it's going to keep a lot of things in scope, but probably fine in practice. (eg keeping a slice of IDs to ACK is smaller than a pile of closured IDs) Oh well. Nothing for it now.

Is this statement true? "Bundle finalization is not limited to SDFs but is called out here since this is the primary use case"

Answer: Yes. It isn't limited to Splittable DoFns, but they do apply most commonly to such sources. Basically any worthwhile source would be an SDF anyway.

I see this phrase "durably persisted". Does that refer to after "PersistBundle"?

Answer: In this case, yes. In Prism, the call to the ElementManager's PersistBundle method is where "durably persisted" is supposed to, and will eventually occur.

As a technical note, we don't do anything "durably" in Prism, due to doing everything in memory. If the program is shut down, it's all lost. But as we're doing this for testing purposes that's fine.

The phrase "The callback is invoked" in https://beam.apache.org/documentation/programming-guide/#bundle-finalization.

When Runner sends the wk.sendInstruction(ctx, &fnpb.InstructionRequest{}) with the bundle finalization request, does that trigger the SDK to execute the callback?

Answer: Yes. When the SDK receives the control instruction request to finalize a specific bundle the SDK should be executing all callbacks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants