Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Flink 1.20 #32863

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
},
"JavaTestProperties": {
"SUPPORTED_VERSIONS": ["8", "11", "17", "21"],
"FLINK_VERSIONS": ["1.17", "1.18", "1.19"],
"FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20"],
"DEFAULT_FLINK_VERSION": "1.20",
"SPARK_VERSIONS": ["2", "3"]
},
"GoTestProperties": {
Expand Down
4 changes: 1 addition & 3 deletions .github/trigger_files/beam_PostCommit_Go_VR_Flink.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"https://github.com/apache/beam/pull/32648": "testing flink 1.19 support"
"": "testing Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"https://github.com/apache/beam/pull/32648": "testing flink 1.19 support"
"": "Testing Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{

"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Flink.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
"": "testing addition of Flink 1.20 support"
}
2 changes: 1 addition & 1 deletion .github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
arguments: |
--info \
-PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
-Prunner=:runners:flink:1.19 \
-Prunner=:runners:flink:1.20 \
'-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_3 }}' \
- name: run GroupByKey load test Spark
uses: ./.github/actions/gradle-command-self-hosted-action
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/beam_PostCommit_Java_Examples_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
- name: run examplesIntegrationTest script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.19:examplesIntegrationTest
gradle-command: :runners:flink:1.20:examplesIntegrationTest
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ jobs:
with:
gradle-command: :sdks:java:testing:nexmark:run
arguments: |
-Pnexmark.runner=:runners:flink:1.19 \
-Pnexmark.runner=:runners:flink:1.20 \
"${{ env.GRADLE_COMMAND_ARGUMENTS }} --streaming=${{ matrix.streaming }} --queryLanguage=${{ matrix.queryLanguage }}" \
- name: run PostCommit Java Nexmark Flink (${{ matrix.streaming }}) script
if: matrix.queryLanguage == 'none'
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:testing:nexmark:run
arguments: |
-Pnexmark.runner=:runners:flink:1.19 \
-Pnexmark.runner=:runners:flink:1.20 \
"${{ env.GRADLE_COMMAND_ARGUMENTS }}--streaming=${{ matrix.streaming }}"
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
- name: run PostCommit Java Flink PortableValidatesRunner Streaming script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: runners:flink:1.19:job-server:validatesPortableRunnerStreaming
gradle-command: runners:flink:1.20:job-server:validatesPortableRunnerStreaming
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ jobs:
with:
gradle-command: :sdks:java:testing:tpcds:run
arguments: |
-Ptpcds.runner=:runners:flink:1.19 \
-Ptpcds.runner=:runners:flink:1.20 \
"-Ptpcds.args=${{env.tpcdsBigQueryArgs}} ${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }} --queries=${{env.tpcdsQueriesArg}}" \
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: run validatesRunner script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.19:validatesRunner
gradle-command: :runners:flink:1.20:validatesRunner
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ jobs:
11
- name: run jar Java8 script
run: |
./gradlew :runners:flink:1.19:jar :runners:flink:1.19:testJar
./gradlew :runners:flink:1.20:jar :runners:flink:1.20:testJar
- name: run validatesRunner Java8 script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.19:validatesRunner
gradle-command: :runners:flink:1.20:validatesRunner
arguments: |
-x shadowJar \
-x shadowTestJar \
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/beam_PostCommit_XVR_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
FlinkVersion: 1.19
FlinkVersion: 1.20

jobs:
beam_PostCommit_XVR_Flink:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
- name: run validatesPortableRunnerBatch script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.19:job-server:validatesPortableRunnerBatch
gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatch
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
- name: Archive JUnit Test Results
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
- name: run PreCommit Java PVR Flink Docker script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.19:job-server:validatesPortableRunnerDocker
gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerDocker
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
- name: Archive JUnit Test Results
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/beam_Publish_Docker_Snapshots.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
- name: run Publish Docker Snapshots script for Flink
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.17:job-server-container:dockerPush
gradle-command: :runners:flink:1.20:job-server-container:dockerPush
arguments: |
-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \
-Pdocker-tag-list=latest
-Pdocker-tag-list=latest
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* [Python] Introduce Managed Transforms API ([#31495](https://github.com/apache/beam/pull/31495))
* Flink 1.19 support added ([#32648](https://github.com/apache/beam/pull/32648))
* Flink 1.20 support added ([#32863](https://github.com/apache/beam/pull/32863))

## I/Os

Expand Down
2 changes: 1 addition & 1 deletion contributor-docs/release-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ write to BigQuery, and create a cluster of machines for running containers (for
```
**Flink Local Runner**
```
./gradlew :runners:flink:1.19:runQuickstartJavaFlinkLocal \
./gradlew :runners:flink:1.20:runQuickstartJavaFlinkLocal \
-Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-${KEY} \
-Pver=${RELEASE_VERSION}
```
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

# supported flink versions
flink_versions=1.17,1.18,1.19
flink_versions=1.17,1.18,1.19,1.20
# supported python versions
python_versions=3.9,3.10,3.11,3.12
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ $ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
{{if (eq .Sdk "java")}}

##### Portable
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink 1.19`.
2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.19_job_server:latest`
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink `1.19`, `Flink 1.20`.
2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.20_job_server:latest`
3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example:

```
Expand Down Expand Up @@ -233,8 +233,8 @@ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
{{end}}

{{if (eq .Sdk "python")}}
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink 1.19`.
2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.19_job_server:latest`
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink 1.19`, `Flink 1.20`.
2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.20_job_server:latest`
3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example:

```
Expand Down
2 changes: 1 addition & 1 deletion release/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") {
dependsOn(":runners:direct-java:runQuickstartJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow")
dependsOn(":runners:spark:3:runQuickstartJavaSpark")
dependsOn(":runners:flink:1.19:runQuickstartJavaFlinkLocal")
dependsOn(":runners:flink:1.20:runQuickstartJavaFlinkLocal")
dependsOn(":runners:direct-java:runMobileGamingJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow")
dependsOn(":runners:twister2:runQuickstartJavaTwister2")
Expand Down
25 changes: 25 additions & 0 deletions runners/flink/1.20/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

project.ext {
flink_major = '1.20'
flink_version = '1.20.0'
}

// Load the main build script which contains all build logic.
apply from: "../flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.20/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.20/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.20-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
4 changes: 4 additions & 0 deletions runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ dependencies {
implementation "org.apache.flink:flink-metrics-core:$flink_version"
implementation "org.apache.flink:flink-java:$flink_version"

if (flink_version.compareTo("1.20") >= 0) {
implementation "org.apache.flink:flink-core-api:$flink_version"
}

implementation "org.apache.flink:flink-runtime:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,14 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32;

protected transient InternalTimerService<TimerData> timerService;
private transient InternalTimeServiceManager<?> timeServiceManager;

/**
* In Flink 1.19 and below, this is a private field of the superclass that we access and cache
* here to avoid the boilerplate of `Optional` unpacking every time.
*
* <p>In Flink 1.20 and above, this becomes a protected field and we can remove it.
*/
private transient InternalTimeServiceManager<?> cachedTimeServiceManager;

private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;

Expand Down Expand Up @@ -474,7 +481,7 @@ public void initializeState(StateInitializationContext context) throws Exception
}

timerInternals = new FlinkTimerInternals(timerService);
timeServiceManager =
cachedTimeServiceManager =
getTimeServiceManager()
.orElseThrow(() -> new IllegalStateException("Time service manager is not set."));
}
Expand Down Expand Up @@ -688,17 +695,18 @@ protected int numProcessingTimeTimers() {
return getTimeServiceManager()
.map(
manager -> {
if (timeServiceManager instanceof InternalTimeServiceManagerImpl) {
if (cachedTimeServiceManager instanceof InternalTimeServiceManagerImpl) {
final InternalTimeServiceManagerImpl<?> cast =
(InternalTimeServiceManagerImpl<?>) timeServiceManager;
(InternalTimeServiceManagerImpl<?>) cachedTimeServiceManager;
return cast.numProcessingTimeTimers();
} else if (timeServiceManager instanceof BatchExecutionInternalTimeServiceManager) {
} else if (cachedTimeServiceManager
instanceof BatchExecutionInternalTimeServiceManager) {
return 0;
} else {
throw new IllegalStateException(
String.format(
"Unknown implementation of InternalTimerServiceManager. %s",
timeServiceManager));
cachedTimeServiceManager));
}
})
.orElse(0);
Expand Down Expand Up @@ -837,7 +845,7 @@ public final void processWatermark1(Watermark mark) throws Exception {
private void processInputWatermark(boolean advanceInputWatermark) throws Exception {
long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark());
if (keyCoder != null && advanceInputWatermark) {
timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
cachedTimeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
}

long potentialOutputWatermark =
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/examples/stringsplit/stringsplit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// 1. From a command line, navigate to the top-level beam/ directory and run
// the Flink job server:
//
// ./gradlew :runners:flink:1.19:job-server:runShadow -Djob-host=localhost -Dflink-master=local
// ./gradlew :runners:flink:1.20:job-server:runShadow -Djob-host=localhost -Dflink-master=local
//
// 2. The job server is ready to receive jobs once it outputs a log like the
// following: `JobService started on localhost:8099`. Take note of the endpoint
Expand Down
Loading
Loading