From 3179462041f34510ed304fac4d20b03f023b818e Mon Sep 17 00:00:00 2001 From: Adithya Bharadwaj Date: Thu, 13 Jul 2023 15:22:06 +0530 Subject: [PATCH] Explicit checkpoint wait on Tablet Split is now based on LastRecordCheckpoint (#246) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We will populate the last seen record's checkpoint in the "lastRecordCheckpoint" field. This is meant to hold the explicit checkpoint details of the last seen record from any response (i.e. the from_op_id and the record’s commit_time). We will try to update this value in every GetChanges call. After “lastRecordCheckpoint” is updated as needed, we will then look at the response checkpoint from the current GetChanges response. If the response checkpoint is higher: 1. We will store the response’s checkpoint, if the response’s checkpoint is higher than “lastRecordCheckpoint”. 2. We will add this tablet to a wait-list, where we will first wait until the ExplicitCheckpoint of the tablet is equal to “lastRecordCheckpoint”. 3. Once the ExplicitCheckpoint is equal to “lastRecordCheckpoint”, we can update the ExplicitCheckpoint value of the tablet to the response’s checkpoint, i.e. directly update ExplicitCheckpoint in the “tabletToExplicitCheckpoint” map. This will take care of updating the ExplicitCheckpoint in cases of NoOp messages in the RAFT WAL, and also take care of tablets splitting right after a previous split, in which case there will be no records to checkpoint. 
--- .../connector/yugabytedb/SourceInfo.java | 19 ++++--- .../yugabytedb/YugabyteDBOffsetContext.java | 6 +- .../YugabyteDBStreamingChangeEventSource.java | 55 ++++++++++++++----- .../connector/yugabytedb/connection/OpId.java | 11 ++-- 4 files changed, 63 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java b/src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java index 005d50f9..28bd05ee 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java +++ b/src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java @@ -45,7 +45,7 @@ public final class SourceInfo extends BaseSourceInfo { private final String dbName; private OpId lsn; - private OpId lastCommitLsn; + private OpId lastRecordCheckpoint; private String txId; private Instant timestamp; private String schemaName; @@ -63,7 +63,7 @@ protected SourceInfo(YugabyteDBConnectorConfig connectorConfig) { protected SourceInfo(YugabyteDBConnectorConfig connectorConfig, OpId lastCommitLsn) { super(connectorConfig); this.dbName = connectorConfig.databaseName(); - this.lastCommitLsn = lastCommitLsn; + this.lastRecordCheckpoint = lastCommitLsn; this.lsn = lastCommitLsn; } @@ -105,8 +105,7 @@ protected SourceInfo update(YBPartition partition, OpId lsn, long commitTime, St * Updates the source with the LSN of the last committed transaction. */ protected SourceInfo updateLastCommit(OpId lsn) { - this.lastCommitLsn = lsn; - this.lsn = lsn; + this.lastRecordCheckpoint = lsn; return this; } @@ -125,10 +124,14 @@ public OpId lsn() { return this.lsn; } + public OpId lastRecordCheckpoint() { + return lastRecordCheckpoint; + } + public String sequence() { List sequence = new ArrayList(2); - String lastCommitLsn = (this.lastCommitLsn != null) - ? this.lastCommitLsn.toSerString() + String lastCommitLsn = (this.lastRecordCheckpoint != null) + ? this.lastRecordCheckpoint.toSerString() : null; String lsn = (this.lsn != null) ? 
this.lsn.toSerString() @@ -197,8 +200,8 @@ public String toString() { if (txId != null) { sb.append(", txId=").append(txId); } - if (lastCommitLsn != null) { - sb.append(", lastCommitLsn=").append(lastCommitLsn); + if (lastRecordCheckpoint != null) { + sb.append(", lastCommitLsn=").append(lastRecordCheckpoint); } if (timestamp != null) { sb.append(", timestamp=").append(timestamp); diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 523092e1..2e133302 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -38,7 +38,7 @@ public class YugabyteDBOffsetContext implements OffsetContext { private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBSnapshotChangeEventSource.class); - + // The two maps tabletSourceInfo and fromLsn are used to store offsets. However, there are // differences between the offsets they store: // tabletSourceInfo - this has the offset for each processed record and thus these offsets are @@ -254,6 +254,7 @@ public void updateRecordPosition(YBPartition partition, OpId lsn, } info.update(partition, lsn, commitTime, txId, tableId, recordTime); + info.updateLastCommit(lsn); this.tabletSourceInfo.put(partition.getId(), info); } @@ -276,6 +277,7 @@ OpId lsn(YBPartition partition) { return this.fromLsn.getOrDefault(partition.getId(), streamingStartLsn()); } + /** * If a previous OpId is null then we want the server to send the snapshot from the * beginning. 
Requesting from the term -1, index -1 and empty key would indicate @@ -375,7 +377,7 @@ public YugabyteDBOffsetContext load(Map offset) { * final OpId lastCommitLsn = OpId.valueOf(readOptionalString(offset, * LAST_COMPLETELY_PROCESSED_LSN_KEY)); * final String txId = readOptionalString(offset, SourceInfo.TXID_KEY); - * + * * final Instant useconds = Conversions.toInstantFromMicros((Long) offset * .get(SourceInfo.TIMESTAMP_USEC_KEY)); * final boolean snapshot = (boolean) ((Map) offset) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 57cf071a..dd03344c 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -36,7 +36,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.DataChangeEvent; @@ -329,6 +328,9 @@ protected void getChanges2(ChangeEventSourceContext context, // based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same. YBPartition p = new YBPartition(entry.getKey(), entry.getValue(), false /* colocated */); offsetContext.initSourceInfo(p, this.connectorConfig, opId); + // We can initialise the explicit checkpoint for this tablet to the value returned by + // the cdc_service through the 'GetTabletListToPollForCDC' API + tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint()); schemaNeeded.put(p.getId(), Boolean.TRUE); } @@ -397,7 +399,9 @@ protected void getChanges2(ChangeEventSourceContext context, // checkpoint is the same as from_op_id, if yes then handle the tablet // for split. 
CdcSdkCheckpoint explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId()); - if (explicitCheckpoint != null && cp.equals(explicitCheckpoint)) { + OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint(); + + if (explicitCheckpoint != null && (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint))) { // At this position, we know we have received a callback for split tablet // handle tablet split and delete the tablet from the waiting list. @@ -449,11 +453,16 @@ protected void getChanges2(ChangeEventSourceContext context, LOGGER.debug("Requesting schema for tablet: {}", tabletId); } + CdcSdkCheckpoint explicitCheckpoint = null; + if (taskContext.shouldEnableExplicitCheckpointing()) { + explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId()); + } + try { response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), schemaNeeded.get(part.getId()), - taskContext.shouldEnableExplicitCheckpointing() ? tabletToExplicitCheckpoint.get(part.getId()) : null, + explicitCheckpoint, tabletSafeTime.getOrDefault(part.getId(), cp.getTime()), offsetContext.getWalSegmentIndex(part)); tabletSafeTime.put(part.getId(), response.getResp().getSafeHybridTime()); @@ -467,20 +476,23 @@ protected void getChanges2(ChangeEventSourceContext context, } if (taskContext.shouldEnableExplicitCheckpointing()) { + OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint(); + // If explicit checkpointing is enabled then we should check if we have the explicit checkpoint // the same as from_op_id, if yes then handle tablet split directly, if not, add the partition ID // (table.tablet) to be processed later. 
- CdcSdkCheckpoint explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId()); - if (explicitCheckpoint != null && cp.equals(explicitCheckpoint)) { - LOGGER.info("Explicit checkpoint same as from_op_id, handling tablet split immediately for partition {}, explicit checkpoint {}:{} from_op_id: {}.{}", - part.getId(), explicitCheckpoint.getTerm(), explicitCheckpoint.getIndex(), cp.getTerm(), cp.getIndex()); + LOGGER.info(""); + if (explicitCheckpoint != null && (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint))) { + LOGGER.info("Explicit checkpoint same as last seen record's checkpoint, handling tablet split immediately for partition {}, explicit checkpoint {}:{}:{} lastRecordCheckpoint: {}.{}.{}", + part.getId(), explicitCheckpoint.getTerm(), explicitCheckpoint.getIndex(), explicitCheckpoint.getTime(), lastRecordCheckpoint.getTerm(), lastRecordCheckpoint.getIndex(), lastRecordCheckpoint.getTime()); + handleTabletSplit(cdcException, tabletPairList, offsetContext, streamId, schemaNeeded); } else { // Add the tablet for being processed later, this will mark the tablet as locked. There is a chance that explicit checkpoint may // be null, in that case, just to avoid NullPointerException in the log, simply log a null value. - final String explicitString = (explicitCheckpoint == null) ? null : (explicitCheckpoint.getTerm() + "." + explicitCheckpoint.getIndex()); - LOGGER.info("Adding partition {} to wait-list since the explicit checkpoint ({}) and from_op_id ({}.{}) are not the same", - part.getId(), explicitString, cp.getTerm(), cp.getIndex()); + final String explicitString = (explicitCheckpoint == null) ? null : (explicitCheckpoint.getTerm() + "." 
+ explicitCheckpoint.getIndex() + ":" + explicitCheckpoint.getTime()); + LOGGER.info("Adding partition {} to wait-list since the explicit checkpoint ({}) and last seen record's checkpoint ({}.{}.{}) are not the same", + part.getId(), explicitString, lastRecordCheckpoint.getTerm(), lastRecordCheckpoint.getIndex(), lastRecordCheckpoint.getTime()); splitTabletsWaitingForCallback.add(part.getId()); } } else { @@ -522,6 +534,8 @@ protected void getChanges2(ChangeEventSourceContext context, continue; } + // TODO: Rename to Checkpoint, since OpId is misleading. + // This is the checkpoint which will be stored in Kafka and will be used for explicit checkpointing. final OpId lsn = new OpId(record.getFromOpId().getTerm(), record.getFromOpId().getIndex(), record.getFromOpId().getWriteIdKey().toByteArray(), @@ -667,10 +681,21 @@ else if (message.isDDLMessage()) { response.getIndex(), response.getKey(), response.getWriteId(), - response.getSnapshotTime()); + response.getResp().getSafeHybridTime()); offsetContext.updateWalPosition(part, finalOpid); offsetContext.updateWalSegmentIndex(part, response.getResp().getWalSegmentIndex()); + // In cases where there is no transactions on the server, the response checkpoint can still move ahead and we should + // also move the explicit checkpoint forward, given that it was already greater than the lsn of the last seen valid record. + // Otherwise the explicit checkpoint can get stuck at older values, and upon connector restart + // we will resume from an older point than necessary. 
+ if (taskContext.shouldEnableExplicitCheckpointing()) { + OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint(); + if (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint)) { + tabletToExplicitCheckpoint.put(part.getId(), finalOpid.toCdcSdkCheckpoint()); + } + } + LOGGER.debug("The final opid for tablet {} is {}", part.getId(), finalOpid); } // Reset the retry count, because if flow reached at this point, it means that the connection @@ -902,8 +927,9 @@ private void addTabletIfNotPresent(List> tabletPairList, // is not possible on colocated tables, it is safe to assume that the tablets here // would be all non-colocated. YBPartition p = new YBPartition(tableId, tabletId, false /* colocated */); - offsetContext.initSourceInfo(p, this.connectorConfig, - OpId.from(pair.getCdcSdkCheckpoint())); + OpId checkpoint = OpId.from(pair.getCdcSdkCheckpoint()); + offsetContext.initSourceInfo(p, this.connectorConfig, checkpoint); + tabletToExplicitCheckpoint.put(p.getId(), checkpoint.toCdcSdkCheckpoint()); LOGGER.info("Initialized offset context for tablet {} with OpId {}", tabletId, OpId.from(pair.getCdcSdkCheckpoint())); @@ -947,6 +973,9 @@ protected void handleTabletSplit(String splitTabletId, // Remove the corresponding entry to indicate that we don't need the schema now. schemaNeeded.remove(entryToBeDeleted.getValue()); + // Remove the entry for the tablet which has been split from: 'tabletToExplicitCheckpoint'. + tabletToExplicitCheckpoint.remove(splitTabletId); + // Log a warning if the element cannot be removed from the list. 
if (!removeSuccessful) { LOGGER.warn("Failed to remove the entry table {} - tablet {} from tablet pair list after split, will try once again", entryToBeDeleted.getKey(), entryToBeDeleted.getValue()); diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java b/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java index 64196faa..97ebb1a4 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java @@ -140,12 +140,13 @@ public CdcSdkCheckpoint toCdcSdkCheckpoint() { } /** - * Verify the equality of OpId with the given {@link CdcSdkCheckpoint} + * Verify that the OpId is lesser than or equal to the given {@link CdcSdkCheckpoint} * @param checkpoint - * @return true if the term and index of this {@link OpId} are equal to the ones in - * {@link CdcSdkCheckpoint} + * @return true if the term and index of time of this {@link OpId} are lesser than or equal to + * the corresponding values in {@link CdcSdkCheckpoint} */ - public boolean equals(CdcSdkCheckpoint checkpoint) { - return (this.term == checkpoint.getTerm()) && (this.index == checkpoint.getIndex()); + public boolean isLesserThanOrEqualTo(CdcSdkCheckpoint checkpoint) { + return (checkpoint != null && this.term <= checkpoint.getTerm() + && this.index <= checkpoint.getIndex() && this.time <= checkpoint.getTime()); } }