From b25dcb955f0571a8db2290df438af66b7a97dd87 Mon Sep 17 00:00:00 2001
From: github-actions
Date: Thu, 31 Oct 2024 11:55:44 -0600
Subject: [PATCH] Reorder disks if mounts have been shuffled.

---
 .../ambry/clustermap/ClusterParticipant.java |  11 ++
 .../com/github/ambry/config/StoreConfig.java |   9 ++
 .../ambry/clustermap/HelixParticipant.java   |  37 +++++
 .../store/ReplicaPlacementValidator.java     | 133 ++++++++++++++++++
 .../github/ambry/store/StorageManager.java   |  17 +++
 5 files changed, 207 insertions(+)
 create mode 100644 ambry-store/src/main/java/com/github/ambry/store/ReplicaPlacementValidator.java

diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/ClusterParticipant.java b/ambry-api/src/main/java/com/github/ambry/clustermap/ClusterParticipant.java
index 0950192032..8217f4d4f5 100644
--- a/ambry-api/src/main/java/com/github/ambry/clustermap/ClusterParticipant.java
+++ b/ambry-api/src/main/java/com/github/ambry/clustermap/ClusterParticipant.java
@@ -207,4 +207,15 @@ default boolean updateDiskCapacity(int diskCapacity) {
   default boolean setDisksState(List<DiskId> diskId, HardwareState state) {
     return true;
   }
+
+  /**
+   * Given a map of old disks and new disks, swaps the old disks with the new disks in the DataNodeConfig,
+   * and persists the resulting changes to Helix.
+   * @param newDiskMapping A map of old disks to new disks.
+   * @return {@code true} if the disk order was successfully updated, {@code false} otherwise.
+   */
+  default boolean setDisksOrder(Map<DiskId, DiskId> newDiskMapping) {
+    // The default is to do nothing about disk order, so return false.
+    return false;
+  }
 }
diff --git a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
index 82bca8c88f..32b48e5dab 100644
--- a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
+++ b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
@@ -674,6 +674,14 @@ public class StoreConfig {
   public final boolean storeBlockStaleBlobStoreToStart;
   public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";
 
+  /**
+   * Whether to detect disks whose mount points have been reordered, rewrite the disk
+   * order in Helix to match, and then terminate the process so it restarts with the
+   * corrected order.
+   */
+  @Config("store.reshuffle.disks.on.reorder")
+  @Default("false")
+  public final boolean storeReshuffleDisksOnReorder;
+  public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";
+
   public StoreConfig(VerifiableProperties verifiableProperties) {
     storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
     storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
@@ -852,5 +860,6 @@ public StoreConfig(VerifiableProperties verifiableProperties) {
         verifiableProperties.getIntInRange(storeProactiveTestDelayInSecondsName, 60, 0, Integer.MAX_VALUE);
     storeStaleTimeInDays = verifiableProperties.getIntInRange(storeStaleTimeInDaysName, 7, 0, Integer.MAX_VALUE);
     storeBlockStaleBlobStoreToStart = verifiableProperties.getBoolean(storeBlockStaleBlobStoreToStartName, false);
+    storeReshuffleDisksOnReorder = verifiableProperties.getBoolean(storeReshuffleDisksOnReorderName, false);
   }
 }
diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java
index f8e4de5b60..b2effd7dc7 100644
--- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java
+++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java
@@ -436,6 +436,43 @@ public boolean setDisksState(List<DiskId> diskIds, HardwareState state) {
     }
   }
 
+  /**
+   * Given a map of old disks and new disks, swaps the old disks with the new disks in the DataNodeConfig,
+   * and persists the resulting changes to Helix.
+   * @param newDiskMapping A map of old disks to new disks.
+   * @return {@code true} if the disk order was successfully updated, {@code false} otherwise.
+   */
+  @Override
+  public boolean setDisksOrder(Map<DiskId, DiskId> newDiskMapping) {
+    if (newDiskMapping == null || newDiskMapping.isEmpty()) {
+      throw new IllegalArgumentException("Map of disk mappings is empty when attempting to set disk order");
+    }
+
+    // Update the DataNodeConfig and save it.
+    synchronized (helixAdministrationLock) {
+      DataNodeConfig dataNodeConfig = getDataNodeConfig();
+      for (DiskId oldDisk : newDiskMapping.keySet()) {
+        DiskId newDisk = newDiskMapping.get(oldDisk);
+
+        // Confirm that both disks are present in the DataNodeConfig.
+        DataNodeConfig.DiskConfig oldDiskConfig = dataNodeConfig.getDiskConfigs().get(oldDisk.getMountPath());
+        DataNodeConfig.DiskConfig newDiskConfig = dataNodeConfig.getDiskConfigs().get(newDisk.getMountPath());
+        if (oldDiskConfig == null || newDiskConfig == null) {
+          throw new IllegalArgumentException(
+              "Disk " + oldDisk.getMountPath() + " or " + newDisk.getMountPath() + " can't be found in the DataNodeConfig");
+        }
+
+        // Swap the disks in the DataNodeConfig. Note that the disk config map is keyed by
+        // mount path, so the new config must be looked up by mount path, not by DiskId.
+        logger.info("Replacing disk {} with disk {}", oldDisk.getMountPath(), newDisk.getMountPath());
+        dataNodeConfig.getDiskConfigs().put(oldDisk.getMountPath(), newDiskConfig);
+      }
+
+      // Persist once, after all swaps have been applied, so the update is all-or-nothing.
+      if (!dataNodeConfigSource.set(dataNodeConfig)) {
+        logger.error("Failed to update the DataNodeConfig while setting the disk order");
+        return false;
+      }
+      return true;
+    }
+  }
+
   @Override
   public boolean supportsStateChanges() {
     return true;
diff --git a/ambry-store/src/main/java/com/github/ambry/store/ReplicaPlacementValidator.java b/ambry-store/src/main/java/com/github/ambry/store/ReplicaPlacementValidator.java
new file mode 100644
index 0000000000..b8cdd45abf
--- /dev/null
+++ b/ambry-store/src/main/java/com/github/ambry/store/ReplicaPlacementValidator.java
@@ -0,0 +1,133 @@
+package com.github.ambry.store;
+
+import com.github.ambry.clustermap.DiskId;
+import com.github.ambry.clustermap.ReplicaId;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Loops through all the host's disks and checks whether the replicas are placed correctly
+ * (as compared with the latest Helix state). Sometimes disks get re-mounted in the wrong
+ * order. This class detects such cases and computes a map of the broken disks to the disks
+ * they should be reshuffled to.
+ */
+public class ReplicaPlacementValidator {
+  private final Map<DiskId, Set<String>> foundDiskToPartitionMap;
+  private final Map<DiskId, List<ReplicaId>> expectedDiskToReplicaMap;
+  private final List<DiskId> brokenDisks = new ArrayList<>();
+
+  public ReplicaPlacementValidator(Map<DiskId, List<ReplicaId>> expectedDiskToReplicaMap) {
+    foundDiskToPartitionMap = new HashMap<>();
+    this.expectedDiskToReplicaMap = expectedDiskToReplicaMap;
+
+    for (Map.Entry<DiskId, List<ReplicaId>> entry : expectedDiskToReplicaMap.entrySet()) {
+      DiskId currentDisk = entry.getKey();
+      foundDiskToPartitionMap.put(currentDisk, findPartitionsOnDisk(currentDisk));
+      for (ReplicaId replica : entry.getValue()) {
+        // toPathString() returns the numeric partition ID, which is what the on-disk
+        // directories are named after; toString() is not guaranteed to match.
+        String partitionID = replica.getPartitionId().toPathString();
+        if (!foundDiskToPartitionMap.get(currentDisk).contains(partitionID)) {
+          brokenDisks.add(currentDisk);
+          break;  // One missing replica is enough to mark this disk broken.
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks the placement of replicas on the disks and computes a reshuffling if necessary.
+   * @return A map of the broken disks to the disks they should be reshuffled to.
+   *         An empty map if no reshuffling is necessary or if no consistent reshuffling was found.
+   */
+  public Map<DiskId, DiskId> reshuffleDisks() {
+    Map<DiskId, DiskId> shuffledDisks = new HashMap<>();
+
+    // Sanity checks: - Abort if we did not find the same number of disks on the
+    //                  host as we got from Helix.
+    //                - Abort if we did not find any broken disks.
+    if (expectedDiskToReplicaMap.size() != foundDiskToPartitionMap.size() || brokenDisks.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    for (DiskId currentDisk : brokenDisks) {
+      List<ReplicaId> expectedReplicas = expectedDiskToReplicaMap.get(currentDisk);
+      DiskId foundDisk = findDiskWithReplicas(expectedReplicas);
+      if (foundDisk == null) {
+        return Collections.emptyMap();
+      }
+      shuffledDisks.put(currentDisk, foundDisk);
+    }
+    return shuffledDisks;
+  }
+
+  /**
+   * Finds the disk that contains the expected replicas.
+   * @param expectedReplicas A list of replicas that should be on the disk.
+   * @return The disk that contains the expected replicas, or {@code null} if no such disk was found.
+   */
+  private DiskId findDiskWithReplicas(List<ReplicaId> expectedReplicas) {
+    for (Map.Entry<DiskId, Set<String>> entry : foundDiskToPartitionMap.entrySet()) {
+      DiskId currentDisk = entry.getKey();
+      Set<String> partitions = entry.getValue();
+      boolean found = true;
+      for (ReplicaId replica : expectedReplicas) {
+        String partitionID = replica.getPartitionId().toPathString();
+        if (!partitions.contains(partitionID)) {
+          found = false;
+          break;
+        }
+      }
+      if (found) {
+        return currentDisk;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks whether a directory name looks like a partition.
+   * @param directoryName the name of the directory to check.
+   * @return {@code true} if the directory name looks like a partition, {@code false} otherwise.
+   */
+  private boolean looksLikePartition(String directoryName) {
+    try {
+      // The convention is simply to use a Java long as the partition ID.
+      Long.parseLong(directoryName);
+    } catch (NumberFormatException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Finds all the partition directories on a disk.
+   * @param disk an instance of DiskId to search for partition directories.
+   * @return A set of the partition directory names found on the disk.
+   */
+  private Set<String> findPartitionsOnDisk(DiskId disk) {
+    Set<String> partitionDirs = new HashSet<>();
+    File[] directories = new File(disk.getMountPath()).listFiles(File::isDirectory);
+
+    if (directories != null) {
+      for (File dir : directories) {
+        // Store just the leaf name from File.getName(); it is comparable with the names
+        // derived from ReplicaId, since AmbryPartition::toPathString() returns the
+        // partition ID as a string.
+        String dirName = dir.getName();
+        if (looksLikePartition(dirName)) {
+          partitionDirs.add(dirName);
+        }
+      }
+    }
+    return partitionDirs;
+  }
+}
diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
index a4a37a646b..6390bc3e2c 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
@@ -147,6 +147,23 @@ public StorageManager(StoreConfig storeConfig, DiskManagerConfig diskManagerConf
       diskToReplicaMap.computeIfAbsent(disk, key -> new ArrayList<>()).add(replica);
       partitionNameToReplicaId.put(replica.getPartitionId().toPathString(), replica);
     }
+
+    // This must run here, AFTER partitionNameToReplicaId is populated.
+    if (storeConfig.storeReshuffleDisksOnReorder) {
+      ReplicaPlacementValidator placementValidator = new ReplicaPlacementValidator(diskToReplicaMap);
+      Map<DiskId, DiskId> disksToReshuffle = placementValidator.reshuffleDisks();
+      if (!disksToReshuffle.isEmpty()) {
+        logger.info("Disks need to be reshuffled: {}", disksToReshuffle);
+        // primaryClusterParticipant can be null when no cluster participant is configured,
+        // so guard against that before asking it to persist the new disk order.
+        if (primaryClusterParticipant != null && primaryClusterParticipant.setDisksOrder(disksToReshuffle)) {
+          logger.info("{} - successfully reshuffled disks. Now terminating the process so we can"
+              + " restart with the new disk order.", getClass().getSimpleName());
+          System.exit(0);
+        } else {
+          logger.error("Failed to reshuffle disks - continuing with the current disk order");
+        }
+      }
+    }
+
     for (Map.Entry<DiskId, List<ReplicaId>> entry : diskToReplicaMap.entrySet()) {
       DiskId disk = entry.getKey();
       List<ReplicaId> replicasForDisk = entry.getValue();
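
Reviewer note: the feature is opt-in and defaults to off. Below is a minimal sketch of how
the new flag surfaces through StoreConfig; the class name ReshuffleConfigDemo is hypothetical,
but StoreConfig, VerifiableProperties, and the property key come straight from this patch.

import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;

public class ReshuffleConfigDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Opt in; the @Default annotation keeps this false unless explicitly enabled.
    props.setProperty("store.reshuffle.disks.on.reorder", "true");
    StoreConfig config = new StoreConfig(new VerifiableProperties(props));
    // StorageManager checks this flag before constructing ReplicaPlacementValidator.
    System.out.println("Reshuffle on reorder enabled: " + config.storeReshuffleDisksOnReorder);
  }
}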
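
And a self-contained sketch of the detection path in ReplicaPlacementValidator, assuming
Mockito is on the classpath and the snippet lives in com.github.ambry.store; the demo class
and its mockDisk/mockReplica helpers are hypothetical, not part of the patch. It fabricates
two mount points whose partition directories are swapped relative to what Helix expects, and
the validator proposes swapping them back.

package com.github.ambry.store;

import com.github.ambry.clustermap.DiskId;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReplicaPlacementValidatorDemo {
  public static void main(String[] args) throws Exception {
    // Two fake mount points whose partition directories are swapped relative to Helix.
    Path mountA = Files.createTempDirectory("diskA");
    Path mountB = Files.createTempDirectory("diskB");
    Files.createDirectory(mountA.resolve("200"));  // partition 200 is actually on disk A
    Files.createDirectory(mountB.resolve("100"));  // partition 100 is actually on disk B

    DiskId diskA = mockDisk(mountA.toString());
    DiskId diskB = mockDisk(mountB.toString());

    // Helix expects partition 100 on disk A and partition 200 on disk B.
    Map<DiskId, List<ReplicaId>> expected = new HashMap<>();
    expected.put(diskA, Collections.singletonList(mockReplica("100")));
    expected.put(diskB, Collections.singletonList(mockReplica("200")));

    // Both disks are detected as broken; the proposed mapping swaps A and B.
    Map<DiskId, DiskId> reshuffle = new ReplicaPlacementValidator(expected).reshuffleDisks();
    System.out.println("diskA -> diskB: " + (reshuffle.get(diskA) == diskB));
    System.out.println("diskB -> diskA: " + (reshuffle.get(diskB) == diskA));
  }

  private static DiskId mockDisk(String mountPath) {
    DiskId disk = mock(DiskId.class);
    when(disk.getMountPath()).thenReturn(mountPath);
    return disk;
  }

  private static ReplicaId mockReplica(String partitionId) {
    PartitionId partition = mock(PartitionId.class);
    when(partition.toPathString()).thenReturn(partitionId);
    ReplicaId replica = mock(ReplicaId.class);
    when(replica.getPartitionId()).thenReturn(partition);
    return replica;
  }
}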