[#29917][prism] Initial TestStream support #30072
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## master #30072 +/- ##
==========================================
+ Coverage 38.46% 38.52% +0.06%
==========================================
Files 697 698 +1
Lines 102198 102381 +183
==========================================
+ Hits 39307 39443 +136
- Misses 61262 61305 +43
- Partials 1629 1633 +4
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
7ccf83a to 48017b8
R: @damondouglas (because it may help understand drain) |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
Friendly Ping. |
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
6293d92 to 33ec169
PTAL! Thanks! |
LGTM
Thank you kindly! |
Any chance this breaks Go Flink VR? From the history of the first failed run,
this is the only Go change. However, the test log was huge and truncated, so I was not able to investigate.
Technically yes, but it also should have filtered out the tests. I'll take
a look when I get back to my desk.
On Thu, Feb 29, 2024, 10:51 AM Yi Hu wrote:
Any chance this breaks Go Flink VR?
From the history of first failed run,
- https://github.com/apache/beam/actions/runs/7936467225 (first failing)
- https://github.com/apache/beam/actions/runs/7932752907 (last succeeded)
this is the only Go change.
However, the test log was huge and truncated, not able to investigate
Found it.
Basically, the Flink runner does nonsense with TestStream whenever there's a LengthPrefix, leading to an extra length prefix being added to custom coders, strings, byte slices, etc. We've never successfully found where or why this gets added. But ultimately it means the data is being corrupted: the extra length prefixing occurs on the Flink side, but the coder received back by the SDK doesn't include it, causing decode failures. The problem in this case is that I didn't also filter out all the new tests I added. I'll have a PR to fix it shortly.
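To make the failure mode concrete, here is a minimal sketch of how one unexpected extra length prefix corrupts data. It assumes a varint length followed by the payload bytes; `lengthPrefix` and `decodeLengthPrefixed` are hypothetical helpers written for illustration, not functions from Beam, Prism, or Flink.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// lengthPrefix prepends a varint-encoded length to payload.
// Hypothetical helper: it only illustrates the framing discussed above.
func lengthPrefix(payload []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	out := make([]byte, 0, n+len(payload))
	out = append(out, hdr[:n]...)
	return append(out, payload...)
}

// decodeLengthPrefixed strips exactly one length prefix and returns the
// remaining payload. It fails if the declared length does not match the
// number of bytes that follow.
func decodeLengthPrefixed(data []byte) ([]byte, error) {
	n, read := binary.Uvarint(data)
	if read <= 0 {
		return nil, fmt.Errorf("invalid varint length prefix")
	}
	rest := data[read:]
	if uint64(len(rest)) != n {
		return nil, fmt.Errorf("length prefix says %d bytes, got %d", n, len(rest))
	}
	return rest, nil
}
```

If the payload is prefixed once, `decodeLengthPrefixed` returns the original bytes. If it is prefixed twice and the receiver strips only one layer, the result still starts with the inner prefix, so an element decoder that expects the raw value sees a leading length byte as data.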
Should be fixed by #30462. The log file was around half a million lines, so grep was a tiny bit unhappy. |
Adds initial TestStream support for Prism for #29917.
After some light experimentation with the design, most TestStream-related code lives in a single file, and an interface delegates the execution of each event to the event type itself.
The only easily testable logic outside of a pipeline execution context would be popping events from a queue. But as the alternative to correct popping is a pipeline freeze and crash (pending elements remaining, but nothing to trigger their processing), these are omitted.
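As a rough illustration of that shape, the sketch below has each event type apply itself through a shared interface while a small queue pops events in order. Every name here (`pipelineState`, `tsEvent`, `addElements`, `advanceWatermark`, `eventQueue`, `drain`) is hypothetical and is not taken from the Prism source.

```go
package main

// pipelineState is a hypothetical stand-in for the runner state that
// TestStream events mutate.
type pipelineState struct {
	watermark int64
	pending   []string
}

// tsEvent lets each event type decide how it is executed.
type tsEvent interface {
	execute(s *pipelineState)
}

// addElements queues new elements for processing.
type addElements struct{ values []string }

func (e addElements) execute(s *pipelineState) {
	s.pending = append(s.pending, e.values...)
}

// advanceWatermark moves the watermark forward; it never moves it back.
type advanceWatermark struct{ to int64 }

func (e advanceWatermark) execute(s *pipelineState) {
	if e.to > s.watermark {
		s.watermark = e.to
	}
}

// eventQueue holds events in the order they should run.
type eventQueue struct{ events []tsEvent }

// pop removes and returns the next event, or false if the queue is empty.
func (q *eventQueue) pop() (tsEvent, bool) {
	if len(q.events) == 0 {
		return nil, false
	}
	ev := q.events[0]
	q.events = q.events[1:]
	return ev, true
}

// drain executes every queued event in order and returns how many ran.
func drain(q *eventQueue, s *pipelineState) int {
	n := 0
	for {
		ev, ok := q.pop()
		if !ok {
			return n
		}
		ev.execute(s)
		n++
	}
}
```

In this sketch, a `pop` that skipped or failed to return an event would leave `pending` elements with nothing to advance the watermark, which is the stall described above.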
The only included logic that isn't currently covered is ProcessingTime handling, though the core is present in this PR. TestStreams with ProcessingTime events will be hard failed at job submission time for now. This is no great loss, as Prism doesn't meaningfully handle ProcessingTime events at all, and that will require its own coordination to insert their executions in the queue. The processing time implementation is tracked in #30083.
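A submission-time check of this kind could look roughly like the following. This is a sketch under assumptions: the `eventKind` constants, `testStreamEvent`, and `validateTestStream` are hypothetical names, and the real check in Prism may be structured differently.

```go
package main

import "fmt"

// eventKind is a hypothetical tag for the kinds of TestStream events.
type eventKind int

const (
	elementEvent eventKind = iota
	watermarkEvent
	processingTimeEvent
)

// testStreamEvent is a hypothetical, simplified event record.
type testStreamEvent struct {
	kind eventKind
}

// validateTestStream rejects any TestStream that contains a processing
// time event, so the job fails at submission instead of stalling later.
func validateTestStream(events []testStreamEvent) error {
	for i, ev := range events {
		if ev.kind == processingTimeEvent {
			return fmt.Errorf("TestStream event %d advances processing time, which is not supported", i)
		}
	}
	return nil
}
```

Failing early keeps the error next to its cause: the user sees an unsupported-feature message at submission rather than a hung pipeline.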
There was a bit of a delay in finishing this implementation due to the Beam 2.54.0 release and the question of what to do about the Flink quirks. It turns out the Flink TestStream quirks were documented for the Go SDK, and the tests were made to suit them. This PR removes the quirks so that those tests validate What You Put In Is What You Get Out (no extra length prefixing, for example).
Prism also wraps the unknown byte blobs with length prefixes if it is length prefixing that coder in other places. This is simpler than doing an additional conditional re-write of coders in the pipeline graph, just due to test stream. A test that uses a row coder has been added to additionally validate this behavior.
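The conditional wrapping can be sketched as follows, assuming the runner keeps a set of coder IDs it length prefixes elsewhere and that the prefix is a varint length. `wrapIfPrefixed` and the `prefixed` set are hypothetical and only illustrate the idea.

```go
package main

import "encoding/binary"

// wrapIfPrefixed returns elem unchanged unless coderID is in the set of
// coders the runner length prefixes elsewhere. In that case it prepends a
// varint length so the element bytes match what the rest of the pipeline
// expects for that coder.
func wrapIfPrefixed(coderID string, prefixed map[string]bool, elem []byte) []byte {
	if !prefixed[coderID] {
		return elem
	}
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(elem)))
	out := make([]byte, 0, n+len(elem))
	out = append(out, hdr[:n]...)
	return append(out, elem...)
}
```

Wrapping the bytes at the point where TestStream elements enter the pipeline avoids a separate coder rewrite in the graph, which matches the trade-off described above.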