Skip to content

Commit

Permalink
Reorder disks if mounts have been shuffled.
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Oct 31, 2024
1 parent 132e023 commit b25dcb9
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,15 @@ default boolean updateDiskCapacity(int diskCapacity) {
default boolean setDisksState(List<DiskId> diskId, HardwareState state) {
return true;
}

/**
* Given a map of old disks and new disks, swaps the old disks with the new disks in the DataNodeConfig,
* and persists the resulting changes to Helix.
* @param newDiskMapping A map of old disks to new disks.
* @return {@code true} if the disks order was successfully updated, {@code false} otherwise.
*/
default boolean setDisksOrder(Map<DiskId, DiskId> newDiskMapping) {
// The default should be to do nothing about disk order, so return false.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,14 @@ public class StoreConfig {
public final boolean storeBlockStaleBlobStoreToStart;
public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";

/**
* Whether to attempt reshuffling of reordered disks and subsequent process termination.
*/
@Config("store.reshuffle.disks.on.reorder")
@Default("false")
public final boolean storeReshuffleDisksOnReorder;
public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";

public StoreConfig(VerifiableProperties verifiableProperties) {
storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
Expand Down Expand Up @@ -852,5 +860,6 @@ public StoreConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getIntInRange(storeProactiveTestDelayInSecondsName, 60, 0, Integer.MAX_VALUE);
storeStaleTimeInDays = verifiableProperties.getIntInRange(storeStaleTimeInDaysName, 7, 0, Integer.MAX_VALUE);
storeBlockStaleBlobStoreToStart = verifiableProperties.getBoolean(storeBlockStaleBlobStoreToStartName, false);
storeReshuffleDisksOnReorder = verifiableProperties.getBoolean(storeReshuffleDisksOnReorderName, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,43 @@ public boolean setDisksState(List<DiskId> diskIds, HardwareState state) {
}
}

/**
* Given a map of old disks and new disks, swaps the old disks with the new disks in the DataNodeConfig,
* and persists the resulting changes to Helix.
* @param newDiskMapping A map of old disks to new disks.
* @return {@code true} if the disks order was successfully updated, {@code false} otherwise.
*/
public boolean setDisksOrder(Map<DiskId, DiskId> newDiskMapping) {
if (newDiskMapping == null || newDiskMapping.isEmpty()) {
throw new IllegalArgumentException("Map of disk mappings is empty when attempting to set disks order");
}

// Update DataNodeConfig and save it.
synchronized (helixAdministrationLock) {
DataNodeConfig dataNodeConfig = getDataNodeConfig();
boolean success = true;
for (DiskId oldDisk : newDiskMapping.keySet()) {
DiskId newDisk = newDiskMapping.get(oldDisk);

// Confirm that both disks are present in the DataNodeConfig
DataNodeConfig.DiskConfig oldDiskConfig = dataNodeConfig.getDiskConfigs().get(oldDisk.getMountPath());
DataNodeConfig.DiskConfig newDiskConfig = dataNodeConfig.getDiskConfigs().get(newDisk.getMountPath());
if (oldDiskConfig == null || newDiskConfig == null) {
throw new IllegalArgumentException("Disk " + oldDisk.getMountPath() + " or " + newDisk.getMountPath() + " can't be found in the DataNodeConfig");
}

// Swap the disks in the DataNodeConfig
logger.info("Replacing disk {} with disk {}", oldDisk.getMountPath(), newDisk.getMountPath());
dataNodeConfig.getDiskConfigs().put(oldDisk.getMountPath(), dataNodeConfig.getDiskConfigs().get(newDisk));
if (!dataNodeConfigSource.set(dataNodeConfig)) {
logger.error("Setting disks order failed DataNodeConfig update");
success = false;
}
}
return success;
}
}

@Override
public boolean supportsStateChanges() {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.github.ambry.store;

import com.github.ambry.clustermap.DiskId;
import com.github.ambry.clustermap.ReplicaId;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
* Loop through all the host's disks and check if the replicas are placed correctly
* (as compared with the latest Helix state). Sometimes, disks can get re-mounted in the wrong
* order. This class checks for such cases and reshuffles the replicas if necessary, creating
* a map of the broken disks and the disks they should be reshuffled to.
*/
public class ReplicaPlacementValidator {
private final Map<DiskId, Set<String>> foundDiskToPartitionMap;
private Map<DiskId, List<ReplicaId>> expectedDiskToReplicaMap;
private Map<DiskId, List<ReplicaId>> newDiskToReplicaMap;
private List<DiskId> brokenDisks = new ArrayList<>();
public ReplicaPlacementValidator(Map<DiskId, List<ReplicaId>> expectedDiskToReplicaMap) {
foundDiskToPartitionMap = new HashMap<>();
newDiskToReplicaMap = new HashMap<>();
this.expectedDiskToReplicaMap = expectedDiskToReplicaMap;

for (Map.Entry<DiskId, List<ReplicaId>> entry : expectedDiskToReplicaMap.entrySet()) {
DiskId currentDisk = entry.getKey();
foundDiskToPartitionMap.put(currentDisk, findPartitionsOnDisk(currentDisk));
List<ReplicaId> replicas = entry.getValue();
for (ReplicaId replica : replicas) {
String partitionID = replica.getPartitionId().toString();
if(!foundDiskToPartitionMap.get(currentDisk).contains(partitionID)) {
brokenDisks.add(currentDisk);
}
}
}
}

/**
* Check the placement of replicas on the disks and reshuffle them if necessary.
* @return A map of the broken disks and the disks they should be reshuffled to.
* An empty map if no reshuffling is necessary or if the reshuffling failed.
*/
public Map<DiskId, DiskId> reshuffleDisks() {
Map<DiskId, DiskId> shuffledDisks = new HashMap<>();

// Sanity checks: - Abort if we did not find the same number of disks on the
// host as we got from Helix.
// - Abort if we did not find any broken disks.
if ((expectedDiskToReplicaMap.size() != foundDiskToPartitionMap.size()) ||
brokenDisks.size() == 0) {
return Collections.emptyMap();
}

for (DiskId currentDisk : brokenDisks) {
List<ReplicaId> expectedReplicas = expectedDiskToReplicaMap.get(currentDisk);
DiskId foundDisk = findDiskWithReplicas(expectedReplicas);
if(foundDisk == null) {
return Collections.emptyMap();
} else {
shuffledDisks.put(currentDisk, foundDisk);
}
}
return shuffledDisks;
}

/**
* Find the disk that contains the expected replicas.
* @param expectedReplicas A list of replicas that should be on the disk.
* @return The disk that contains the expected replicas, or null if no such disk was found.
*/
private DiskId findDiskWithReplicas(List<ReplicaId> expectedReplicas) {
for (Map.Entry<DiskId, Set<String>> entry : foundDiskToPartitionMap.entrySet()) {
DiskId currentDisk = entry.getKey();
Set<String> partitions = entry.getValue();
boolean found = true;
for (ReplicaId replica : expectedReplicas) {
String partitionID = replica.getPartitionId().toString();
if(!partitions.contains(partitionID)) {
found = false;
break;
}
}
if(found) {
return currentDisk;
}
}
return null;
}

/**
* Checks whether a directory name looks like a partition.
* @param directoryName the name of the directory to check.
* @return True if the directory name looks like a partition, false otherwise.
*/
private boolean looksLikePartition(String directoryName) {
try {
// The convention is simply to use Java long as partition IDs.
Long.parseLong(directoryName);
} catch (NumberFormatException e) {
return false;
}
return true;
}

/**
* Find all the partition directories on the disk.
* @param disk an instance of DiskId to search for partition directories.
* @return A list of partition directories on the disk.
*/
private Set<String> findPartitionsOnDisk(DiskId disk) {
Set<String> partitionDirs = new HashSet<>();
File[] directories = new File(disk.getMountPath()).listFiles(File::isDirectory);

if (directories != null) {
for (File dir : directories) {
// Tommy: If we store just the leaf name from File.getName() will that work
// when comparing with the directory names we get from ReplicaId??
// AmbryPartition::toPathString() returns the partition ID as a string.
String dirName = dir.getName();
if (looksLikePartition(dirName)) {
partitionDirs.add(dirName);
}
}
}
return partitionDirs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ public StorageManager(StoreConfig storeConfig, DiskManagerConfig diskManagerConf
diskToReplicaMap.computeIfAbsent(disk, key -> new ArrayList<>()).add(replica);
partitionNameToReplicaId.put(replica.getPartitionId().toPathString(), replica);
}

// Assume it's safe to place the new code here, AFTER partitionNameToReplicaId is populated.
if (storeConfig.storeReshuffleDisksOnReorder) {
ReplicaPlacementValidator placementValidator = new ReplicaPlacementValidator(diskToReplicaMap);
Map<DiskId, DiskId> disksToReshuffle = placementValidator.reshuffleDisks();
if (!disksToReshuffle.isEmpty()) {
logger.info("Disks need to be reshuffled: {}", disksToReshuffle);
if(primaryClusterParticipant.setDisksOrder(disksToReshuffle)) {
logger.info(this.getClass().getSimpleName() + " - successfully reshuffled disks. Now terminating"
+ " the process so we can restart with the new disk order.");
System.exit(0);
} else {
logger.error("Failed to reshuffle disks - continuing with the current disk order");
}
}
}

for (Map.Entry<DiskId, List<ReplicaId>> entry : diskToReplicaMap.entrySet()) {
DiskId disk = entry.getKey();
List<ReplicaId> replicasForDisk = entry.getValue();
Expand Down

0 comments on commit b25dcb9

Please sign in to comment.