From 1afec63da7fa81a989ace8b7739fbb4eadfdf0bf Mon Sep 17 00:00:00 2001 From: Sanketh Nalli Date: Thu, 22 Aug 2024 13:52:12 -0700 Subject: [PATCH] [DR] Verify all partitions serially (#2863) We need to restore and verify all partitions backed up in cloud, serially. --- .../ambry/cloud/BackupIntegrityMonitor.java | 82 ++++++++++++++----- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/BackupIntegrityMonitor.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/BackupIntegrityMonitor.java index b21d695301..4ae114d29d 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/BackupIntegrityMonitor.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/BackupIntegrityMonitor.java @@ -44,14 +44,13 @@ import com.github.ambry.store.StoreFindToken; import com.github.ambry.utils.Utils; import java.io.File; -import java.text.DateFormat; -import java.text.SimpleDateFormat; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Random; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -95,9 +94,9 @@ public class BackupIntegrityMonitor implements Runnable { private final ScheduledExecutorService executor; private final StorageManager storageManager; private final AzureCloudConfig azureConfig; - public final long SCAN_MILESTONE = TimeUnit.DAYS.toMillis(1); private final RecoveryMetrics metrics; - private final HashSet seen; + private long currentPartitionId; + private int currentDiskId; public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server, CompositeClusterManager cluster, StorageManager storage, DataNodeId node, @@ -117,7 +116,19 @@ public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server, storageManager = storage; azureSyncClient = new AzureCloudDestinationSync(properties, helixClusterManager.getMetricRegistry(), helixClusterManager, null); - seen = new HashSet<>(); + /** + * After a restart, we scan the output folder to get the last partition-id verified. We don't know if + * we crashed or restarted while scanning that partition, so we start from there. At the end, we increment the + * currentPartitionId so that the next run() scans and verifies the next partition. This way we do not miss scanning + * and verifying any partition. We mod and circle back to 0 once all partitions in the cluster-map have been verified. + * + * So the sequence looks like this: + * R1: 0 1 2 3 x (crash/restart/deployment) + * R2: 3 4 5 x (crash/restart/deployment) + * R3: 5 6 7 ... so on. + */ + currentPartitionId = getLastPartitionIdVerified(); + currentDiskId = 0; // log disk state staticClusterManager.getDataNodeId(nodeId.getHostname(), nodeId.getPort()) .getDiskIds() @@ -126,11 +137,35 @@ public BackupIntegrityMonitor(RecoveryManager azure, ReplicationManager server, logger.info("[BackupIntegrityMonitor] Created BackupIntegrityMonitor"); } + /** + * Returns the last partition-id scanned and verified + * @return + */ + long getLastPartitionIdVerified() { + long maxPartitionId = 0; + try { + for(File file : new File(replicationConfig.backupCheckerReportDir).listFiles()) { + maxPartitionId = Math.max(maxPartitionId, Long.parseLong(file.getName())); + } + } catch (Throwable e) { + metrics.backupCheckerRuntimeError.inc(); + logger.error("[BackupIntegrityMonitor] Failed to get last partition-id due to {}, start verification from partition-id 0", e.getMessage()); + } + return maxPartitionId; + } + + class PartitionIdComparator implements Comparator { + @Override + public int compare(PartitionId p1, PartitionId p2) { + return p1.getId() >= p2.getId() ? 1 : -1; + } + } + /** * Starts and schedules monitor */ public void start() { - executor.scheduleWithFixedDelay(this::run, 0, 1, TimeUnit.HOURS); + executor.scheduleWithFixedDelay(this::run, 0, 30, TimeUnit.MINUTES); logger.info("[BackupIntegrityMonitor] Started BackupIntegrityMonitor"); } @@ -166,8 +201,8 @@ private BlobStore startLocalStore(AmbryPartition partition) throws Exception { .filter(d -> d.getState() == HardwareState.AVAILABLE) .collect(Collectors.toList()); logger.info("[BackupIntegrityMonitor] {} disks can accommodate partition-{}", disks.size(), partition.getId()); - // Pick disk randomly; any disk is ok as we will wipe it out after this - DiskId disk = disks.get(new Random().nextInt(disks.size())); + // Pick the next disk to make debugging easier + DiskId disk = disks.get(++currentDiskId % disks.size()); logger.info("[BackupIntegrityMonitor] Selected disk at mount path {}", disk.getMountPath()); // Clear disk to make space, this is simpler instead of deciding which partition to delete. // This is why this is thread-unsafe. @@ -315,19 +350,25 @@ public void run() { RemoteReplicaInfo cloudReplica = null; AmbryPartition partition = null; serverScanner = serverReplicationManager.getBackupCheckerThread("ambry_backup_integrity_monitor"); - Random random = new Random(); + PartitionIdComparator partitionIdComparator = new PartitionIdComparator(); try { /** Select partition P */ List partitions = helixClusterManager.getAllPartitionIds(null); - if (((double) seen.size())/partitions.size() >= 0.9) { - // If we have seen 90% of the partitions, then just clear the seen-set - seen.clear(); + PartitionId maxPartitionId = partitions.stream().max(partitionIdComparator).get(); + logger.info("[BackupIntegrityMonitor] Total number of partitions = {}, max partition-id = {}", + partitions.size(), maxPartitionId.getId()); + partition = (AmbryPartition) partitions.stream() + .filter(p -> p.getId() == currentPartitionId) + .collect(Collectors.toList()) + .get(0); + if (currentPartitionId == 0) { + // We circled back to partition 0 because we either finished all partitions, + // or there was some error in finding out the last partition scanned. Clear state. + logger.info("[BackupIntegrityMonitor] Deleting directory {}", replicationConfig.backupCheckerReportDir); + Utils.deleteFileOrDirectory(new File(replicationConfig.backupCheckerReportDir)); + logger.info("[BackupIntegrityMonitor] Creating directory {}", replicationConfig.backupCheckerReportDir); + Files.createDirectories(Paths.get(replicationConfig.backupCheckerReportDir)); } - partitions = partitions.stream() - .filter(p -> !seen.contains(p.getId())) - .collect(Collectors.toList()); - partition = (AmbryPartition) partitions.get(random.nextInt(partitions.size())); - seen.add(partition.getId()); logger.info("[BackupIntegrityMonitor] Verifying backup partition-{}", partition.getId()); /** Create local Store S */ @@ -381,6 +422,9 @@ public void run() { serverScanner.setAzureBlobInfo(azureBlobs); compareMetadata(serverReplica, cloudReplica); } + + currentPartitionId = ++currentPartitionId % maxPartitionId.getId(); + // Don't delete any state, leave it for inspection. } catch (Throwable e) { metrics.backupCheckerRuntimeError.inc(); logger.error(String.format("[BackupIntegrityMonitor] Failed to verify cloud backup partition-%s due to",