[Bug]: Unable to drain Flink job when RequiresStableInput is used #28554
Comments
@dmvk Is it possible that this is a bug in Flink? I think there should be a checkpoint barrier strictly after emitting MAX_WATERMARK, does that make sense?
I have tried stopping the job using both the CLI (with --drain) and the Flink RestClusterClient library. Note that when drain is set to false, the pipeline is able to successfully stop with a savepoint, including the DoFn with RequiresStableInput. So this issue only occurs in the specific scenario where the drain flag is used to advance the watermark and permanently stop the pipeline.
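For reference, a minimal sketch of the programmatic stop-with-drain path mentioned above (the REST endpoint, job ID, and savepoint path are placeholders, and the exact stopWithSavepoint signature varies across Flink versions, e.g. newer releases also take a SavepointFormatType). The CLI equivalent is roughly `flink stop --drain --savepointPath s3://bucket/savepoints <jobId>`:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class DrainExample {
  public static void main(String[] args) throws Exception {
    // Placeholder REST endpoint of the Flink cluster.
    Configuration conf = new Configuration();
    conf.setString("rest.address", "localhost");
    conf.setInteger("rest.port", 8081);

    try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
      JobID jobId = JobID.fromHexString("00000000000000000000000000000000"); // placeholder

      // advanceToEndOfEventTime = true corresponds to --drain: MAX_WATERMARK is
      // emitted before the final savepoint barrier.
      CompletableFuture<String> savepoint =
          client.stopWithSavepoint(jobId, /* advanceToEndOfEventTime= */ true, "s3://bucket/savepoints");
      System.out.println("Savepoint at: " + savepoint.get());
    }
  }
}
```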
I think I see the problem. From my understanding, after the savepoint is taken in step c) we process the buffered data, and this results in more elements being sent downstream after the savepoint barrier has passed. This problem should only arise when there is a chain of at least two transforms with RequiresStableInput.
My job only has one DoFn using the RequiresStableInput annotation.
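For readers unfamiliar with the annotation, this is roughly what such a DoFn looks like in Beam Java (the class name and element types are placeholders, not the reporter's actual code). @RequiresStableInput on the @ProcessElement method asks the runner to deliver only elements that have been durably checkpointed, which is why the Flink runner buffers input until the next checkpoint completes:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Placeholder DoFn that performs a side-effecting write and requires stable input.
class WriteToSinkFn extends DoFn<KV<String, Iterable<String>>, Void> {
  @ProcessElement
  @RequiresStableInput
  public void processElement(@Element KV<String, Iterable<String>> batch) {
    // Write the batch to an external sink here; stable input makes the write
    // effectively replay-safe across failures, at the cost of buffering elements
    // until a checkpoint has completed.
  }
}
```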
The job always fails during the final savepoint operation with the watermark hold error, not after the savepoint is taken (it fails between steps b and c).
This feels strange. Can you try to log the elements arriving at the stable DoFn after the savepoint to see where they are coming from? I would be surprised if they were coming from the KafkaIO (provided it stops sending data before triggering the checkpoint).
These events are the buffered events that are emitted from DoFn1. I tried the following steps:
From my experience, the only case in which the drain operation works is when the buffered elements have already been fully processed (via a checkpoint trigger) and the buffer is empty at the time the drain is executed.
Sorry for the late reaction, I was OOO. I once again walked through the code and it seems to make sense now. Here is the problem: elements for the stable DoFn stay buffered until the checkpoint completes, so the watermark hold is still present when the operator is being finished. The only solution then seems to be to make sure that the buffered elements are flushed in the call that finishes the operator.
No worries, and thanks for your response. I wanted to double-check the implications of flushing the buffer during that final flush. The issue I see is that once we flush the buffer, it triggers the downstream DoFn processing. And in the scenario where the downstream stable DoFn operation fails (say it is writing to a sink and the sink is unavailable, and/or the checkpoint itself times out), our final checkpoint and drain operation will fail, which could lead to inconsistent state.
Yes, if the final checkpoint fails (for whatever reason), then the retry can yield a different result than the first run. I think we have only two options: a) flush and process the buffered elements during drain, giving up the stable-input guarantee, or b) fail the drain with an exception. Currently, we do b), which is actually semantically correct. We can change the exception to be more explanatory, or better yet throw the exception unconditionally if the DoFn has stable input (currently it might fail non-deterministically). Because choosing between a) and b) might be subject to user decision, we might introduce a flag to control this behavior.
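Purely as an illustration of what such a user-facing switch could look like, a hypothetical pipeline option is sketched below. The option name does not exist in Beam; it only mirrors the a)/b) choice described above:

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical illustration only -- not an actual Beam/Flink runner option.
public interface HypotheticalFlinkDrainOptions extends PipelineOptions {
  @Description(
      "If true, flush elements buffered for @RequiresStableInput DoFns when the job is "
          + "drained (best effort, no stability guarantee); if false, fail the drain instead.")
  @Default.Boolean(false)
  boolean getFlushBufferedElementsOnDrain();

  void setFlushBufferedElementsOnDrain(boolean value);
}
```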
OK, adding a flag for drain behavior sounds reasonable. Lastly, I just wanted to double check: what issues do you see if we instead skip the watermark hold check inside flushData? If the watermark-skip approach doesn't seem correct, then I will update my PR to implement approach a).
There are two problems:
Thanks for the clarification! Makes sense to make this operation best-effort in that case. |
Closed via #29102 |
What happened?
Issue:
The Flink pipeline does not get drained when the RequiresStableInput annotation is used. Stack trace when drain is triggered:
Caused by: java.lang.RuntimeException: There are still watermark holds left when terminating operator KVStoreTransform/GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches) -> PersistenceTransform/WriteSpendStateToDb/ParMultiDo(WriteSpendStateDbStreaming) (1/1)#0 Watermark held 1695147850921
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.flushData(DoFnOperator.java:631)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.AbstractStreamOperatorCompat.finish(AbstractStreamOperatorCompat.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
    ... 13 more
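Based on the operator names in the stack trace, the failing chain roughly corresponds to a GroupIntoBatches followed by a stable-input write. The sketch below is a reconstruction with placeholder types (reusing the WriteToSinkFn placeholder shown earlier), not the reporter's actual code:

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class PipelineShape {
  // Placeholder reconstruction: GroupIntoBatches (which holds the watermark for
  // buffered batches) feeding a ParDo whose DoFn requires stable input.
  static void addStages(PCollection<KV<String, String>> input) {
    input
        .apply("KVStoreTransform/GroupIntoBatches", GroupIntoBatches.<String, String>ofSize(100))
        .apply("PersistenceTransform/WriteSpendStateToDb", ParDo.of(new WriteToSinkFn()));
  }
}
```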
Cause:
When drain is triggered, MAX_WATERMARK is emitted before the last checkpoint barrier. This triggers all the registered event-time timers and flushes out any state. Therefore, the expectation is that when flushData is finally invoked in DoFnOperator, all the event-time timers have fired and the watermark has advanced.
However, when the RequiresStableInput annotation is used, elements are buffered and only processed by the DoFn after the checkpoint operation is complete. Since the flush is invoked while the final checkpoint/savepoint operation is still in progress, the watermark is held by the DoFn with the RequiresStableInput annotation, which is waiting for the checkpoint to complete.
Potential Solution:
Skip this check when the DoFn has RequiresStableInput set, since we know that all the remaining buffered data will be processed after the final savepoint operation completes.
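A hypothetical, heavily simplified sketch of this idea (not the actual DoFnOperator code): only treat leftover watermark holds as an error when the DoFn does not require stable input.

```java
// Hypothetical sketch of the proposed fix; names and structure are illustrative only.
final class WatermarkHoldCheck {
  static void checkOnFinish(boolean requiresStableInput, long heldWatermark, String operatorName) {
    boolean holdsLeft = heldWatermark < Long.MAX_VALUE;
    if (holdsLeft && !requiresStableInput) {
      throw new RuntimeException(
          "There are still watermark holds left when terminating operator "
              + operatorName + " Watermark held " + heldWatermark);
    }
    // With @RequiresStableInput the hold is expected at this point and is released
    // once the buffered elements are processed after the final checkpoint/savepoint.
  }
}
```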
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components