[FLINK-34468][Connector/Cassandra] Adding support for Flink 1.19
HuangZhenQiu committed Aug 21, 2024
1 parent 997a12e commit a4af9a1
Showing 6 changed files with 6 additions and 27 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/push_pr.yml
@@ -25,8 +25,9 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.19.0 ]
+        flink: [ 1.20.0 ]
         include:
+          - flink: 1.19.0
           - flink: 1.18.1
 
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
.eslintcache
.cache
+.java-version
scalastyle-output.xml
.classpath
.idea/*
8 changes: 1 addition & 7 deletions (ArchUnit violations store; file name not shown in this view)
@@ -1,10 +1,4 @@
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (CassandraSource.java:138)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:124)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:125)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:126)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (CassandraSource.java:127)
-Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:145)
-Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:149)
Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSource.java:0)
Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)
Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)
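These entries are ArchUnit freeze records: each line pins an existing use of Flink-internal or test-facing API so the architecture tests fail only on new violations. As a rough sketch of the pattern the surviving entries record (assumed shape only, not the actual connector source), a production-side helper annotated @VisibleForTesting and guarded with Preconditions looks like this:

```java
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

public class QueryChecks {

    // Hypothetical validation; the real CassandraSource.checkQueryValidity
    // applies its own rules. The point is the shape the freeze file records:
    // production code calling Preconditions and carrying @VisibleForTesting.
    @VisibleForTesting
    static void checkQueryValidity(String query) {
        Preconditions.checkNotNull(query, "query must not be null");
        Preconditions.checkState(
                query.trim().toUpperCase().startsWith("SELECT"),
                "query must be a SELECT statement");
    }
}
```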
6 changes: 0 additions & 6 deletions (source integration test; file name not shown in this view)
@@ -176,7 +176,6 @@ public void testGenerateSplitsWithTooHighMaximumSplitSize(
}

// overridden to use unordered checks
-@Override
protected void checkResultWithSemantic(
CloseableIterator<Pojo> resultIterator,
List<List<Pojo>> testData,
@@ -197,36 +196,31 @@ protected void checkResultWithSemantic(
}

@Disabled("Not a unbounded source")
-@Override
public void testSourceMetrics(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic)
throws Exception {}

@Disabled("Not a unbounded source")
-@Override
public void testSavepoint(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
CheckpointingMode semantic) {}

@Disabled("Not a unbounded source")
-@Override
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
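A plausible reason for dropping @Override on these suite hooks (an assumption; the diff itself does not say) is a parameter-type change in the shared connector test-suite base class used with Flink 1.20: once the supertype's signature changes, a subclass method keeping the old type is a plain overload rather than an override, and the annotation stops compiling. A self-contained illustration with hypothetical types:

```java
// Hypothetical classes, not Flink's. Changing a hook's parameter type in the
// base class silently turns a subclass override into an overload.
class OldMode {}
class NewMode {}

class SuiteBase {
    protected void testSavepoint(NewMode semantic) {}
}

class ConnectorCase extends SuiteBase {
    // @Override  // compile error if restored: the parameter type differs from
    //            // SuiteBase.testSavepoint(NewMode), so this is an overload
    protected void testSavepoint(OldMode semantic) {}
}
```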
13 changes: 1 addition & 12 deletions CassandraConnectorITCase.java (full path not shown in this view)
@@ -42,7 +42,6 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
@@ -80,10 +79,7 @@
@SuppressWarnings("serial")
@Testcontainers
@ExtendWith(RetryExtension.class)
-class CassandraConnectorITCase
-        extends WriteAheadSinkTestBase<
-                Tuple3<String, Integer, Integer>,
-                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+class CassandraConnectorITCase {

private static final CassandraTestEnvironment cassandraTestEnvironment =
new CassandraTestEnvironment(false);
@@ -284,7 +280,6 @@ void testAnnotatePojoWithTable() {
// Exactly-once Tests
// ------------------------------------------------------------------------

-@Override
protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink()
throws Exception {
return new CassandraTupleWriteAheadSink<>(
@@ -295,17 +290,14 @@ protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createS
new CassandraCommitter(cassandraTestEnvironment.getBuilderForReading()));
}

-@Override
protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
}

-@Override
protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
}

-@Override
protected void verifyResultsIdealCircumstances(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -325,7 +317,6 @@ protected void verifyResultsIdealCircumstances(
.isEmpty();
}

-@Override
protected void verifyResultsDataPersistenceUponMissedNotify(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -345,7 +336,6 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
.isEmpty();
}

-@Override
protected void verifyResultsDataDiscardingUponRestore(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

@@ -368,7 +358,6 @@ protected void verifyResultsDataDiscardingUponRestore(
.isEmpty();
}

-@Override
protected void verifyResultsWhenReScaling(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink,
int startElementCounter,
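Since the class no longer extends WriteAheadSinkTestBase, the methods kept above — createSink(), createTypeInfo(), generateValue(...), and the verifyResults*(...) checks — are no longer template hooks invoked by a base class. A minimal sketch of driving them from a plain JUnit 5 test inside this class (assumed wiring; the checkpoint orchestration the old base class provided is elided):

```java
// Sketch only: the old base class opened the sink, pushed generated values,
// and signalled checkpoint completion between these steps.
@Test
void exactlyOnceSinkIdealRun() throws Exception {
    CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink = createSink();
    TupleTypeInfo<Tuple3<String, Integer, Integer>> typeInfo = createTypeInfo();
    Tuple3<String, Integer, Integer> record = generateValue(0, 1);
    // ... feed `record` (and further values from generateValue) through the
    // sink, then complete a checkpoint before verifying ...
    verifyResultsIdealCircumstances(sink);
}
```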
2 changes: 1 addition & 1 deletion pom.xml
@@ -42,7 +42,7 @@ under the License.
</scm>

<properties>
-<flink.version>1.18.0</flink.version>
+<flink.version>1.20.0</flink.version>
<japicmp.referenceVersion>3.1.0-1.17</japicmp.referenceVersion>
<guava.version>19.0</guava.version>
</properties>
