Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable drain operation for flink jobs with requiresStableInput annotation #28567

Closed

Conversation

kkdoon
Copy link
Contributor

@kkdoon kkdoon commented Sep 20, 2023

Currently, drain operation does not work for flink pipelines when RequiresStableInput annotation is used. This is caused due to buffered data not being processed before the final checkpoint operation that causes watermark hold related exception. This PR addresses this issue by processing the buffer before the final checkpoint completes. More context in #28554.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@kkdoon
Copy link
Contributor Author

kkdoon commented Sep 20, 2023

Run PreCommit Java PVR Flink Batch

@kkdoon kkdoon marked this pull request as ready for review September 21, 2023 02:03
@kkdoon
Copy link
Contributor Author

kkdoon commented Sep 21, 2023

R: @mxm @je-ik

Could you kindly help me review this PR. Thanks!

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@je-ik
Copy link
Contributor

je-ik commented Sep 21, 2023

I don't think we can just skip the check. The fact that the check fails implies there are still buffered data that have not been processed yet. We need to either:
a) correctly flush and process the buffered data (which will clear the hold), or
b) ensure that checkpoint is called before the final watermark

I have a suspicion that correct is only b), because otherwise, in case of failure and restore from checkpoint after we flushed the data, we might break the stable input contract.

@kkdoon
Copy link
Contributor Author

kkdoon commented Sep 21, 2023

I don't think we can just skip the check. The fact that the check fails implies there are still buffered data that have not been processed yet. We need to either: a) correctly flush and process the buffered data (which will clear the hold), or b) ensure that checkpoint is called before the final watermark

I have a suspicion that correct is only b), because otherwise, in case of failure and restore from checkpoint after we flushed the data, we might break the stable input contract.

I agree that option B is the cleanest and makes logical sense. I think in implementation terms it works if somehow flushData is invoked after last checkpoint finishes (or some other approach). Option A violates the requiresStableInput contract and hence seems incorrect (i did experiment with this approach as well and it gave the expected output when checkpoint successfully completed).

Having said that, the current approach also works correctly when requiresStableInput is set since all the buffered data is emitted after checkpoint completes and the output watermark finally becomes equal to MAX_WATERMARK (i have run multiple jobs to verify the behavior). I can defer the currentOutputWatermark < Long.MAX_VALUE check for requiresStableInput scenario and do this check inside notifyCheckpointComplete, at the time of job completion, to verify that output watermark is correct.

@je-ik
Copy link
Contributor

je-ik commented Sep 22, 2023

Having said that, the current approach also works correctly when requiresStableInput is set since all the buffered data is emitted after checkpoint completes and the output watermark finally becomes equal to MAX_WATERMARK (i have run multiple jobs to verify the behavior).

This seems to be the source of troubles. See my comment here.

@kkdoon
Copy link
Contributor Author

kkdoon commented Oct 19, 2023

@je-ik i have updated the PR based on our discussion. Could you kindly check again?

@je-ik
Copy link
Contributor

je-ik commented Oct 19, 2023

LGTM, thanks! Can you please update the title of the PR to match the implementation? We can merge it afterwards.

@je-ik je-ik self-requested a review October 19, 2023 08:20
@je-ik
Copy link
Contributor

je-ik commented Oct 19, 2023

Could you also please squash the commits? Thanks!

@kkdoon kkdoon changed the title Skip watermark hold check for requiresStableInput operator during flush Enable drain operation for flink jobs with requiresStableInput annotation Oct 19, 2023
JayajP and others added 6 commits October 21, 2023 14:00
* Implement java exponential histograms (apache#28903)

* Address comments

* Address comments
* reference PR 28915 issue_comment removed

* post commit issue_comment fix

* revert changes for postcommit
* Add link to the Dataflow service options page

* Fix link format

* Small text edit
* Update arc terraform to allow for coloaction in the default
network.Allow usage of reserved ip. Allow usage of existing SA

* sync beam env

* move aditional runners to load based scaling
robertwb and others added 12 commits October 21, 2023 14:00
Slightly stricter definitions for catching more errors, as well as avoding the use of
anyOf which often makes it difficult to deduce what the true error is.

This does mean a pipeline must have a transform (or source/sink) block rather than simply
be itself a list of transforms.
As well as good practice, not doing so may result in much more
obscure errors (e.g. during encoding) downstream.
* Add readme for PerformanceTests TextIOIT, JDBC, Kafka IO, SpannerIO, SQLBigQueryIO and BiqQueryIO Python

* Update readme

* PRs 28582 28584 28606 28581

* PR 28738 LoadTests_Java_GBK_Dataflow

* Add readme for PostCommit Java Examples Dataflow V2

* Add readme for LoadTests Java CoGBK

* Add readme for LoadTests Python CoGBK Dataflow

* Add readme for LoadTests Python ParDo and SideInput

* Add readme for LoadTests Smoke Python and Java

* Update Readme

* Update Readme

* updated README

* Update readme for Performance Tests BigQueryIO Write Python Batch

* Remove Trigger Phrases for Load Tests and Performance tests

* PR 28846 28730 28827 28861 28897

* update readme

---------

Co-authored-by: aleksandr-dudko <[email protected]>
Co-authored-by: vitaly.terentyev <[email protected]>
Co-authored-by: magicgoody <[email protected]>
…he#29046)

Bumps [cloud.google.com/go/spanner](https://github.com/googleapis/google-cloud-go) from 1.50.0 to 1.51.0.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](googleapis/google-cloud-go@spanner/v1.50.0...spanner/v1.51.0)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/spanner
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…che#28947)

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.7.0 to 0.17.0.
- [Commits](golang/net@v0.7.0...v0.17.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
correct log message format

apply spotless

flush buffer when requiresStableInput is set

re-organize imports

add flag in FlinkPipelineOptions to allow draining for pipelines with RequiresStableInput

apply spotless again
@codecov
Copy link

codecov bot commented Oct 21, 2023

Codecov Report

Merging #28567 (e08578a) into master (9fdc59b) will decrease coverage by 8.41%.
Report is 40 commits behind head on master.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master   #28567      +/-   ##
==========================================
- Coverage   38.39%   29.98%   -8.41%     
==========================================
  Files         686      391     -295     
  Lines      101640    65297   -36343     
==========================================
- Hits        39021    19580   -19441     
+ Misses      61040    45717   -15323     
+ Partials     1579        0    -1579     
Flag Coverage Δ
go ?
python 29.98% <ø> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

see 302 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@kkdoon
Copy link
Contributor Author

kkdoon commented Oct 21, 2023

closing this PR since i botched up squashing the commits and created #29102 instead

@kkdoon kkdoon closed this Oct 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.