[Task]: Reenable single iteration #23043

Closed
damccorm opened this issue Sep 6, 2022 · 3 comments · Fixed by #27744

@damccorm

damccorm commented Sep 6, 2022

What needs to happen?

Single iteration was temporarily disabled in #23042 and should either be turned back on or ripped out entirely. This should also address the issue raised in #22933.

Issue Priority

Priority: 2

Issue Component

Component: sdk-go

@lostluck

To replicate this locally, you need 2 terminals, starting from the root directory of the beam repo.

Terminal 1: Spark runner.

$ ./gradlew :runners:spark:3:job-server:runShadow

Terminal 2: Go SDK process.

$ cd sdks/go/test/regression
$ go1.20.4 test *.go -run ^TestLPErrorPipeline$ --environment_type=LOOPBACK --runner=universal --endpoint=localhost:8099

This sends just the test pipeline to the local Spark runner, which allows debugging from the SDK side using whatever debugging process you like on the Go binary.

@lostluck

OK, have determined what's going on.

The Spark runner always uses "multi-chunk" iterables, which isn't true of Flink or Prism (or even Dataflow, but that's harder to verify).

e.g. Spark:
[128 32 196 155 160 188 247 247 0 0 0 1 7 1 0 255 255 255 255 3 1 0 5 65 112 112 108 101 1 0 6 66 97 110 97 110 97 1 0 6 67 104 101 114 114 121 0]

vs Prism:
[128 32 196 155 160 188 247 247 0 0 0 1 15 1 0 0 0 0 3 8 1 0 5 65 112 112 108 101 9 1 0 6 66 97 110 97 110 97 9 1 0 6 67 104 101 114 114 121]

vs Flink:

[128 32 196 155 160 188 247 247 0 0 0 1 15 1 0 0 0 0 3 1 0 5 65 112 112 108 101 1 0 6 66 97 110 97 110 97 1 0 6 67 104 101 114 114 121]

That means the values always arrive from Spark with a -1 (the 255 255 255 255 in the encoded value) as the iterable's length header. That enables the multi-chunk protocol, even though the runner isn't doing a state-backed iterable. It's a bug on the Go SDK side: outside of the state-backed case, I didn't think any runner implemented the multi-chunk protocol.
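To make the difference in the dumps concrete, here is a minimal sketch of reading just the iterable header. It assumes the layout the dumps suggest: a 4-byte big-endian length, where -1 means a varint chunk size follows. `iterableHeader` is a hypothetical helper for illustration, not a function from the Go SDK, and it deliberately does not try to decode the elements.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// iterableHeader is a hypothetical helper (not part of the Go SDK).
// It reads the fixed 4-byte big-endian length at the start of an encoded
// iterable. For a known length it returns that element count. For -1 it
// returns the size of the first chunk and multiChunk=true.
func iterableHeader(b []byte) (n int64, multiChunk bool, err error) {
	if len(b) < 4 {
		return 0, false, fmt.Errorf("need 4 bytes, got %d", len(b))
	}
	length := int32(binary.BigEndian.Uint32(b[:4]))
	if length != -1 {
		return int64(length), false, nil
	}
	// Assumption: the multi-chunk form follows the -1 with a varint chunk size.
	first, read := binary.Uvarint(b[4:])
	if read <= 0 {
		return 0, true, fmt.Errorf("bad chunk-size varint")
	}
	return int64(first), true, nil
}

func main() {
	spark := []byte{255, 255, 255, 255, 3} // header bytes from the Spark dump
	prism := []byte{0, 0, 0, 3}            // header bytes from the Prism and Flink dumps
	fmt.Println(iterableHeader(spark))
	fmt.Println(iterableHeader(prism))
}
```

Both headers describe three elements; only the Spark one takes the multi-chunk path, which is why it also ends with a trailing 0 terminator in the dump.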

@lostluck

The issue is that since the DoFn didn't drain the iterable, there were still bytes to be read when processing returned to the datasource.

So, a real bug, but on a disused path for most runners. #27762 has been filed to implement and test this behavior in Prism, so future SDK devs can validate it more easily.
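A toy illustration of that failure mode, not the actual SDK code or the fix in #27744: the sketch below assumes single-byte elements and a made-up `chunkIter` type, and shows how a consumer that reads only the first element leaves the stream misaligned unless the rest of the iterable is drained.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// chunkIter is a hypothetical iterator over a toy multi-chunk iterable:
// varint chunk size, that many single-byte elements, repeated, then a 0.
type chunkIter struct {
	r    *bufio.Reader
	left uint64 // elements remaining in the current chunk
	done bool   // true once the 0 terminator has been consumed
}

// Next returns the next element, or io.EOF once the terminator is read.
func (it *chunkIter) Next() (byte, error) {
	for it.left == 0 {
		if it.done {
			return 0, io.EOF
		}
		n, err := binary.ReadUvarint(it.r)
		if err != nil {
			return 0, err
		}
		if n == 0 {
			it.done = true
			return 0, io.EOF
		}
		it.left = n
	}
	it.left--
	return it.r.ReadByte()
}

// Drain consumes everything the caller left unread, terminator included.
func (it *chunkIter) Drain() error {
	for {
		if _, err := it.Next(); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
	}
}

// nextByteAfter reads one element, optionally drains, and then returns the
// byte the "datasource" would see next on the shared stream.
func nextByteAfter(drain bool) byte {
	// Two chunks ("ab", "c"), the terminator, then 'N' from the next record.
	stream := []byte{2, 'a', 'b', 1, 'c', 0, 'N'}
	it := &chunkIter{r: bufio.NewReader(bytes.NewReader(stream))}
	it.Next() // the "DoFn" looks at only the first element
	if drain {
		it.Drain()
	}
	b, _ := it.r.ReadByte()
	return b
}

func main() {
	fmt.Printf("without drain: %q\n", nextByteAfter(false))
	fmt.Printf("with drain:    %q\n", nextByteAfter(true))
}
```

Without the drain, the next read lands in the middle of the iterable; with it, the reader is positioned at the start of the next record.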

lostluck added a commit that referenced this issue Aug 1, 2023
* [#23043] Re-enable single iteration for the Go SDK.

* more debuging

* don't drop plan

* debug text.

* Fix beam23043

* clean up debugging.

* update unit test.

* go fmt

---------

Co-authored-by: lostluck <[email protected]>
@github-actions github-actions bot added this to the 2.50.0 Release milestone Aug 1, 2023