From 4147d0157ef4a30f7af953be237b3b345c8cbb6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Tue, 29 Oct 2024 17:52:15 +0000 Subject: [PATCH] storage: Restart process when changing chromosome to ensure correct sorting. #TASK-6722 --- .../variant/mr/StreamVariantMapper.java | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java index cfcf360452..6899398099 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java @@ -53,16 +53,13 @@ public class StreamVariantMapper extends VariantMapper variantDataWriter; + protected final List throwables = Collections.synchronizedList(new ArrayList<>()); private int processedBytes = 0; private long numRecordsRead = 0; private long numRecordsWritten = 0; - protected final List throwables = Collections.synchronizedList(new ArrayList<>()); + // auto-incremental number for each produced record. + // This is used with the outputKeyPrefix to ensure a sorted output. + private int stdoutKeyNum; + private int stderrKeyNum; + private String currentChromosome; + private int currentPosition; + private String outputKeyPrefix; private volatile boolean processProvidedStatus_ = false; private VariantMetadata metadata; @@ -112,11 +116,6 @@ protected void setup(Context context) throws IOException, InterruptedException { writerFactory = new VariantWriterFactory(metadataManager); query = VariantMapReduceUtil.getQueryFromConfig(conf); options = VariantMapReduceUtil.getQueryOptionsFromConfig(conf); - Variant variant = context.getCurrentValue(); - firstKey = variant.getChromosome() + ":" + variant.getStart(); - outputKeyPrefix = buildOutputKeyPrefix(variant.getChromosome(), variant.getStart()); - stdoutKeyNum = 0; - stderrKeyNum = 0; } public static String buildOutputKeyPrefix(String chromosome, Integer start) { @@ -143,14 +142,17 @@ public void run(Context context) throws IOException, InterruptedException { startProcess(context); // Do-while instead of "while", as we've already called context.nextKeyValue() once do { - // FIXME: If the chromosome is different, we should start a new process and get a new outputKeyPrefix + currentValue = context.getCurrentValue(); + // Restart the process if the input bytes exceed the limit + // or if the chromosome changes if (processedBytes > maxInputBytesPerProcess) { LOG.info("Processed bytes = " + processedBytes + " > " + maxInputBytesPerProcess + ". Restarting process."); - context.getCounter(COUNTER_GROUP_NAME, "RESTARTED_PROCESS").increment(1); - closeProcess(context); - startProcess(context); + restartProcess(context, "BYTES_LIMIT"); + } else if (!currentChromosome.equals(currentValue.getChromosome())) { + LOG.info("Chromosome changed from " + currentChromosome + " to " + currentValue.getChromosome() + + ". Restarting process."); + restartProcess(context, "CHR_CHANGE"); } - currentValue = context.getCurrentValue(); map(context.getCurrentKey(), currentValue, context); } while (!hasExceptions() && context.nextKeyValue()); } catch (Throwable th) { @@ -187,6 +189,13 @@ public void run(Context context) throws IOException, InterruptedException { throwExceptionIfAny(); } + private void restartProcess(Mapper.Context context, String reason) + throws IOException, InterruptedException, StorageEngineException { + context.getCounter(COUNTER_GROUP_NAME, "RESTARTED_PROCESS_" + reason).increment(1); + closeProcess(context); + startProcess(context); + } + private boolean hasExceptions() { return !throwables.isEmpty(); } @@ -336,10 +345,20 @@ private void closeProcess(Context context) throws IOException, InterruptedExcept // drainStdout(context); } - private void startProcess(Context context) throws IOException, StorageEngineException { + private void startProcess(Context context) throws IOException, StorageEngineException, InterruptedException { LOG.info("bash -ce '" + commandLine + "'"); context.getCounter(COUNTER_GROUP_NAME, "START_PROCESS").increment(1); + Variant variant = context.getCurrentValue(); + currentChromosome = variant.getChromosome(); + currentPosition = variant.getStart(); + if (firstVariant == null) { + firstVariant = variant.getChromosome() + ":" + variant.getStart(); + } + outputKeyPrefix = buildOutputKeyPrefix(variant.getChromosome(), variant.getStart()); + stdoutKeyNum = 0; + stderrKeyNum = 0; + // Start the process ProcessBuilder builder = new ProcessBuilder("bash", "-ce", commandLine); // System.getenv().forEach((k, v) -> LOG.info("SYSTEM ENV: " + k + "=" + v)); @@ -500,7 +519,8 @@ public void run() { stopWatch.start(); write("---------- " + context.getTaskAttemptID().toString() + " -----------"); write("Start time : " + TimeUtils.getTimeMillis()); - write("Batch start : " + firstKey + " -> " + outputKeyPrefix); + write("Input split : " + firstVariant); + write("Batch start : " + currentChromosome + ":" + currentPosition + " -> " + outputKeyPrefix); write("sub-process #" + processCount); write("--- START STDERR ---"); int numRecords = 0;