Skip to content

Commit

Permalink
Add commit_time to the checkpoint in case of consistent streaming (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb authored Jul 19, 2023
1 parent ad82d71 commit ff44f79
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.debezium.connector.yugabytedb.connection.OpId;
import io.debezium.connector.yugabytedb.connection.ReplicationConnection;
import io.debezium.connector.yugabytedb.connection.ReplicationMessage;
import io.debezium.connector.yugabytedb.connection.ReplicationMessage.Operation;
import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection;
import io.debezium.connector.yugabytedb.connection.pgproto.YbProtoReplicationMessage;
import io.debezium.connector.yugabytedb.consistent.Merger;
Expand Down Expand Up @@ -239,7 +240,7 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti
response.getIndex(),
response.getKey(),
response.getWriteId(),
response.getSnapshotTime());
response.getResp().getSafeHybridTime());
offsetContext.updateWalPosition(part, finalOpid);
offsetContext.updateWalSegmentIndex(part, response.getWalSegmentIndex());
LOGGER.debug("The final opid for tablet {} is {}", part.getTabletId(), finalOpid);
Expand Down Expand Up @@ -323,7 +324,7 @@ private void dispatchMessage(YugabyteDBOffsetContext offsetContext, Map<String,
record.getFromOpId().getIndex(),
record.getFromOpId().getWriteIdKey().toByteArray(),
record.getFromOpId().getWriteId(),
snapshotTime);
record.getRowMessage().getCommitTime() - 1);

if (message.isLastEventForLsn()) {
lastCompletelyProcessedLsn = lsn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ protected void getChanges2(ChangeEventSourceContext context,
record.getFromOpId().getIndex(),
record.getFromOpId().getWriteIdKey().toByteArray(),
record.getFromOpId().getWriteId(),
record.getRowMessage().getCommitTime());
record.getRowMessage().getCommitTime() - 1);

if (message.isLastEventForLsn()) {
lastCompletelyProcessedLsn = lsn;
Expand Down

0 comments on commit ff44f79

Please sign in to comment.