From 3c2a76df4aecd6270e07ceeec18c099677f78516 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 17 Oct 2024 13:58:07 -0400 Subject: [PATCH] Drop Flink 1.16 support --- CHANGES.md | 2 +- gradle.properties | 2 +- .../runner-concepts/description.md | 4 +- runners/flink/1.16/build.gradle | 25 --- .../1.16/job-server-container/build.gradle | 26 --- runners/flink/1.16/job-server/build.gradle | 31 --- .../types/CoderTypeSerializer.java | 195 ------------------ .../streaming/MemoryStateBackendWrapper.java | 0 .../flink/streaming/StreamSources.java | 0 .../src/apache_beam/runners/flink.ts | 2 +- settings.gradle.kts | 4 - .../content/en/documentation/runners/flink.md | 7 +- 12 files changed, 8 insertions(+), 290 deletions(-) delete mode 100644 runners/flink/1.16/build.gradle delete mode 100644 runners/flink/1.16/job-server-container/build.gradle delete mode 100644 runners/flink/1.16/job-server/build.gradle delete mode 100644 runners/flink/1.16/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename runners/flink/{1.16 => 1.17}/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java (100%) rename runners/flink/{1.16 => 1.17}/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java (100%) diff --git a/CHANGES.md b/CHANGES.md index 41f2e74bbeae..67dd0ccf0601 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,7 +83,7 @@ ## Deprecations -* Removed support for Flink 1.15 +* Removed support for Flink 1.15 and 1.16 * X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). ## Bugfixes diff --git a/gradle.properties b/gradle.properties index 868c7501ac31..db1db368beb0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.16,1.17,1.18,1.19 +flink_versions=1.17,1.18,1.19 # supported python versions python_versions=3.8,3.9,3.10,3.11,3.12 diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md index 063e7f35f876..c0d7b37725ac 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md @@ -191,7 +191,7 @@ $ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \ {{if (eq .Sdk "java")}} ##### Portable -1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.16`, `Flink 1.17`, `Flink 1.18`, `Flink 1.19`. +1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink 1.19`. 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.19_job_server:latest` 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example: @@ -233,7 +233,7 @@ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ {{end}} {{if (eq .Sdk "python")}} -1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.16`, `Flink 1.17`, `Flink 1.18`, `Flink 1.19`. +1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.17`, `Flink 1.18`, `Flink 1.19`. 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.19_job_server:latest` 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example: diff --git a/runners/flink/1.16/build.gradle b/runners/flink/1.16/build.gradle deleted file mode 100644 index 21a222864a27..000000000000 --- a/runners/flink/1.16/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.16' - flink_version = '1.16.0' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.16/job-server-container/build.gradle b/runners/flink/1.16/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.16/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.16/job-server/build.gradle b/runners/flink/1.16/job-server/build.gradle deleted file mode 100644 index 99dc00275a0c..000000000000 --- a/runners/flink/1.16/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.16-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.16/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.16/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java deleted file mode 100644 index 956aad428d8b..000000000000 --- a/runners/flink/1.16/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.EOFException; -import java.io.IOException; -import org.apache.beam.runners.core.construction.SerializablePipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; -import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.io.VersionedIOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link - * org.apache.beam.sdk.coders.Coder Coders}. - */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class CoderTypeSerializer extends TypeSerializer { - - private static final long serialVersionUID = 7247319138941746449L; - - private final Coder coder; - - /** - * {@link SerializablePipelineOptions} deserialization will cause {@link - * org.apache.beam.sdk.io.FileSystems} registration needed for {@link - * org.apache.beam.sdk.transforms.Reshuffle} translation. - */ - private final SerializablePipelineOptions pipelineOptions; - - private final boolean fasterCopy; - - public CoderTypeSerializer(Coder coder, SerializablePipelineOptions pipelineOptions) { - Preconditions.checkNotNull(coder); - Preconditions.checkNotNull(pipelineOptions); - this.coder = coder; - this.pipelineOptions = pipelineOptions; - - FlinkPipelineOptions options = pipelineOptions.get().as(FlinkPipelineOptions.class); - this.fasterCopy = options.getFasterCopy(); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public CoderTypeSerializer duplicate() { - return new CoderTypeSerializer<>(coder, pipelineOptions); - } - - @Override - public T createInstance() { - return null; - } - - @Override - public T copy(T t) { - if (fasterCopy) { - return t; - } - try { - return CoderUtils.clone(coder, t); - } catch (CoderException e) { - throw new RuntimeException("Could not clone.", e); - } - } - - @Override - public T copy(T t, T reuse) { - return copy(t); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T t, DataOutputView dataOutputView) throws IOException { - DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView); - coder.encode(t, outputWrapper); - } - - @Override - public T deserialize(DataInputView dataInputView) throws IOException { - try { - DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView); - return coder.decode(inputWrapper); - } catch (CoderException e) { - Throwable cause = e.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw e; - } - } - } - - @Override - public T deserialize(T t, DataInputView dataInputView) throws IOException { - return deserialize(dataInputView); - } - - @Override - public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { - serialize(deserialize(dataInputView), dataOutputView); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - CoderTypeSerializer that = (CoderTypeSerializer) o; - return coder.equals(that.coder); - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public TypeSerializerSnapshot snapshotConfiguration() { - return new UnversionedTypeSerializerSnapshot<>(this); - } - - /** - * A legacy snapshot which does not care about schema compatibility. This is used only for state - * restore of state created by Beam 2.54.0 and below for Flink 1.16 and below. - */ - public static class LegacySnapshot extends TypeSerializerConfigSnapshot { - - /** Needs to be public to work with {@link VersionedIOReadableWritable}. */ - public LegacySnapshot() {} - - public LegacySnapshot(CoderTypeSerializer serializer) { - setPriorSerializer(serializer); - } - - @Override - public int getVersion() { - // We always return the same version - return 1; - } - - @Override - public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( - TypeSerializer newSerializer) { - - // We assume compatibility because we don't have a way of checking schema compatibility - return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } - } - - @Override - public String toString() { - return "CoderTypeSerializer{" + "coder=" + coder + '}'; - } -} diff --git a/runners/flink/1.16/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java similarity index 100% rename from runners/flink/1.16/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java rename to runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java diff --git a/runners/flink/1.16/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/1.16/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index b68d3070a720..ab2d641b3302 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.16", "1.17", "1.18", "1.19"]; +const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19"]; const defaultOptions = { flinkMaster: "[local]", diff --git a/settings.gradle.kts b/settings.gradle.kts index 67e499e1ea31..a38f69dac09e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -125,10 +125,6 @@ include(":runners:extensions-java:metrics") * verify versions in website/www/site/content/en/documentation/runners/flink.md * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py */ -// Flink 1.16 -include(":runners:flink:1.16") -include(":runners:flink:1.16:job-server") -include(":runners:flink:1.16:job-server-container") // Flink 1.17 include(":runners:flink:1.17") include(":runners:flink:1.17:job-server") diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 9bf99cf9e4c2..fb897805cfd6 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -196,7 +196,6 @@ The optional `flink_version` option may be required as well for older versions o {{< paragraph class="language-portable" >}} Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: -[Flink 1.16](https://hub.docker.com/r/apache/beam_flink1.16_job_server). [Flink 1.17](https://hub.docker.com/r/apache/beam_flink1.17_job_server). [Flink 1.18](https://hub.docker.com/r/apache/beam_flink1.18_job_server). [Flink 1.19](https://hub.docker.com/r/apache/beam_flink1.19_job_server). @@ -312,8 +311,8 @@ reference. ## Flink Version Compatibility The Flink cluster version has to match the minor version used by the FlinkRunner. -The minor version is the first two numbers in the version string, e.g. in `1.16.0` the -minor version is `1.16`. +The minor version is the first two numbers in the version string, e.g. in `1.19.0` the +minor version is `1.19`. We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for the time it is supported by the Flink community. @@ -344,7 +343,7 @@ To find out which version of Flink is compatible with Beam please see the table 1.16.x beam-runners-flink-1.16 - ≥ 2.47.0 + 2.47.0 - 2.60.0 1.15.x