From d3b340ffaddb4740bd1411bc425e868a418c2836 Mon Sep 17 00:00:00 2001 From: Isha Amoncar Date: Mon, 28 Feb 2022 04:59:42 +0000 Subject: [PATCH 1/2] Fixed previous offset --- .../yugabytedb/YugabyteDBOffsetContext.java | 15 +++++++++------ .../YugabyteDBSnapshotChangeEventSource.java | 3 +-- .../YugabyteDBStreamingChangeEventSource.java | 3 +++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 851b02af861..40916aa9033 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -60,7 +60,7 @@ private YugabyteDBOffsetContext(YugabyteDBConnectorConfig connectorConfig, // sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin()); sourceInfo.updateLastCommit(lastCommitLsn); sourceInfoSchema = sourceInfo.schema(); - + this.lastSnapshotRecord = lastSnapshotRecord; if (this.lastSnapshotRecord) { postSnapshotCompletion(); @@ -79,6 +79,8 @@ public YugabyteDBOffsetContext(Set s, this.sourceInfoSchema = sourceInfo.schema(); for (YugabyteDBOffsetContext context : s) { if (context != null) { + this.lastCompletelyProcessedLsn = context.lastCompletelyProcessedLsn; + this.lastCommitLsn = context.lastCommitLsn; LOGGER.debug("Populating the tabletsourceinfo" + context.getTabletSourceInfo()); if (context.getTabletSourceInfo() != null) { this.tabletSourceInfo.putAll(context.getTabletSourceInfo()); @@ -273,7 +275,8 @@ public String toString() { + ", lastCommitLsn=" + lastCommitLsn + ", streamingStoppingLsn=" + streamingStoppingLsn + ", transactionContext=" + transactionContext - + ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]"; + + ", incrementalSnapshotContext=" + incrementalSnapshotContext + + ", tabletSourceInfo=" + tabletSourceInfo + "]"; } public OffsetState asOffsetState() { @@ -333,7 +336,7 @@ private String readOptionalString(Map offset, String key) { public YugabyteDBOffsetContext load(Map offset) { LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset); - + OpId opid1 = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); /* * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, @@ -355,9 +358,9 @@ public YugabyteDBOffsetContext load(Map offset) { */ return new YugabyteDBOffsetContext(connectorConfig, - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), + opid1, + opid1, + opid1, "txId", Instant.MIN, false, false, TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 2dc68b1886d..151ec1bec18 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -112,7 +112,6 @@ public SnapshotResult doExecute( SnapshottingTask snapshottingTask) throws Exception { final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx = (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext; - // Connection connection = null; try { LOGGER.info("Snapshot step 1 - Preparing"); @@ -168,7 +167,7 @@ public SnapshotResult doExecute( // postSnapshot(); dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset); - return SnapshotResult.completed(ctx.offset); + return SnapshotResult.completed(previousOffset); } finally { // rollbackTransaction(connection); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 78f9bd402bc..d1d191624e9 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -278,6 +278,9 @@ private void getChanges2(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); + if(offsetContext.lsn(tabletId).equals(new OpId(0,0,null,0,0))){ + offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); + } } LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); From 07ead05b4f033f4fa7774d53145f790fafe389bd Mon Sep 17 00:00:00 2001 From: Isha Amoncar Date: Tue, 1 Mar 2022 09:07:24 +0000 Subject: [PATCH 2/2] Addressed review comments --- .../yugabytedb/YugabyteDBOffsetContext.java | 14 ++++++++++---- .../YugabyteDBStreamingChangeEventSource.java | 4 +--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 40916aa9033..ca1581a5f19 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -336,7 +336,13 @@ private String readOptionalString(Map offset, String key) { public YugabyteDBOffsetContext load(Map offset) { LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset); - OpId opid1 = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + OpId lastCompletelyProcessedLsn; + if(offset != null){ + lastCompletelyProcessedLsn = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + } + else{ + lastCompletelyProcessedLsn = new OpId(0, 0, null, 0, 0); + } /* * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, @@ -358,9 +364,9 @@ public YugabyteDBOffsetContext load(Map offset) { */ return new YugabyteDBOffsetContext(connectorConfig, - opid1, - opid1, - opid1, + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, "txId", Instant.MIN, false, false, TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 2fb4c517fc1..41a4c8a7813 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -278,9 +278,7 @@ private void getChanges2(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); - if(offsetContext.lsn(tabletId).equals(new OpId(0,0,null,0,0))){ - offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); - } + offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); } LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo());