From 53d2c05bc9d131ba5229d8c9b18daeba17606c7d Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Sun, 20 Oct 2024 18:41:50 -0700 Subject: [PATCH] [admin-tool] Add a cluster batch processing framework command and a system store empty push task --- .../java/com/linkedin/venice/AdminTool.java | 84 ++++++++++ .../main/java/com/linkedin/venice/Arg.java | 5 +- .../linkedin/venice/ClusterTaskRunner.java | 100 ++++++++++++ .../java/com/linkedin/venice/Command.java | 7 + .../linkedin/venice/SystemStorePushTask.java | 154 ++++++++++++++++++ 5 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 clients/venice-admin-tool/src/main/java/com/linkedin/venice/ClusterTaskRunner.java create mode 100644 clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 49a3fda801..9534582e3a 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -114,6 +114,7 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.pools.LandFillObjectPool; import java.io.BufferedReader; import java.io.Console; @@ -122,6 +123,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.text.SimpleDateFormat; import java.time.Duration; @@ -142,6 +144,10 @@ import java.util.Set; import java.util.StringJoiner; import java.util.TimeZone; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -573,6 +579,9 @@ public static void main(String[] args) throws Exception { case EXTRACT_VENICE_ZK_PATHS: extractVeniceZKPaths(cmd); break; + case RUN_CLUSTER_COMMAND: + runClusterCommand(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -837,6 +846,81 @@ private static void deleteStore(CommandLine cmd) throws IOException { printObject(response); } + private static void runClusterCommand(CommandLine cmd) { + String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.RUN_CLUSTER_COMMAND); + String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.RUN_CLUSTER_COMMAND); + String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.RUN_CLUSTER_COMMAND); + int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1")); + System.out.println( + "[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: " + + checkpointFile + ", Parallelism: " + parallelism); + + // Create child data center controller client map. + ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName); + Map controllerClientMap = getControllerClientMap(clusterName, childAwareResponse); + + // Fetch list cluster store list from parent region. + Map progressMap = new VeniceConcurrentHashMap<>(); + MultiStoreResponse clusterStoreResponse = controllerClient.queryStoreList(false); + if (clusterStoreResponse.isError()) { + throw new VeniceException("Unable to fetch cluster store list: " + clusterStoreResponse.getError()); + } + for (String storeName: clusterStoreResponse.getStores()) { + progressMap.put(storeName, Boolean.FALSE); + } + + // Load progress from checkpoint file. If file does not exist, it will create new one during checkpointing. + try { + Path filePath = Paths.get(checkpointFile).toAbsolutePath(); + if (!Files.exists(filePath)) { + System.out.println("Checkpoint file path does not exist, will create a new checkpoint file: " + filePath); + } else { + List fileLines = Files.readAllLines(Paths.get(checkpointFile)); + for (String line: fileLines) { + String storeName = line.split(",")[0]; + // For now, it is boolean to start with, we can add more states to support retry. + boolean status = false; + if (line.split(",").length > 1) { + status = Boolean.parseBoolean(line.split(",")[1]); + } + progressMap.put(storeName, status); + } + } + } catch (IOException e) { + throw new VeniceException(e); + } + List taskList = + progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList()); + + // Validate task type. + Supplier> functionSupplier; + if ("PushSystemStore".equals(task)) { + functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName); + } else { + System.out.println("Undefined task: " + task); + return; + } + + // Create thread pool and start parallel processing. + ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + List futureList = new ArrayList<>(); + for (int i = 0; i < parallelism; i++) { + ClusterTaskRunner clusterTaskRunner = + new ClusterTaskRunner(progressMap, checkpointFile, taskList, functionSupplier.get()); + futureList.add(executorService.submit(clusterTaskRunner)); + } + for (int i = 0; i < parallelism; i++) { + try { + futureList.get(i).get(); + System.out.println("Cluster task completed for thread : " + i); + } catch (InterruptedException | ExecutionException e) { + System.out.println(e.getMessage()); + executorService.shutdownNow(); + } + } + executorService.shutdownNow(); + } + private static void backfillSystemStores(CommandLine cmd) { String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.BACKFILL_SYSTEM_STORES); String systemStoreType = getRequiredArgument(cmd, Arg.SYSTEM_STORE_TYPE, Command.BACKFILL_SYSTEM_STORES); diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index c9b7c0940d..87886c4c79 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -225,7 +225,10 @@ public enum Arg { SYSTEM_STORE_TYPE( "system-store-type", "sst", true, "Type of system store to backfill. Supported types are davinci_push_status_store and meta_store" - ), RETRY("retry", "r", false, "Retry this operation"), + ), TASK_NAME("task-name", "tn", true, "Name of the task for cluster command. Supported command [PushSystemStore]."), + CHECKPOINT_FILE("checkpoint-file", "cf", true, "Checkpoint file path for cluster command."), + THREAD_COUNT("thread-count", "tc", true, "Number of threads to execute. 1 if not specified"), + RETRY("retry", "r", false, "Retry this operation"), DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"), STORE_VIEW_CONFIGS( "storage-view-configs", "svc", true, diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/ClusterTaskRunner.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/ClusterTaskRunner.java new file mode 100644 index 0000000000..7791f8384b --- /dev/null +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/ClusterTaskRunner.java @@ -0,0 +1,100 @@ +package com.linkedin.venice; + +import com.linkedin.venice.exceptions.VeniceException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class is a simple runnable which keeps fetching task from list and execute the assigned task. The task fetching + * and progress tracking / checkpointing is thread-safe, so it can be run in parallel. + */ +public class ClusterTaskRunner implements Runnable { + private static final Logger LOGGER = LogManager.getLogger(ClusterTaskRunner.class); + private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]"; + + private static final ReentrantLock LOCK = new ReentrantLock(); + private static final AtomicInteger INDEX = new AtomicInteger(-1); + private final List taskList; + private final Function storeRunnable; + private final Map progressMap; + private final String checkpointFile; + + public ClusterTaskRunner( + Map progressMap, + String checkpointFile, + List taskList, + Function storeRunnable) { + this.taskList = taskList; + this.storeRunnable = storeRunnable; + this.progressMap = progressMap; + this.checkpointFile = checkpointFile; + } + + @Override + public void run() { + while (true) { + int fetchedTaskIndex = INDEX.incrementAndGet(); + if (fetchedTaskIndex >= taskList.size()) { + LOGGER.info("Cannot find new store from queue, will exit."); + break; + } + String store = taskList.get(fetchedTaskIndex); + try { + LOGGER.info("{} Running store job: {} for store: {}", TASK_LOG_PREFIX, fetchedTaskIndex + 1, store); + boolean result = storeRunnable.apply(store); + if (result) { + LOGGER.info( + "{} Complete store task for job: {}/{} store: {}", + TASK_LOG_PREFIX, + fetchedTaskIndex + 1, + taskList.size(), + store); + progressMap.put(store, true); + } else { + LOGGER.info( + "{} Failed store task for job: {}/{} store: {}", + TASK_LOG_PREFIX, + fetchedTaskIndex + 1, + taskList.size(), + store); + } + // Periodically update the checkpoint file. + if ((fetchedTaskIndex % 100) == 0) { + LOGGER.info("{} Preparing to checkpoint status at index {}", TASK_LOG_PREFIX, fetchedTaskIndex); + checkpoint(checkpointFile); + } + } catch (Exception e) { + LOGGER.info("{} Caught exception: {}. Will exit.", TASK_LOG_PREFIX, e.getMessage()); + } + } + // Perform one final checkpointing before existing the runnable. + checkpoint(checkpointFile); + } + + public void checkpoint(String checkpointFile) { + try { + LOCK.lock(); + LOGGER.info("Updating checkpoint..."); + + List status = + progressMap.entrySet().stream().map(e -> e.getKey() + "," + e.getValue()).collect(Collectors.toList()); + Files.write(Paths.get(checkpointFile), status); + LOGGER.info("Updated checkpoint..."); + + } catch (IOException e) { + throw new VeniceException(e); + } finally { + LOCK.unlock(); + } + } +} diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index cc10836b08..64cc079936 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -12,6 +12,7 @@ import static com.linkedin.venice.Arg.BATCH_GET_LIMIT; import static com.linkedin.venice.Arg.BLOB_TRANSFER_ENABLED; import static com.linkedin.venice.Arg.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOUR; +import static com.linkedin.venice.Arg.CHECKPOINT_FILE; import static com.linkedin.venice.Arg.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED; import static com.linkedin.venice.Arg.CHUNKING_ENABLED; import static com.linkedin.venice.Arg.CLIENT_DECOMPRESSION_ENABLED; @@ -119,6 +120,8 @@ import static com.linkedin.venice.Arg.STORE_TYPE; import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS; import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE; +import static com.linkedin.venice.Arg.TASK_NAME; +import static com.linkedin.venice.Arg.THREAD_COUNT; import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED; import static com.linkedin.venice.Arg.URL; import static com.linkedin.venice.Arg.VALUE_SCHEMA; @@ -192,6 +195,10 @@ public enum Command { "backfill-system-stores", "Create system stores of a given type for user stores in a cluster", new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE } ), + RUN_CLUSTER_COMMAND( + "run-cluster-command", "Run specific task for all user stores in a cluster", + new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT } + ), SET_VERSION( "set-version", "Set the version that will be served", new Arg[] { URL, STORE, VERSION }, new Arg[] { CLUSTER } ), ADD_SCHEMA("add-schema", "", new Arg[] { URL, STORE, VALUE_SCHEMA }, new Arg[] { CLUSTER }), diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java new file mode 100644 index 0000000000..0846e360d2 --- /dev/null +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java @@ -0,0 +1,154 @@ +package com.linkedin.venice; + +import static com.linkedin.venice.AdminTool.printObject; + +import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.JobStatusQueryResponse; +import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.controllerapi.VersionResponse; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.Utils; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class aims to do one time emtpy push to all user system stores of a specific user store. + * It will aggregate and compute the largest used version from all regions and update store before performing empty push. + * It will also skip empty push to store which is being migrated and is in the destination cluster. + */ +public class SystemStorePushTask implements Function { + private static final Logger LOGGER = LogManager.getLogger(SystemStorePushTask.class); + private static final int JOB_POLLING_RETRY_COUNT = 200; + private static final int JOB_POLLING_RETRY_PERIOD_IN_SECONDS = 5; + private static final String SYSTEM_STORE_PUSH_TASK_LOG_PREFIX = "[**** SYSTEM STORE PUSH ****]"; + private static final List SYSTEM_STORE_TYPE = + Arrays.asList(VeniceSystemStoreType.META_STORE, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE); + + private final ControllerClient parentControllerClient; + private final String clusterName; + private final Map childControllerClientMap; + + public SystemStorePushTask( + ControllerClient parentControllerClient, + Map controllerClientMap, + String clusterName) { + this.parentControllerClient = parentControllerClient; + this.childControllerClientMap = controllerClientMap; + this.clusterName = clusterName; + } + + public Boolean apply(String storeName) { + StoreResponse storeResponse = parentControllerClient.getStore(storeName); + if (storeResponse.isError()) { + LOGGER.error("{} Unable to locate user store: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, storeName); + return false; + } + if (storeResponse.getStore().isMigrating() && storeResponse.getStore().isMigrationDuplicateStore()) { + LOGGER.error( + "{} Unable to empty push to system store of migrating dest cluster store: {} in cluster: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + storeName, + clusterName); + return false; + } + + for (VeniceSystemStoreType type: SYSTEM_STORE_TYPE) { + String systemStoreName = type.getSystemStoreName(storeName); + VersionResponse response = parentControllerClient.getStoreLargestUsedVersion(clusterName, systemStoreName); + if (response.isError()) { + LOGGER.error( + "{} Unable to locate largest used store version for: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName); + return false; + } + int largestUsedVersion = response.getVersion(); + + int version = getStoreLargestUsedVersionNumber(parentControllerClient, systemStoreName); + if (version == -1) { + return false; + } + largestUsedVersion = Math.max(largestUsedVersion, version); + for (Map.Entry controllerClientEntry: childControllerClientMap.entrySet()) { + int result = getStoreLargestUsedVersionNumber(controllerClientEntry.getValue(), systemStoreName); + if (result == -1) { + LOGGER.error( + "{} Unable to locate store for: {} in region: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName, + controllerClientEntry.getKey()); + return false; + } + largestUsedVersion = Math.max(largestUsedVersion, result); + } + + LOGGER.info("Aggregate largest version: {} for store: {}", largestUsedVersion, systemStoreName); + // largestUsedVersion = largestUsedVersion + 10; + ControllerResponse controllerResponse = parentControllerClient + .updateStore(systemStoreName, new UpdateStoreQueryParams().setLargestUsedVersionNumber(largestUsedVersion)); + if (controllerResponse.isError()) { + LOGGER.error( + "{} Unable to set largest used store version for: {} as {} in all regions", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName, + largestUsedVersion); + return false; + } + + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(systemStoreName, "SYSTEM_STORE_PUSH_" + System.currentTimeMillis(), 10000); + // Kafka topic name in the above response is null, and it will be fixed with this code change. + String topicName = Version.composeKafkaTopic(systemStoreName, versionCreationResponse.getVersion()); + // Polling job status to make sure the empty push hits every child colo + int count = JOB_POLLING_RETRY_COUNT; + while (true) { + JobStatusQueryResponse jobStatusQueryResponse = + parentControllerClient.retryableRequest(3, controllerClient -> controllerClient.queryJobStatus(topicName)); + printObject(jobStatusQueryResponse, System.out::print); + if (jobStatusQueryResponse.isError()) { + return false; + } + ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatusQueryResponse.getStatus()); + if (executionStatus.isTerminal()) { + if (executionStatus.isError()) { + LOGGER.error("{} Push error for topic: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, topicName); + return false; + } + LOGGER.info("{} Push completed: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, topicName); + break; + } + Utils.sleep(TimeUnit.SECONDS.toMillis(JOB_POLLING_RETRY_PERIOD_IN_SECONDS)); + count--; + if (count == 0) { + LOGGER.error( + "{} Push not finished: {} in {} seconds", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + topicName, + JOB_POLLING_RETRY_COUNT * JOB_POLLING_RETRY_PERIOD_IN_SECONDS); + return false; + } + } + } + return true; + } + + int getStoreLargestUsedVersionNumber(ControllerClient controllerClient, String systemStoreName) { + // Make sure store exist in region and return largest used version number. + StoreResponse systemStoreResponse = controllerClient.getStore(systemStoreName); + if (systemStoreResponse.isError()) { + return -1; + } + return systemStoreResponse.getStore().getLargestUsedVersionNumber(); + } +}