Skip to content

Commit

Permalink
[DBZ] Read offsets from Kafka with tablet splitting support (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb authored Sep 2, 2024
1 parent b450fa5 commit 81977a7
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.source.SourceConnector;
Expand Down Expand Up @@ -45,6 +47,7 @@ public class YugabyteDBChangeEventSourceCoordinator extends ChangeEventSourceCoo
private final SlotState slotInfo;

private YugabyteDBSnapshotChangeEventSource snapshotSource;
private YugabyteDBStreamingChangeEventSource streamingChangeEventSource;

public YugabyteDBChangeEventSourceCoordinator(Offsets<YBPartition, YugabyteDBOffsetContext> previousOffsets,
ErrorHandler errorHandler,
Expand Down Expand Up @@ -153,6 +156,18 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext,
}
}

@Override
protected void streamEvents(ChangeEventSourceContext context, YBPartition partition,
YugabyteDBOffsetContext offsetContext) throws InterruptedException {
initStreamEvents(partition, offsetContext);
LOGGER.info("Starting streaming");

this.streamingChangeEventSource = (YugabyteDBStreamingChangeEventSource) streamingSource;

streamingSource.execute(context, partition, offsetContext);
LOGGER.info("Finished streaming");
}

@Override
public void commitOffset(Map<String, ?> offset) {
if (this.snapshotSource == null) {
Expand All @@ -173,6 +188,32 @@ public void commitOffset(Map<String, ?> offset) {
}
}

/**
* @return the set of partitions i.e. {@link YBPartition} being in the streaming phase at a
* given point in time. If streamingChangeEventSource is null that means we are still in the
* snapshot phase and in that case it should be safe to return an {@link Optional#empty()}
* which should be handled by the caller of this method.
*/
public Optional<Set<YBPartition>> getPartitions() {
// There can be one window where the coordinator has not initialized streaming change event
// or the connector is still in snapshot phase (streaming source will not be initialized at
// that time) then we can return an empty optional.
// There's another small window during connector/task startup phase when the partitions
// being returned from the streaming source can be empty owing to the fact that it has not
// been populated yet. In that case, treat it as the streaming source itself has not been
// initialized.
if (this.streamingChangeEventSource == null
|| this.streamingChangeEventSource.getActivePartitionsBeingPolled().isEmpty()) {
LOGGER.debug("Returning empty optional for partition list");
return Optional.empty();
}

Optional<Set<YBPartition>> ybPartitions =
Optional.of(this.streamingChangeEventSource.getActivePartitionsBeingPolled());

return ybPartitions;
}

/**
* @return true if the connector is in snapshot phase, false otherwise
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,40 @@ Map<YBPartition, YugabyteDBOffsetContext> getPreviousOffsetss(
Offsets<YBPartition, YugabyteDBOffsetContext> getPreviousOffsetsFromProviderAndLoader(
Partition.Provider<YBPartition> provider,
OffsetContext.Loader<YugabyteDBOffsetContext> loader) {
Set<YBPartition> partitions = provider.getPartitions();
LOGGER.debug("The size of partitions is " + partitions.size());
/*
This method will be invoked at following 3 timings:
1. While initialising the task
2. Commit callback during snapshot phase
3. Commit callback during streaming phase
While initialising the task:
It is safe to return the partitions from provider since coordinator will be null at this
point and will have no information about the current set of partitions.
Snapshot phase:
The streaming change event source will be null at this stage, and we will get an
Optional.empty() and subsequently this method will call provider.getPartitions() to
get partitions which should be fine since there would be no tablet split during
snapshot phase. See YugabyteDBChangeEventCoordinator#getPartitions for more details.
Streaming phase:
We can rely on coordinator to leverage the streaming change event source and send us
the set of active partitions being polled which includes the dynamically created
partitions because of tablet splitting too. There can be a small window of time when
the partition list in streaming change event source will be empty and not populated,
so using the provider to get partitions is fine in this case as well.
*/
Set<YBPartition> partitions;

if (this.coordinator == null || this.coordinator.getPartitions().isEmpty()) {
// Coordinator can be null during task initialization, or it is snapshot phase or
// streaming initialization phase.
partitions = provider.getPartitions();
} else {
// Normal case with streaming running fine.
partitions = this.coordinator.getPartitions().get();
}

OffsetReader<YBPartition, YugabyteDBOffsetContext,
OffsetContext.Loader<YugabyteDBOffsetContext>> reader = new OffsetReader<>(
context.offsetStorageReader(), loader);
Expand Down Expand Up @@ -446,6 +478,7 @@ public void commit() throws InterruptedException {
*/
protected Map<String, ?> getHigherOffsets(Map<String, ?> offsets) {
if (this.ybOffset == null) {
LOGGER.debug("Returning original offsets since cached ybOffset is null");
return offsets;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti
LOGGER.info("Processing consistent messages");

try (YBClient syncClient = YBClientUtils.getYbClient(this.connectorConfig)) {
// This tabletPairList has Pair<String, String> objects wherein the key is the table UUID
// and the value is tablet UUID
List<Pair<String, String>> tabletPairList = new ArrayList<>();

Map<String, YBTable> tableIdToTable = new HashMap<>();
Map<String, GetTabletListToPollForCDCResponse> tabletListResponse = new HashMap<>();
String streamId = connectorConfig.streamId();
Expand All @@ -67,7 +63,7 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti

GetTabletListToPollForCDCResponse resp =
YBClientUtils.getTabletListToPollForCDCWithRetry(table, tId, connectorConfig);
populateTableToTabletPairsForTask(tId, resp, tabletPairList);
populateTableToTabletPairsForTask(tId, resp);
tabletListResponse.put(tId, resp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import io.debezium.connector.base.ChangeEventQueue;
Expand Down Expand Up @@ -101,6 +102,10 @@ public class YugabyteDBStreamingChangeEventSource implements
protected Set<String> splitTabletsWaitingForCallback;
protected List<HashPartition> partitionRanges;

// This tabletPairList has Pair<String, String> objects wherein the key is the table UUID
// and the value is tablet UUID
protected List<Pair<String, String>> tabletPairList;

public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorConfig, Snapshotter snapshotter,
YugabyteDBConnection connection, YugabyteDBEventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection,
Expand All @@ -124,6 +129,7 @@ public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorC
this.splitTabletsWaitingForCallback = new HashSet<>();
this.filters = new Filters(connectorConfig);
this.partitionRanges = new ArrayList<>();
this.tabletPairList = new CopyOnWriteArrayList<>();

if (TEST_TRACK_EXPLICIT_CHECKPOINTS) {
TEST_explicitCheckpoints = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -307,8 +313,7 @@ protected void markNoSnapshotNeeded(YBClient syncClient, YBTable ybTable, String
* @param response of type {@link GetTabletListToPollForCDCResponse}
* @param tabletPairList a list of {@link Pair} to be populated where each pair is {@code <tableId, tabletId>}
*/
protected void populateTableToTabletPairsForTask(String tableId, GetTabletListToPollForCDCResponse response,
List<Pair<String, String>> tabletPairList) {
protected void populateTableToTabletPairsForTask(String tableId, GetTabletListToPollForCDCResponse response) {
// Verify that the partitionRanges are already populated.
Objects.requireNonNull(partitionRanges);
assert !partitionRanges.isEmpty();
Expand All @@ -319,7 +324,7 @@ protected void populateTableToTabletPairsForTask(String tableId, GetTabletListTo

for (HashPartition parent : partitionRanges) {
if (parent.containsPartition(hp)) {
tabletPairList.add(new ImmutablePair<>(hp.getTableId(), hp.getTabletId()));
this.tabletPairList.add(new ImmutablePair<>(hp.getTableId(), hp.getTabletId()));
}
}
}
Expand All @@ -332,10 +337,6 @@ protected void getChanges2(ChangeEventSourceContext context,
throws Exception {
LOGGER.info("Processing messages");
try (YBClient syncClient = YBClientUtils.getYbClient(this.connectorConfig)) {
// This tabletPairList has Pair<String, String> objects wherein the key is the table UUID
// and the value is tablet UUID
List<Pair<String, String>> tabletPairList = new ArrayList<>();

Map<String, YBTable> tableIdToTable = new HashMap<>();
Map<String, GetTabletListToPollForCDCResponse> tabletListResponse = new HashMap<>();
String streamId = connectorConfig.streamId();
Expand All @@ -357,7 +358,7 @@ protected void getChanges2(ChangeEventSourceContext context,

// TODO: One optimisation where we initialise the offset context here itself
// without storing the GetTabletListToPollForCDCResponse
populateTableToTabletPairsForTask(tId, resp, tabletPairList);
populateTableToTabletPairsForTask(tId, resp);
LOGGER.info("Table: {} with number of tablets {}", tId, resp.getTabletCheckpointPairListSize());
tabletListResponse.put(tId, resp);
}
Expand Down Expand Up @@ -863,6 +864,20 @@ protected void setCheckpointWithGetChanges(YBClient syncClient, YBTable ybTable,
}
}

public Set<YBPartition> getActivePartitionsBeingPolled() {
Set<YBPartition> partitions = new HashSet<>();

for (Pair<String, String> pair : this.tabletPairList) {
partitions.add(new YBPartition(pair.getKey(), pair.getValue(), false));
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Returning an active partition set with size: {}", partitions.size());
}

return partitions;
}

/**
* If we receive change events but all of them get filtered out, we cannot
* commit any new offset with Apache Kafka. This in turn means no LSN is ever
Expand Down

0 comments on commit 81977a7

Please sign in to comment.