Skip to content

Commit

Permalink
Explicit checkpoint wait on Tablet Split is now based on LastRecordCh…
Browse files Browse the repository at this point in the history
…eckpoint (#246)

We will populate the last seen record's checkpoint in the "lastRecordCheckpoint" field. This is meant to hold the explicit checkpoint details of the last seen record from any response (i.e., the from_op_id and the record’s commit_time). We will try to update this value in every GetChanges call.

After “lastRecordCheckpoint” is updated as needed, we will then look at the response checkpoint from the current GetChanges response. If the response checkpoint is higher than “lastRecordCheckpoint”:
1. We will store the response’s checkpoint, if the response’s checkpoint is higher than “lastRecordCheckpoint”.
2. We will add this tablet to a wait-list, where we will first wait until the ExplicitCheckpoint of the tablet is equal to: “lastRecordCheckpoint”.
3. Once the ExplicitCheckpoint is equal to “lastRecordCheckpoint”, we can update the ExplicitCheckpoint value of the tablet to the response’s checkpoint, i.e., directly update the ExplicitCheckpoint in the “tabletToExplicitCheckpoint” map.
This will take care of updating the ExplicitCheckpoint in cases of NoOp messages in the RAFT WAL, and will also take care of tablets splitting right after a previous split, in which case there will be no records to checkpoint.
  • Loading branch information
Adithya Bharadwaj authored Jul 13, 2023
1 parent 1b028fb commit 3179462
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 28 deletions.
19 changes: 11 additions & 8 deletions src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final class SourceInfo extends BaseSourceInfo {
private final String dbName;

private OpId lsn;
private OpId lastCommitLsn;
private OpId lastRecordCheckpoint;
private String txId;
private Instant timestamp;
private String schemaName;
Expand All @@ -63,7 +63,7 @@ protected SourceInfo(YugabyteDBConnectorConfig connectorConfig) {
protected SourceInfo(YugabyteDBConnectorConfig connectorConfig, OpId lastCommitLsn) {
super(connectorConfig);
this.dbName = connectorConfig.databaseName();
this.lastCommitLsn = lastCommitLsn;
this.lastRecordCheckpoint = lastCommitLsn;
this.lsn = lastCommitLsn;
}

Expand Down Expand Up @@ -105,8 +105,7 @@ protected SourceInfo update(YBPartition partition, OpId lsn, long commitTime, St
* Updates the source with the LSN of the last committed transaction.
*/
protected SourceInfo updateLastCommit(OpId lsn) {
this.lastCommitLsn = lsn;
this.lsn = lsn;
this.lastRecordCheckpoint = lsn;
return this;
}

Expand All @@ -125,10 +124,14 @@ public OpId lsn() {
return this.lsn;
}

/**
 * Returns the checkpoint currently held in the {@code lastRecordCheckpoint} field.
 *
 * @return the stored {@link OpId}; may be {@code null} if no checkpoint has been set
 */
public OpId lastRecordCheckpoint() {
    return this.lastRecordCheckpoint;
}

public String sequence() {
List<String> sequence = new ArrayList<String>(2);
String lastCommitLsn = (this.lastCommitLsn != null)
? this.lastCommitLsn.toSerString()
String lastCommitLsn = (this.lastRecordCheckpoint != null)
? this.lastRecordCheckpoint.toSerString()
: null;
String lsn = (this.lsn != null)
? this.lsn.toSerString()
Expand Down Expand Up @@ -197,8 +200,8 @@ public String toString() {
if (txId != null) {
sb.append(", txId=").append(txId);
}
if (lastCommitLsn != null) {
sb.append(", lastCommitLsn=").append(lastCommitLsn);
if (lastRecordCheckpoint != null) {
sb.append(", lastCommitLsn=").append(lastRecordCheckpoint);
}
if (timestamp != null) {
sb.append(", timestamp=").append(timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class YugabyteDBOffsetContext implements OffsetContext {

private static final Logger LOGGER =
LoggerFactory.getLogger(YugabyteDBSnapshotChangeEventSource.class);

// The two maps tabletSourceInfo and fromLsn are used to store offsets. However, there are
// differences between the offsets they store:
// tabletSourceInfo - this has the offset for each processed record and thus these offsets are
Expand Down Expand Up @@ -254,6 +254,7 @@ public void updateRecordPosition(YBPartition partition, OpId lsn,
}

info.update(partition, lsn, commitTime, txId, tableId, recordTime);
info.updateLastCommit(lsn);
this.tabletSourceInfo.put(partition.getId(), info);
}

Expand All @@ -276,6 +277,7 @@ OpId lsn(YBPartition partition) {
return this.fromLsn.getOrDefault(partition.getId(), streamingStartLsn());
}


/**
* If a previous OpId is null then we want the server to send the snapshot from the
* beginning. Requesting from the term -1, index -1 and empty key would indicate
Expand Down Expand Up @@ -375,7 +377,7 @@ public YugabyteDBOffsetContext load(Map<String, ?> offset) {
* final OpId lastCommitLsn = OpId.valueOf(readOptionalString(offset,
* LAST_COMPLETELY_PROCESSED_LSN_KEY));
* final String txId = readOptionalString(offset, SourceInfo.TXID_KEY);
*
*
* final Instant useconds = Conversions.toInstantFromMicros((Long) offset
* .get(SourceInfo.TIMESTAMP_USEC_KEY));
* final boolean snapshot = (boolean) ((Map<String, Object>) offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
Expand Down Expand Up @@ -329,6 +328,9 @@ protected void getChanges2(ChangeEventSourceContext context,
// based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same.
YBPartition p = new YBPartition(entry.getKey(), entry.getValue(), false /* colocated */);
offsetContext.initSourceInfo(p, this.connectorConfig, opId);
// We can initialise the explicit checkpoint for this tablet to the value returned by
// the cdc_service through the 'GetTabletListToPollForCDC' API
tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint());
schemaNeeded.put(p.getId(), Boolean.TRUE);
}

Expand Down Expand Up @@ -397,7 +399,9 @@ protected void getChanges2(ChangeEventSourceContext context,
// checkpoint is the same as from_op_id, if yes then handle the tablet
// for split.
CdcSdkCheckpoint explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId());
if (explicitCheckpoint != null && cp.equals(explicitCheckpoint)) {
OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint();

if (explicitCheckpoint != null && (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint))) {
// At this position, we know we have received a callback for split tablet
// handle tablet split and delete the tablet from the waiting list.

Expand Down Expand Up @@ -449,11 +453,16 @@ protected void getChanges2(ChangeEventSourceContext context,
LOGGER.debug("Requesting schema for tablet: {}", tabletId);
}

CdcSdkCheckpoint explicitCheckpoint = null;
if (taskContext.shouldEnableExplicitCheckpointing()) {
explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId());
}

try {
response = this.syncClient.getChangesCDCSDK(
table, streamId, tabletId, cp.getTerm(), cp.getIndex(), cp.getKey(),
cp.getWrite_id(), cp.getTime(), schemaNeeded.get(part.getId()),
taskContext.shouldEnableExplicitCheckpointing() ? tabletToExplicitCheckpoint.get(part.getId()) : null,
explicitCheckpoint,
tabletSafeTime.getOrDefault(part.getId(), cp.getTime()), offsetContext.getWalSegmentIndex(part));

tabletSafeTime.put(part.getId(), response.getResp().getSafeHybridTime());
Expand All @@ -467,20 +476,23 @@ protected void getChanges2(ChangeEventSourceContext context,
}

if (taskContext.shouldEnableExplicitCheckpointing()) {
OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint();

// If explicit checkpointing is enabled then we should check if we have the explicit checkpoint
// the same as from_op_id, if yes then handle tablet split directly, if not, add the partition ID
// (table.tablet) to be processed later.
CdcSdkCheckpoint explicitCheckpoint = tabletToExplicitCheckpoint.get(part.getId());
if (explicitCheckpoint != null && cp.equals(explicitCheckpoint)) {
LOGGER.info("Explicit checkpoint same as from_op_id, handling tablet split immediately for partition {}, explicit checkpoint {}:{} from_op_id: {}.{}",
part.getId(), explicitCheckpoint.getTerm(), explicitCheckpoint.getIndex(), cp.getTerm(), cp.getIndex());
LOGGER.info("");
if (explicitCheckpoint != null && (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint))) {
LOGGER.info("Explicit checkpoint same as last seen record's checkpoint, handling tablet split immediately for partition {}, explicit checkpoint {}:{}:{} lastRecordCheckpoint: {}.{}.{}",
part.getId(), explicitCheckpoint.getTerm(), explicitCheckpoint.getIndex(), explicitCheckpoint.getTime(), lastRecordCheckpoint.getTerm(), lastRecordCheckpoint.getIndex(), lastRecordCheckpoint.getTime());

handleTabletSplit(cdcException, tabletPairList, offsetContext, streamId, schemaNeeded);
} else {
// Add the tablet for being processed later, this will mark the tablet as locked. There is a chance that explicit checkpoint may
// be null, in that case, just to avoid NullPointerException in the log, simply log a null value.
final String explicitString = (explicitCheckpoint == null) ? null : (explicitCheckpoint.getTerm() + "." + explicitCheckpoint.getIndex());
LOGGER.info("Adding partition {} to wait-list since the explicit checkpoint ({}) and from_op_id ({}.{}) are not the same",
part.getId(), explicitString, cp.getTerm(), cp.getIndex());
final String explicitString = (explicitCheckpoint == null) ? null : (explicitCheckpoint.getTerm() + "." + explicitCheckpoint.getIndex() + ":" + explicitCheckpoint.getTime());
LOGGER.info("Adding partition {} to wait-list since the explicit checkpoint ({}) and last seen record's checkpoint ({}.{}.{}) are not the same",
part.getId(), explicitString, lastRecordCheckpoint.getTerm(), lastRecordCheckpoint.getIndex(), lastRecordCheckpoint.getTime());
splitTabletsWaitingForCallback.add(part.getId());
}
} else {
Expand Down Expand Up @@ -522,6 +534,8 @@ protected void getChanges2(ChangeEventSourceContext context,
continue;
}

// TODO: Rename to Checkpoint, since OpId is misleading.
// This is the checkpoint which will be stored in Kafka and will be used for explicit checkpointing.
final OpId lsn = new OpId(record.getFromOpId().getTerm(),
record.getFromOpId().getIndex(),
record.getFromOpId().getWriteIdKey().toByteArray(),
Expand Down Expand Up @@ -667,10 +681,21 @@ else if (message.isDDLMessage()) {
response.getIndex(),
response.getKey(),
response.getWriteId(),
response.getSnapshotTime());
response.getResp().getSafeHybridTime());
offsetContext.updateWalPosition(part, finalOpid);
offsetContext.updateWalSegmentIndex(part, response.getResp().getWalSegmentIndex());

// In cases where there is no transactions on the server, the response checkpoint can still move ahead and we should
// also move the explicit checkpoint forward, given that it was already greater than the lsn of the last seen valid record.
// Otherwise the explicit checkpoint can get stuck at older values, and upon connector restart
// we will resume from an older point than necessary.
if (taskContext.shouldEnableExplicitCheckpointing()) {
OpId lastRecordCheckpoint = offsetContext.getSourceInfo(part).lastRecordCheckpoint();
if (lastRecordCheckpoint == null || lastRecordCheckpoint.isLesserThanOrEqualTo(explicitCheckpoint)) {
tabletToExplicitCheckpoint.put(part.getId(), finalOpid.toCdcSdkCheckpoint());
}
}

LOGGER.debug("The final opid for tablet {} is {}", part.getId(), finalOpid);
}
// Reset the retry count, because if flow reached at this point, it means that the connection
Expand Down Expand Up @@ -902,8 +927,9 @@ private void addTabletIfNotPresent(List<Pair<String,String>> tabletPairList,
// is not possible on colocated tables, it is safe to assume that the tablets here
// would be all non-colocated.
YBPartition p = new YBPartition(tableId, tabletId, false /* colocated */);
offsetContext.initSourceInfo(p, this.connectorConfig,
OpId.from(pair.getCdcSdkCheckpoint()));
OpId checkpoint = OpId.from(pair.getCdcSdkCheckpoint());
offsetContext.initSourceInfo(p, this.connectorConfig, checkpoint);
tabletToExplicitCheckpoint.put(p.getId(), checkpoint.toCdcSdkCheckpoint());

LOGGER.info("Initialized offset context for tablet {} with OpId {}", tabletId, OpId.from(pair.getCdcSdkCheckpoint()));

Expand Down Expand Up @@ -947,6 +973,9 @@ protected void handleTabletSplit(String splitTabletId,
// Remove the corresponding entry to indicate that we don't need the schema now.
schemaNeeded.remove(entryToBeDeleted.getValue());

// Remove the entry for the tablet which has been split from: 'tabletToExplicitCheckpoint'.
tabletToExplicitCheckpoint.remove(splitTabletId);

// Log a warning if the element cannot be removed from the list.
if (!removeSuccessful) {
LOGGER.warn("Failed to remove the entry table {} - tablet {} from tablet pair list after split, will try once again", entryToBeDeleted.getKey(), entryToBeDeleted.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ public CdcSdkCheckpoint toCdcSdkCheckpoint() {
}

/**
* Verify the equality of OpId with the given {@link CdcSdkCheckpoint}
* Verify that the OpId is lesser than or equal to the given {@link CdcSdkCheckpoint}
* @param checkpoint
* @return true if the term and index of this {@link OpId} are equal to the ones in
* {@link CdcSdkCheckpoint}
* @return true if the term, index and time of this {@link OpId} are each lesser than or equal to
* the corresponding values in {@link CdcSdkCheckpoint}; false otherwise, including when the
* given checkpoint is null
*/
public boolean equals(CdcSdkCheckpoint checkpoint) {
return (this.term == checkpoint.getTerm()) && (this.index == checkpoint.getIndex());
public boolean isLesserThanOrEqualTo(CdcSdkCheckpoint checkpoint) {
    // Without a checkpoint to compare against, the comparison is reported as not satisfied.
    if (checkpoint == null) {
        return false;
    }

    // Each component is compared independently: all three must be <= for the result to be true.
    // NOTE(review): this is a component-wise check, not a lexicographic (term, index) ordering —
    // confirm that is the intended semantics for checkpoints spanning different terms.
    if (this.term > checkpoint.getTerm()) {
        return false;
    }
    if (this.index > checkpoint.getIndex()) {
        return false;
    }
    return this.time <= checkpoint.getTime();
}
}

0 comments on commit 3179462

Please sign in to comment.