[DBZ] Read offset from Kafka for every commit callback #348

Merged
22 commits merged on Aug 30, 2024

Conversation


@vaibhav-yb vaibhav-yb commented Aug 14, 2024

Note: This PR breaks explicit checkpointing in the tablet splitting case; this will be fixed in a follow-up PR.

Problem

There are two issues with the current callback-based checkpointing mechanism. Both are potential causes of data loss, and both have been reproduced manually:

DBZ-6026 -

The current code uses the method BaseSourceTask#logStatistics to log some information; however, that method also updates the offset map that is later used for the commit callback.

As a result, if a commit() callback fires after the statistics are logged but before the records are returned from BaseSourceTask#poll, the checkpoint is marked on the service for records that were never handed to Kafka Connect. If the connector restarts inside that window, those records are lost.

Steps to reproduce:

  1. Put a sleep of 2 minutes after logStatistics() is called and before the records are returned from BaseSourceTask#poll (see the sketch after this list).
    a. Add a log line so that we can see when the method is called.
  2. Create a table with a single tablet.
  3. Insert a record and wait for the log from 1a to appear; this indicates that the records have not been returned yet.
  4. Wait for the commit callback - this generally arrives before the sleep ends.
  5. Restart the connector.
  6. Upon restart, the connector starts from the checkpoint taken in step 4, so the record inserted in step 3 is never streamed.
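
For reference, a minimal, self-contained sketch of the race being reproduced. This is a hypothetical model of the ordering described above (publish the latest offset while logging statistics, then return the batch), not the actual Debezium BaseSourceTask code; the class and field names are illustrative only.

import java.util.List;
import java.util.concurrent.TimeUnit;

class PollRaceSketch {
    // offset map made visible to the commit() callback (updated as part of logging statistics)
    private volatile Object lastOffsetForCommit;

    List<String> poll(List<String> records, Object latestOffset) throws InterruptedException {
        // logStatistics(...) equivalent: the latest offset becomes visible to commit() here
        lastOffsetForCommit = latestOffset;

        // Injected only to reproduce DBZ-6026: a commit() callback that runs in this window
        // checkpoints 'latestOffset' even though 'records' were never returned to Kafka Connect.
        TimeUnit.MINUTES.sleep(2);

        return records;
    }

    void commit() {
        // Marks 'lastOffsetForCommit' on the service; if the connector restarts after this point
        // but before the records above were delivered, those records are lost.
    }
}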

DBZ-7816 -

According to the Kafka docs, callbacks for the same Kafka partition are guaranteed to arrive in order, but callbacks for different Kafka partitions can arrive out of order, which opens the potential data-loss window described in DBZ-7816.

Steps to reproduce:

  1. Create a table with 20 tablets and fill it with 100k records.
  2. Create a topic with 50 partitions.
  3. Start a snapshot on the table with snapshot.mode=initial.
  4. Once the snapshot is finished, the tablet is added to the wait list; if the callbacks arrive out of order, we may never receive a callback for the last snapshot record and therefore never transition from snapshot to streaming (a simplified sketch of this wait-list behaviour follows below).

Note that across all the experiments performed for this issue we were able to reproduce it every time, though it may not reproduce if the callbacks happen to arrive in order (perhaps 1 run out of 10).
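
A heavily simplified, hypothetical model of the wait-list behaviour from step 4, only to illustrate why a missing callback stalls the transition; all names (SnapshotWaitListSketch, snapshotFinished, onCommitCallback) are illustrative and the real connector logic is more involved.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SnapshotWaitListSketch {
    // tablet id -> offset of the last snapshot record that must be acknowledged
    private final Map<String, Long> waitList = new ConcurrentHashMap<>();

    void snapshotFinished(String tabletId, long lastSnapshotOffset) {
        waitList.put(tabletId, lastSnapshotOffset);
    }

    // Invoked from the commit callback. If callbacks for different Kafka partitions arrive
    // out of order, the callback carrying exactly the awaited offset may never be observed,
    // so the tablet stays in the wait list and never transitions to streaming (DBZ-7816).
    void onCommitCallback(String tabletId, long committedOffset) {
        Long awaited = waitList.get(tabletId);
        if (awaited != null && committedOffset == awaited) {
            waitList.remove(tabletId);
            transitionToStreaming(tabletId);
        }
    }

    private void transitionToStreaming(String tabletId) {
        // switch the tablet from snapshot consumption to change-event streaming
    }
}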

Solution

This PR overrides the commit() method in YugabyteDBConnectorTask to read the offsets committed to Kafka for each source partition and forwards those same offsets through the commit callbacks, so that they can be marked on the service for checkpointing.
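
A minimal sketch of this approach, not the PR's actual code. It uses the standard Kafka Connect SourceTask#commit() hook and OffsetStorageReader; the helpers sourcePartitionsForAllTablets() and markCheckpointOnService() are hypothetical names used only for illustration.

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.source.SourceTask;

public abstract class OffsetReadingTask extends SourceTask {

    @Override
    public void commit() throws InterruptedException {
        // Instead of trusting offsets captured at poll() time, read back what Kafka Connect
        // has actually committed for every source partition (tablet) this task owns.
        for (Map<String, Object> partition : sourcePartitionsForAllTablets()) {      // hypothetical helper
            Map<String, Object> committedOffset = context.offsetStorageReader().offset(partition);
            if (committedOffset != null) {
                // Forward the committed offset to the checkpointing path so it is marked on the service.
                markCheckpointOnService(partition, committedOffset);                  // hypothetical helper
            }
        }
    }

    protected abstract Collection<Map<String, Object>> sourcePartitionsForAllTablets();

    protected abstract void markCheckpointOnService(Map<String, Object> partition, Map<String, Object> offset);
}

Because the offsets are read back from what Kafka Connect has actually persisted, the checkpoint sent to the service can no longer run ahead of the delivered records, which closes both data-loss windows described above.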

@vaibhav-yb vaibhav-yb added the bug Something isn't working label Aug 14, 2024
@vaibhav-yb vaibhav-yb self-assigned this Aug 14, 2024
@@ -940,7 +940,7 @@ public void commitOffset(Map<String, ?> offset) {
         // than one already present would throw the error: CDCSDK: Trying to fetch already GCed intents
         if (this.tabletToExplicitCheckpoint.get(entry.getKey()) != null &&
                 tempOpId.getIndex() < this.tabletToExplicitCheckpoint.get(entry.getKey()).getIndex()) {
-            LOGGER.warn("The received OpId {} is less than the older checkpoint {} for tablet {}",
+            LOGGER.debug("The received OpId {} is less than the older checkpoint {} for tablet {}",
vaibhav-yb (Collaborator, Author) commented:

This is converted to debug level considering the following scenario:

  1. The last record published by the connector has OpId 1.5.
  2. The connector received a callback for that OpId, so the explicit checkpoint is also at 1.5.
  3. There were then a couple of NO_OPs on the service, so the connector received empty batches and the explicit checkpoint was advanced to 1.8.
  4. Assume that for some time there are no records, or that there are no records after the record in step 1.
  5. This results in a commit callback with checkpoint 1.5, since that was the last published record.
  6. Subsequently, we would always end up printing this warning log, which could be confusing to users.
