Skip to content

Commit

Permalink
[yugabyte/yugabyte-db#18239] Add consistency configuration properties (
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb authored Jul 14, 2023
1 parent 41511d2 commit 3297794
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public SnapshotChangeEventSource<YBPartition, YugabyteDBOffsetContext> getSnapsh

@Override
public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStreamingChangeEventSource() {
LOGGER.info("Consistency mode is {}", configuration.consistencyMode().getValue());
if (configuration.consistencyMode() == YugabyteDBConnectorConfig.ConsistencyMode.DEFAULT) {
LOGGER.info("Transaction ordering enabled: {}", configuration.transactionOrdering());
if (configuration.transactionOrdering()) {
LOGGER.info("Instantiating Vanilla Streaming Source");
return new YugabyteDBStreamingChangeEventSource(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,15 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"'skip' to skip / ignore TRUNCATE events (default), " +
"'include' to handle and include TRUNCATE events");

public static final Field TRANSACTION_ORDERING = Field.create("transaction.ordering")
.withDisplayName("Order transactions")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withImportance(Importance.HIGH)
.withDefault(false)
.withType(Type.BOOLEAN)
.withValidation(Field::isBoolean)
.withDescription("Specify whether the transactions need to be ordered");

public static final Field CONSISTENCY_MODE = Field.create("consistency.mode")
.withDisplayName("Transaction Consistency mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
Expand Down Expand Up @@ -1229,6 +1238,9 @@ public TruncateHandlingMode truncateHandlingMode() {
return truncateHandlingMode;
}

public boolean transactionOrdering() {
return getConfig().getBoolean(TRANSACTION_ORDERING);
}
public ConsistencyMode consistencyMode() {
return consistencyMode;
}
Expand Down Expand Up @@ -1330,7 +1342,8 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
INTERVAL_HANDLING_MODE,
SCHEMA_REFRESH_MODE,
TRUNCATE_HANDLING_MODE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
TRANSACTION_ORDERING)
.excluding(INCLUDE_SCHEMA_CHANGES)
.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void throwExceptionIfExplicitCheckpointingNotConfiguredWithConsistency()
TestHelper.execute("CREATE TABLE dummy_table (id INT PRIMARY KEY);");
final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "dummy_table", false, false);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.dummy_table", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void consistencyWithColocatedTables() throws Exception {

private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception {
Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void recordsShouldStreamInConsistentOrderOnly() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -200,7 +200,7 @@ public void fiveTablesWithForeignKeys() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -339,7 +339,7 @@ public void singleTableSingleTablet() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -412,7 +412,7 @@ public void singleTableTwoTablet() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -491,7 +491,7 @@ public void singleTableSingleTabletTwoRecord() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -566,7 +566,7 @@ public void twoTableWithSingleTabletEach() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -642,7 +642,7 @@ public void fiveTablesSingleTabletEach() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -1001,7 +1001,7 @@ private Configuration.Builder getConsistentConfigurationBuilder(String tableIncl

private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception {
Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down

0 comments on commit 3297794

Please sign in to comment.