
[prism] Support Empty Flattens as Side Input - no way to make progress with pending elements #32003

Closed
Tracked by #29650
lostluck opened this issue Jul 26, 2024 · 2 comments · Fixed by #32029

@lostluck
Contributor

lostluck commented Jul 26, 2024

Java FlattenTest tests are failing because they no longer make progress. The failures are largely related to "empty flattens" and affect testEmptyFlattenAsSideInput, testFlattenPCollectionsEmpty and testFlattenPCollectionsEmptyThenParDo.

Iterate by setting TEST to the test class, then running the Gradle command pointed at a running prism instance.

TEST=org.apache.beam.sdk.transforms.FlattenTest
./gradlew :runners:portability:java:ulrLoopbackValidatesRunnerTests -PjobEndpoint=localhost:8073 --tests="$TEST"

There should be at least 3 failing tests.

This seems like it should be a simple enough fix.

For testEmptyFlattenAsSideInput, the likely cause is that nothing signals that the side input is ready, so the consumer cannot make progress. The other two tests, testFlattenPCollectionsEmptyThenParDo and testFlattenPCollectionsEmpty, both use a PAssert to check emptiness, and PAssert detects an empty PCollection through a side input. So all three share the same root cause.
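A toy model of that readiness check may help frame the problem. This is a minimal Go sketch, assuming a consumer treats a side input as ready once the producing stage's output watermark has passed the end of the window, with the global window's end simplified to +inf. The `stage` type, `sideInputReady`, and the `minWM`/`maxWM` constants are hypothetical and are not prism's internals.

```go
package main

import (
	"fmt"
	"math"
)

// Hypothetical stand-ins for -inf and +inf watermarks (int64 milliseconds).
const (
	minWM int64 = math.MinInt64
	maxWM int64 = math.MaxInt64
)

// stage is a toy stand-in for a prism stage; it only tracks its output watermark.
type stage struct {
	outWM int64
}

// sideInputReady reports whether a side input produced by src can be read for a
// window ending at windowEnd. Until the producer's output watermark passes the
// end of the window, more elements could still arrive, so the consumer must wait.
// For simplicity the global window's end is modeled as +inf.
func sideInputReady(src *stage, windowEnd int64) bool {
	return src.outWM >= windowEnd
}

func main() {
	// An empty flatten emits no elements. If nothing ever refreshes its
	// watermark, it stays at -inf and the global-window side input is never
	// ready, even though the (empty) PCollection is complete.
	emptyFlatten := &stage{outWM: minWM}
	fmt.Println(sideInputReady(emptyFlatten, maxWM)) // false: consumer is stuck

	// Once the watermark advances to +inf, the consumer can proceed.
	emptyFlatten.outWM = maxWM
	fmt.Println(sideInputReady(emptyFlatten, maxWM)) // true
}
```

The point of the model is that an empty input carries no data to wake the consumer, so readiness can only come from the watermark.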

Investigation should start by replicating those test cases as equivalent Go pipelines, if they don't already exist, to rule out Java- or Go-specific behavior. From there, it should focus on whether a "back signal" is needed so the consumer side (such as a side input) can learn whether an empty PCollection is "ready".

Full error for the testFlattenPCollectionsEmpty case:

java.lang.RuntimeException: The Runner experienced the following error during execution:
nothing in progress and no refreshes with non zero pending elements: 1
ElementManager Now: 1720541143863 processingTimeEvents: map[]
stage-000 watermark in +inf out +inf upstream +inf from  pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-001 watermark in +inf out +inf upstream +inf from stage-000 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-002 watermark in -inf out -inf upstream +inf from  pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-003 watermark in -inf out -inf upstream -inf from stage-002 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-004 watermark in -inf out -inf upstream -inf from stage-003 pending [{Data - Window [*], EventTime -inf, Element [0 0 0 0]}] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-005 watermark in -inf out -inf upstream -inf from stage-004 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-006 watermark in -inf out -inf upstream -inf from stage-005 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-007 watermark in -inf out -inf upstream -inf from stage-006 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-008 watermark in -inf out -inf upstream -inf from stage-007 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]

	at org.apache.beam.runners.portability.JobServicePipelineResult.propagateErrors(JobServicePipelineResult.java:176)
	at org.apache.beam.runners.portability.JobServicePipelineResult.waitUntilFinish(JobServicePipelineResult.java:117)
	at org.apache.beam.runners.portability.testing.TestUniversalRunner.run(TestUniversalRunner.java:83)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:404)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:343)
	at org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty(FlattenTest.java:138)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:750)

Full error for the side input case (testEmptyFlattenAsSideInput):

java.lang.RuntimeException: The Runner experienced the following error during execution:
nothing in progress and no refreshes with non zero pending elements: 2
ElementManager Now: 1720541122891 processingTimeEvents: map[]
stage-000 watermark in +inf out +inf upstream +inf from  pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-001 watermark in +inf out +inf upstream +inf from stage-000 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-002 watermark in -inf out -inf upstream +inf from  pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-003 watermark in -inf out -inf upstream -inf from stage-002 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-004 watermark in +inf out +inf upstream +inf from  pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-005 watermark in -inf out -inf upstream +inf from stage-004 pending [{Data - Window [*], EventTime -inf, Element [0]}] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-006 watermark in -inf out -inf upstream -inf from stage-005 pending [{Data - Window [*], EventTime -inf, Element [0 0 0 0]}] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-007 watermark in -inf out -inf upstream -inf from stage-006 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-008 watermark in -inf out -inf upstream -inf from stage-007 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-009 watermark in -inf out -inf upstream -inf from stage-008 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]
stage-010 watermark in -inf out -inf upstream -inf from stage-009 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[]

	at org.apache.beam.runners.portability.JobServicePipelineResult.propagateErrors(JobServicePipelineResult.java:176)
	at org.apache.beam.runners.portability.JobServicePipelineResult.waitUntilFinish(JobServicePipelineResult.java:117)
	at org.apache.beam.runners.portability.testing.TestUniversalRunner.run(TestUniversalRunner.java:83)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:404)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:343)
	at org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput(FlattenTest.java:219)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:750)

@lostluck
Contributor Author

lostluck commented Jul 30, 2024

The root issue appears to be that the interstitial flatten transform prevents the readiness signal from propagating to the side input.

Basically, this wouldn't happen if Flatten Unzipping were implemented, since the flattens could then be elided entirely. But a mechanism to propagate watermarks through unexecutable stages would still be valuable, since reshuffles, stateful prefixes, or SDFs might cause the same issue due to how stages are built.

So the question is: how do we ensure this signal propagates through the element manager, across unexecutable stages (those with no parallel data)?

@lostluck
Contributor Author

Oh no. It's weirder than that.

The flatten stage has no inputs (hence being a truly empty flatten), so nothing will ever trigger its watermark refresh. We can't synthetically add an impulse (the data nonce needs to go somewhere), but we can ensure that a stage without a parallel input is automatically watermark refreshed.
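The automatic-refresh idea can be sketched as a small work-list model. This is a hypothetical Go sketch, not prism's element manager and not the change made in #32029. It assumes each stage's input watermark is the minimum of its upstream output watermarks (+inf when there are none, matching the "upstream +inf" entry for the input-less stage-002 in the dumps above), that an idle stage passes its input watermark straight through, and that stages without a parallel input are seeded into the initial refresh list. All names (`stage`, `refresh`, `initialRefreshes`, `buildPipeline`) are illustrative.

```go
package main

import (
	"fmt"
	"math"
)

const (
	minWM int64 = math.MinInt64 // stands in for -inf
	maxWM int64 = math.MaxInt64 // stands in for +inf
)

// stage is a toy model; a stage with no pending or in-progress work simply
// passes its input watermark through to its output.
type stage struct {
	upstream   []int // producing stages; empty for an input-less flatten
	downstream []int // consuming stages (including side input consumers)
	outWM      int64
}

// inputWM is the minimum output watermark across upstream stages.
// The minimum over an empty set of upstreams is +inf.
func inputWM(stages []*stage, i int) int64 {
	wm := maxWM
	for _, u := range stages[i].upstream {
		if stages[u].outWM < wm {
			wm = stages[u].outWM
		}
	}
	return wm
}

// refresh drains a work list of stages whose watermarks may have changed.
// When a stage's watermark advances, its consumers are queued as well, so the
// signal propagates through stages that never execute a bundle.
func refresh(stages []*stage, queue []int) {
	for len(queue) > 0 {
		i := queue[0]
		queue = queue[1:]
		if wm := inputWM(stages, i); wm > stages[i].outWM {
			stages[i].outWM = wm
			queue = append(queue, stages[i].downstream...)
		}
	}
}

// initialRefreshes seeds the work list with every stage lacking a parallel
// input. Such stages never receive data or bundle completions, so without
// seeding nothing would ever trigger their first refresh.
func initialRefreshes(stages []*stage) []int {
	var q []int
	for i, s := range stages {
		if len(s.upstream) == 0 {
			q = append(q, i)
		}
	}
	return q
}

// buildPipeline returns stage 0, an empty flatten with no inputs, and
// stage 1, a consumer that reads it as a side input.
func buildPipeline() []*stage {
	return []*stage{
		{downstream: []int{1}, outWM: minWM},
		{upstream: []int{0}, outWM: minWM},
	}
}

func main() {
	// Without seeding, nothing triggers a refresh and the consumer stays at -inf.
	stuck := buildPipeline()
	refresh(stuck, nil)
	fmt.Println(stuck[1].outWM == minWM) // true

	// Seeding the input-less stage lets +inf propagate to the consumer.
	fixed := buildPipeline()
	refresh(fixed, initialRefreshes(fixed))
	fmt.Println(fixed[1].outWM == maxWM) // true
}
```

The first run reproduces the shape of the reported stall: with no trigger, the consumer never leaves -inf. Seeding the input-less stage is enough for +inf to flow downstream, even though neither stage ever processes a bundle.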
