diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java index e73f27f0..4b7d5193 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeEventSourceFactory.java @@ -85,8 +85,8 @@ public SnapshotChangeEventSource getSnapsh @Override public StreamingChangeEventSource getStreamingChangeEventSource() { - LOGGER.info("Consistency mode is {}", configuration.consistencyMode().getValue()); - if (configuration.consistencyMode() == YugabyteDBConnectorConfig.ConsistencyMode.DEFAULT) { + LOGGER.info("Transaction ordering enabled: {}", configuration.transactionOrdering()); + if (configuration.transactionOrdering()) { LOGGER.info("Instantiating Vanilla Streaming Source"); return new YugabyteDBStreamingChangeEventSource( configuration, diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorConfig.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorConfig.java index 229169e9..3ad693f4 100755 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorConfig.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorConfig.java @@ -964,6 +964,15 @@ public static AutoCreateMode parse(String value, String defaultValue) { "'skip' to skip / ignore TRUNCATE events (default), " + "'include' to handle and include TRUNCATE events"); + public static final Field TRANSACTION_ORDERING = Field.create("transaction.ordering") + .withDisplayName("Order transactions") + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23)) + .withImportance(Importance.HIGH) + .withDefault(false) + .withType(Type.BOOLEAN) + .withValidation(Field::isBoolean) + .withDescription("Specify whether the transactions need to be ordered"); + public static final Field CONSISTENCY_MODE = Field.create("consistency.mode") .withDisplayName("Transaction Consistency mode") .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23)) @@ -1229,6 +1238,9 @@ public TruncateHandlingMode truncateHandlingMode() { return truncateHandlingMode; } + public boolean transactionOrdering() { + return getConfig().getBoolean(TRANSACTION_ORDERING); + } public ConsistencyMode consistencyMode() { return consistencyMode; } @@ -1330,7 +1342,8 @@ protected SourceInfoStructMaker getSourceInfoStruc INTERVAL_HANDLING_MODE, SCHEMA_REFRESH_MODE, TRUNCATE_HANDLING_MODE, - INCREMENTAL_SNAPSHOT_CHUNK_SIZE) + INCREMENTAL_SNAPSHOT_CHUNK_SIZE, + TRANSACTION_ORDERING) .excluding(INCLUDE_SCHEMA_CHANGES) .create(); diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConfigTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConfigTest.java index 83a5bfe6..766cc387 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConfigTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConfigTest.java @@ -288,7 +288,7 @@ public void throwExceptionIfExplicitCheckpointingNotConfiguredWithConsistency() TestHelper.execute("CREATE TABLE dummy_table (id INT PRIMARY KEY);"); final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "dummy_table", false, false); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.dummy_table", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { assertFalse(success); diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java index 2358f6ac..c45435e4 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java @@ -140,7 +140,7 @@ public void consistencyWithColocatedTables() throws Exception { private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception { Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java index 4333a6dd..6067ba61 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java @@ -84,7 +84,7 @@ public void recordsShouldStreamInConsistentOrderOnly() throws Exception { String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -200,7 +200,7 @@ public void fiveTablesWithForeignKeys() throws Exception { String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -339,7 +339,7 @@ public void singleTableSingleTablet() throws Exception { final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -412,7 +412,7 @@ public void singleTableTwoTablet() throws Exception { final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -491,7 +491,7 @@ public void singleTableSingleTabletTwoRecord() throws Exception { final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -566,7 +566,7 @@ public void twoTableWithSingleTabletEach() throws Exception { final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department"); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -642,7 +642,7 @@ public void fiveTablesSingleTabletEach() throws Exception { String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true); Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)"); @@ -1001,7 +1001,7 @@ private Configuration.Builder getConsistentConfigurationBuilder(String tableIncl private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception { Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId); - configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global"); + configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true); configBuilder.with("transforms", "Reroute"); configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter"); configBuilder.with("transforms.Reroute.topic.regex", "(.*)");