[DR] Verify all partitions serially (linkedin#2863)
We need to restore and verify all partitions backed up in the cloud, serially.
snalli authored Aug 22, 2024
1 parent e3b411a commit 1afec63
Showing 1 changed file with 63 additions and 19 deletions.
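For orientation, here is a minimal sketch of the scheme this change adopts, independent of the Ambry classes in the diff: visit partitions in ascending id order, one per scheduled run, recover the checkpoint after a restart from the highest-numbered file in the report directory, and wrap back to 0 after the last id. The class name, report path, and the trivial verify() below are illustrative stand-ins, not Ambry code; the real monitor restores each partition from Azure and compares it against server replicas before advancing.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SerialPartitionCycle {
  private final String reportDir;
  private final long maxPartitionId;
  private long currentPartitionId;

  public SerialPartitionCycle(String reportDir, long maxPartitionId) {
    this.reportDir = reportDir;
    this.maxPartitionId = maxPartitionId;
    // Resume from the last partition checked; re-verifying it is fine because we may have
    // crashed or been redeployed while it was in progress.
    this.currentPartitionId = lastPartitionIdVerified();
  }

  // Highest numeric file name in the report directory, or 0 if there is none.
  long lastPartitionIdVerified() {
    long maxSeen = 0;
    File[] reports = new File(reportDir).listFiles();
    if (reports != null) {
      for (File report : reports) {
        try {
          maxSeen = Math.max(maxSeen, Long.parseLong(report.getName()));
        } catch (NumberFormatException e) {
          // Ignore files not named after a partition-id.
        }
      }
    }
    return maxSeen;
  }

  // One scheduled run: verify the current partition, then advance and wrap back to 0.
  public void runOnce() throws IOException {
    if (currentPartitionId == 0) {
      // Wrapped around (or first ever run). The real monitor deletes the old report
      // directory and recreates it here; creating it is enough for this sketch.
      Files.createDirectories(Paths.get(reportDir));
    }
    verify(currentPartitionId);
    currentPartitionId = (currentPartitionId + 1) % maxPartitionId; // same modulo as the commit
  }

  // Stand-in for restore-from-cloud plus compare-with-server-replicas. Writing a report
  // file named after the partition-id is what makes the checkpoint recoverable.
  private void verify(long partitionId) throws IOException {
    System.out.println("verifying partition-" + partitionId);
    Files.write(Paths.get(reportDir, Long.toString(partitionId)), new byte[0]);
  }

  public static void main(String[] args) throws IOException {
    // Pretend the largest partition-id in the cluster map is 4.
    SerialPartitionCycle cycle = new SerialPartitionCycle("/tmp/backup_checker_reports", 4);
    for (int i = 0; i < 6; i++) {
      cycle.runOnce(); // verifies partitions 0 1 2 3 0 1, one per scheduled run
    }
  }
}

With one partition verified per 30-minute run, a full pass over the cluster takes on the order of one scheduling interval per partition, plus the verification time itself.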
@@ -44,14 +44,13 @@
import com.github.ambry.store.StoreFindToken;
import com.github.ambry.utils.Utils;
import java.io.File;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -95,9 +94,9 @@ public class BackupIntegrityMonitor implements Runnable {
private final ScheduledExecutorService executor;
private final StorageManager storageManager;
private final AzureCloudConfig azureConfig;
public final long SCAN_MILESTONE = TimeUnit.DAYS.toMillis(1);
private final RecoveryMetrics metrics;
private final HashSet<Long> seen;
private long currentPartitionId;
private int currentDiskId;

public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server,
CompositeClusterManager cluster, StorageManager storage, DataNodeId node,
@@ -117,7 +116,19 @@ public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server,
storageManager = storage;
azureSyncClient = new AzureCloudDestinationSync(properties, helixClusterManager.getMetricRegistry(),
helixClusterManager, null);
seen = new HashSet<>();
/**
* After a restart, we scan the output folder to find the last partition-id verified. We don't know whether
* we crashed or restarted while scanning that partition, so we start from there. At the end of each run(), we
* increment currentPartitionId so that the next run() scans and verifies the next partition. This way we do not
* miss any partition. We take the modulus and circle back to 0 once all partitions in the cluster-map have been verified.
*
* So the sequence looks like this:
* R1: 0 1 2 3 x (crash/restart/deployment)
* R2: 3 4 5 x (crash/restart/deployment)
* R3: 5 6 7 ... so on.
*/
currentPartitionId = getLastPartitionIdVerified();
currentDiskId = 0;
// log disk state
staticClusterManager.getDataNodeId(nodeId.getHostname(), nodeId.getPort())
.getDiskIds()
@@ -126,11 +137,35 @@ public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server,
logger.info("[BackupIntegrityMonitor] Created BackupIntegrityMonitor");
}

/**
* Returns the last partition-id scanned and verified
* @return the last partition-id scanned and verified, or 0 if it could not be determined
*/
long getLastPartitionIdVerified() {
long maxPartitionId = 0;
try {
for(File file : new File(replicationConfig.backupCheckerReportDir).listFiles()) {
maxPartitionId = Math.max(maxPartitionId, Long.parseLong(file.getName()));
}
} catch (Throwable e) {
metrics.backupCheckerRuntimeError.inc();
logger.error("[BackupIntegrityMonitor] Failed to get last partition-id due to {}, start verification from partition-id 0", e.getMessage());
}
return maxPartitionId;
}

class PartitionIdComparator implements Comparator<PartitionId> {
@Override
public int compare(PartitionId p1, PartitionId p2) {
return Long.compare(p1.getId(), p2.getId());
}
}

/**
* Starts and schedules monitor
*/
public void start() {
executor.scheduleWithFixedDelay(this::run, 0, 1, TimeUnit.HOURS);
executor.scheduleWithFixedDelay(this::run, 0, 30, TimeUnit.MINUTES);
logger.info("[BackupIntegrityMonitor] Started BackupIntegrityMonitor");
}

@@ -166,8 +201,8 @@ private BlobStore startLocalStore(AmbryPartition partition) throws Exception {
.filter(d -> d.getState() == HardwareState.AVAILABLE)
.collect(Collectors.toList());
logger.info("[BackupIntegrityMonitor] {} disks can accommodate partition-{}", disks.size(), partition.getId());
// Pick disk randomly; any disk is ok as we will wipe it out after this
DiskId disk = disks.get(new Random().nextInt(disks.size()));
// Pick the next disk to make debugging easier
DiskId disk = disks.get(++currentDiskId % disks.size());
logger.info("[BackupIntegrityMonitor] Selected disk at mount path {}", disk.getMountPath());
// Clear the disk to make space; this is simpler than deciding which partition to delete.
// This is why this is thread-unsafe.
@@ -315,19 +350,25 @@ public void run() {
RemoteReplicaInfo cloudReplica = null;
AmbryPartition partition = null;
serverScanner = serverReplicationManager.getBackupCheckerThread("ambry_backup_integrity_monitor");
Random random = new Random();
PartitionIdComparator partitionIdComparator = new PartitionIdComparator();
try {
/** Select partition P */
List<PartitionId> partitions = helixClusterManager.getAllPartitionIds(null);
if (((double) seen.size())/partitions.size() >= 0.9) {
// If we have seen 90% of the partitions, then just clear the seen-set
seen.clear();
PartitionId maxPartitionId = partitions.stream().max(partitionIdComparator).get();
logger.info("[BackupIntegrityMonitor] Total number of partitions = {}, max partition-id = {}",
partitions.size(), maxPartitionId.getId());
partition = (AmbryPartition) partitions.stream()
.filter(p -> p.getId() == currentPartitionId)
.collect(Collectors.toList())
.get(0);
if (currentPartitionId == 0) {
// We circled back to partition 0 because we either finished all partitions,
// or there was some error in finding out the last partition scanned. Clear state.
logger.info("[BackupIntegrityMonitor] Deleting directory {}", replicationConfig.backupCheckerReportDir);
Utils.deleteFileOrDirectory(new File(replicationConfig.backupCheckerReportDir));
logger.info("[BackupIntegrityMonitor] Creating directory {}", replicationConfig.backupCheckerReportDir);
Files.createDirectories(Paths.get(replicationConfig.backupCheckerReportDir));
}
partitions = partitions.stream()
.filter(p -> !seen.contains(p.getId()))
.collect(Collectors.toList());
partition = (AmbryPartition) partitions.get(random.nextInt(partitions.size()));
seen.add(partition.getId());
logger.info("[BackupIntegrityMonitor] Verifying backup partition-{}", partition.getId());

/** Create local Store S */
@@ -381,6 +422,9 @@ public void run() {
serverScanner.setAzureBlobInfo(azureBlobs);
compareMetadata(serverReplica, cloudReplica);
}

currentPartitionId = (currentPartitionId + 1) % maxPartitionId.getId();
// Don't delete any state, leave it for inspection.
} catch (Throwable e) {
metrics.backupCheckerRuntimeError.inc();
logger.error(String.format("[BackupIntegrityMonitor] Failed to verify cloud backup partition-%s due to",
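As an aside on the selection logic in run(): the two lookups — the largest partition-id used as the wrap-around modulus, and the partition whose id matches the checkpoint — can also be expressed with the JDK's comparator factory and findFirst(). The sketch below stubs PartitionId as a tiny local class purely for illustration; Ambry's real PartitionId exposes the same getId() accessor.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class PartitionSelectionSketch {
  // Minimal stand-in for Ambry's PartitionId, for illustration only.
  static final class PartitionId {
    private final long id;
    PartitionId(long id) { this.id = id; }
    long getId() { return id; }
  }

  public static void main(String[] args) {
    List<PartitionId> partitions =
        List.of(new PartitionId(0), new PartitionId(1), new PartitionId(2), new PartitionId(3));
    long currentPartitionId = 2;

    // Largest partition-id in the cluster map, used as the wrap-around modulus.
    long maxPartitionId = partitions.stream()
        .max(Comparator.comparingLong(PartitionId::getId))
        .map(PartitionId::getId)
        .orElse(0L);

    // The partition whose id matches the checkpoint; findFirst() avoids building an intermediate list.
    Optional<PartitionId> toVerify = partitions.stream()
        .filter(p -> p.getId() == currentPartitionId)
        .findFirst();

    toVerify.ifPresent(p ->
        System.out.println("verify partition-" + p.getId() + ", max partition-id " + maxPartitionId));
  }
}

Either approach works; the stream version simply avoids the helper comparator class and the intermediate list built by collect(Collectors.toList()).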
