[FLINK-34468][Connector/Cassandra] Adding support for Flink 1.20 #29

Open · wants to merge 1 commit into main
3 changes: 2 additions & 1 deletion .github/workflows/push_pr.yml
@@ -25,8 +25,9 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.19.0 ]
+        flink: [ 1.20.0 ]
         include:
+          - flink: 1.19.0
Comment on lines +28 to +30

Contributor:
The rule is to test the connector with the last 2 major Flink versions, so 1.20.0 and 1.19.1. And for readability I think it is better to use the matrix:
flink: [1.20.0, 1.19.1]
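
Applied to this job, the suggested matrix would look roughly like the sketch below (an illustration only; it assumes the job keeps delegating to the shared ci.yml workflow through a flink_version input, which is not visible in this diff):

```yaml
# Sketch of the suggested push_pr.yml matrix.
jobs:
  compile_and_test:
    strategy:
      matrix:
        flink: [ 1.20.0, 1.19.1 ]
    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
    with:
      flink_version: ${{ matrix.flink }}
```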

Contributor:
Also, you just updated the GitHub Actions job that reacts to PR pushes. But you also need to update the weekly.yml file, which is the main job running every Sunday.

In this file I'd test the released (v3.2) branch against the last 2 snapshots of Flink (to check that ongoing iterations of Flink do not break the released version of the connector) and the main branch against the last 2 released versions of Flink (to check that the current iteration of the connector still works on released Flink versions); a sketch of such a matrix follows.
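
A weekly job along those lines might look roughly like this (a sketch; the cron schedule, the flink_branches matrix shape, and the ci.yml input names are assumptions modeled on other Flink connector repositories, with v3.2 standing for the released branch):

```yaml
# Sketch of a weekly.yml matrix pairing connector branches with Flink versions.
name: Weekly build
on:
  schedule:
    - cron: "0 0 * * 0"  # every Sunday
jobs:
  compile_and_test:
    if: github.repository_owner == 'apache'
    strategy:
      matrix:
        flink_branches:
          # main branch against the last 2 released Flink versions
          - flink: 1.20.0
            branch: main
          - flink: 1.19.1
            branch: main
          # released (v3.2) branch against the last 2 Flink snapshots
          - flink: 1.20-SNAPSHOT
            branch: v3.2
          - flink: 1.19-SNAPSHOT
            branch: v3.2
    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
    with:
      flink_version: ${{ matrix.flink_branches.flink }}
      connector_branch: ${{ matrix.flink_branches.branch }}
```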

- flink: 1.18.1

uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
 .eslintcache
 .cache
+.java-version
 scalastyle-output.xml
 .classpath
 .idea/*
@@ -1,10 +1,4 @@
Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (CassandraSource.java:138)
Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:124)
Contributor:
These archunit violations were legitimate to store in the violation store, as they are accepted. Now that you have removed them, you have build issues.
Take a look at the comments in the archunit.properties file to see how to use it; the relevant switches are sketched below.
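
For reference, the freeze-store switches in an archunit.properties file typically look like this (a sketch of ArchUnit's standard freeze properties; the exact comments and defaults in this repository's file may differ):

```properties
# Fail the build on new violations, but allow entries to be removed from the store.
freeze.store.default.allowStoreUpdate=true

# Enable once to create the initial violation store for a newly frozen rule.
#freeze.store.default.allowStoreCreation=true

# Enable once to re-record (refreeze) the accepted violations after an intentional change.
#freeze.refreeze=true
```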

Author:
It is for the same reason that I removed WriteAheadSinkTestBase as the base class of CassandraConnectorITCase.

Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:125)
Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:126)
Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (CassandraSource.java:127)
Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:145)
Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:149)
Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSource.java:0)
Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)
@@ -176,7 +176,6 @@ public void testGenerateSplitsWithTooHighMaximumSplitSize(
}

// overridden to use unordered checks
-@Override
protected void checkResultWithSemantic(
CloseableIterator<Pojo> resultIterator,
List<List<Pojo>> testData,
@@ -197,36 +196,31 @@ protected void checkResultWithSemantic(
}

@Disabled("Not a unbounded source")
-@Override
Contributor:
I prefer keeping the overrides, as these methods are indeed defined in the parent class, even though they are disabled because they relate to the streaming source.

Author:
Agreed. I guess it was auto-removed after removing WriteAheadSinkTestBase as the base class.

Contributor:
No, these methods come from the source base test suite.

public void testSourceMetrics(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic)
throws Exception {}

@Disabled("Not a unbounded source")
-@Override
public void testSavepoint(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
Expand Down
@@ -42,7 +42,6 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
@@ -80,10 +79,7 @@
@SuppressWarnings("serial")
@Testcontainers
@ExtendWith(RetryExtension.class)
-class CassandraConnectorITCase
-        extends WriteAheadSinkTestBase<
-                Tuple3<String, Integer, Integer>,
-                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+class CassandraConnectorITCase {

Contributor:
Don't remove this inheritance, otherwise you no longer test what's in WriteAheadSinkTestBase.

private static final CassandraTestEnvironment cassandraTestEnvironment =
new CassandraTestEnvironment(false);
@@ -284,7 +280,6 @@ void testAnnotatePojoWithTable() {
// Exactly-once Tests
// ------------------------------------------------------------------------

-@Override
protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink()
Contributor:
and keep the overrides.

throws Exception {
return new CassandraTupleWriteAheadSink<>(
@@ -295,17 +290,14 @@ protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createS
new CassandraCommitter(cassandraTestEnvironment.getBuilderForReading()));
}

-@Override
protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
}

-@Override
protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
}

-@Override
protected void verifyResultsIdealCircumstances(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -325,7 +317,6 @@ protected void verifyResultsIdealCircumstances(
.isEmpty();
}

-@Override
protected void verifyResultsDataPersistenceUponMissedNotify(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -345,7 +336,6 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
.isEmpty();
}

-@Override
protected void verifyResultsDataDiscardingUponRestore(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -368,7 +358,6 @@ protected void verifyResultsDataDiscardingUponRestore(
.isEmpty();
}

-@Override
protected void verifyResultsWhenReScaling(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink,
int startElementCounter,
2 changes: 1 addition & 1 deletion pom.xml
@@ -42,7 +42,7 @@ under the License.
</scm>

<properties>
-        <flink.version>1.18.0</flink.version>
+        <flink.version>1.20.0</flink.version>
Contributor:
You also need to fix the dependency convergence issues with 1.20.0 (one possible fix is sketched after the dependency tree below):

Dependency convergence error for org.apache.commons:commons-lang3:3.12.0 paths to dependency are:
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:1.20.0
    +-org.apache.flink:flink-core:1.20.0
      +-org.apache.commons:commons-lang3:3.12.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:1.20.0
    +-org.apache.flink:flink-core:1.20.0
      +-org.apache.commons:commons-text:1.10.0
        +-org.apache.commons:commons-lang3:3.12.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:1.20.0
    +-org.apache.flink:flink-core:1.20.0
      +-org.apache.commons:commons-compress:1.26.0
        +-org.apache.commons:commons-lang3:3.14.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:1.20.0
    +-org.apache.flink:flink-runtime:1.20.0
      +-org.apache.commons:commons-lang3:3.12.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:1.20.0
    +-org.apache.flink:flink-java:1.20.0
      +-org.apache.commons:commons-lang3:3.12.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-test-utils:1.20.0
    +-org.apache.flink:flink-runtime:1.20.0
      +-org.apache.commons:commons-lang3:3.12.0
and
+-org.apache.flink:flink-connector-cassandra_2.12:4.0-SNAPSHOT
  +-org.apache.flink:flink-test-utils:1.20.0
    +-org.apache.flink:flink-core:1.20.0
      +-org.apache.commons:commons-lang3:3.12.0
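
One common way to resolve this is to pin a single commons-lang3 version in the pom's dependencyManagement section (a sketch, not necessarily how this PR should do it; 3.14.0 is simply the highest version appearing in the tree above):

```xml
<dependencyManagement>
  <dependencies>
    <!-- Converge commons-lang3: flink-core pulls 3.12.0 directly,
         while commons-compress pulls 3.14.0 transitively. -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.14.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```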

<japicmp.referenceVersion>3.1.0-1.17</japicmp.referenceVersion>
<guava.version>19.0</guava.version>
</properties>