From ff44f799a58b9172167de914211c6fc75d7829ac Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:56:40 +0530 Subject: [PATCH] Add `commit_time` to the checkpoint in case of consistent streaming (#250) --- .../yugabytedb/YugabyteDBConsistentStreamingSource.java | 5 +++-- .../yugabytedb/YugabyteDBStreamingChangeEventSource.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java index 9021432e..7104dc00 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java @@ -5,6 +5,7 @@ import io.debezium.connector.yugabytedb.connection.OpId; import io.debezium.connector.yugabytedb.connection.ReplicationConnection; import io.debezium.connector.yugabytedb.connection.ReplicationMessage; +import io.debezium.connector.yugabytedb.connection.ReplicationMessage.Operation; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.connector.yugabytedb.connection.pgproto.YbProtoReplicationMessage; import io.debezium.connector.yugabytedb.consistent.Merger; @@ -239,7 +240,7 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti response.getIndex(), response.getKey(), response.getWriteId(), - response.getSnapshotTime()); + response.getResp().getSafeHybridTime()); offsetContext.updateWalPosition(part, finalOpid); offsetContext.updateWalSegmentIndex(part, response.getWalSegmentIndex()); LOGGER.debug("The final opid for tablet {} is {}", part.getTabletId(), finalOpid); @@ -323,7 +324,7 @@ private void dispatchMessage(YugabyteDBOffsetContext offsetContext, Map