Skip to content

Commit

Permalink
storage: Restart process when changing chromosome to ensure correct s…
Browse files Browse the repository at this point in the history
…orting. #TASK-6722
  • Loading branch information
j-coll committed Oct 29, 2024
1 parent 8268266 commit 4147d01
Showing 1 changed file with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,31 @@ public class StreamVariantMapper extends VariantMapper<ImmutableBytesWritable, T
private VariantWriterFactory writerFactory;
private Query query;
private QueryOptions options;
// Keep an auto-incremental number for each produced record. This is used as the key for the output record,
// and will ensure a sorted output.
private int stdoutKeyNum;
private int stderrKeyNum;
private String firstKey;
private String outputKeyPrefix;
private String firstVariant;

private int processCount = 0;

////////////
// Configured for every new process
////////////
private Process process;
private DataOutputStream stdin;
private DataInputStream stdout;
private DataInputStream stderr;
private MRErrorThread stderrThread;
private MROutputThread stdoutThread;
private DataWriter<Variant> variantDataWriter;
protected final List<Throwable> throwables = Collections.synchronizedList(new ArrayList<>());
private int processedBytes = 0;
private long numRecordsRead = 0;
private long numRecordsWritten = 0;
protected final List<Throwable> throwables = Collections.synchronizedList(new ArrayList<>());
// auto-incremental number for each produced record.
// This is used with the outputKeyPrefix to ensure a sorted output.
private int stdoutKeyNum;
private int stderrKeyNum;
private String currentChromosome;
private int currentPosition;
private String outputKeyPrefix;

private volatile boolean processProvidedStatus_ = false;
private VariantMetadata metadata;
Expand Down Expand Up @@ -112,11 +116,6 @@ protected void setup(Context context) throws IOException, InterruptedException {
writerFactory = new VariantWriterFactory(metadataManager);
query = VariantMapReduceUtil.getQueryFromConfig(conf);
options = VariantMapReduceUtil.getQueryOptionsFromConfig(conf);
Variant variant = context.getCurrentValue();
firstKey = variant.getChromosome() + ":" + variant.getStart();
outputKeyPrefix = buildOutputKeyPrefix(variant.getChromosome(), variant.getStart());
stdoutKeyNum = 0;
stderrKeyNum = 0;
}

public static String buildOutputKeyPrefix(String chromosome, Integer start) {
Expand All @@ -143,14 +142,17 @@ public void run(Context context) throws IOException, InterruptedException {
startProcess(context);
// Do-while instead of "while", as we've already called context.nextKeyValue() once
do {
// FIXME: If the chromosome is different, we should start a new process and get a new outputKeyPrefix
currentValue = context.getCurrentValue();
// Restart the process if the input bytes exceed the limit
// or if the chromosome changes
if (processedBytes > maxInputBytesPerProcess) {
LOG.info("Processed bytes = " + processedBytes + " > " + maxInputBytesPerProcess + ". Restarting process.");
context.getCounter(COUNTER_GROUP_NAME, "RESTARTED_PROCESS").increment(1);
closeProcess(context);
startProcess(context);
restartProcess(context, "BYTES_LIMIT");
} else if (!currentChromosome.equals(currentValue.getChromosome())) {
LOG.info("Chromosome changed from " + currentChromosome + " to " + currentValue.getChromosome()
+ ". Restarting process.");
restartProcess(context, "CHR_CHANGE");
}
currentValue = context.getCurrentValue();
map(context.getCurrentKey(), currentValue, context);
} while (!hasExceptions() && context.nextKeyValue());
} catch (Throwable th) {
Expand Down Expand Up @@ -187,6 +189,13 @@ public void run(Context context) throws IOException, InterruptedException {
throwExceptionIfAny();
}

private void restartProcess(Mapper<Object, Variant, ImmutableBytesWritable, Text>.Context context, String reason)
throws IOException, InterruptedException, StorageEngineException {
context.getCounter(COUNTER_GROUP_NAME, "RESTARTED_PROCESS_" + reason).increment(1);
closeProcess(context);
startProcess(context);
}

private boolean hasExceptions() {
return !throwables.isEmpty();
}
Expand Down Expand Up @@ -336,10 +345,20 @@ private void closeProcess(Context context) throws IOException, InterruptedExcept
// drainStdout(context);
}

private void startProcess(Context context) throws IOException, StorageEngineException {
private void startProcess(Context context) throws IOException, StorageEngineException, InterruptedException {
LOG.info("bash -ce '" + commandLine + "'");
context.getCounter(COUNTER_GROUP_NAME, "START_PROCESS").increment(1);

Variant variant = context.getCurrentValue();
currentChromosome = variant.getChromosome();
currentPosition = variant.getStart();
if (firstVariant == null) {
firstVariant = variant.getChromosome() + ":" + variant.getStart();
}
outputKeyPrefix = buildOutputKeyPrefix(variant.getChromosome(), variant.getStart());
stdoutKeyNum = 0;
stderrKeyNum = 0;

// Start the process
ProcessBuilder builder = new ProcessBuilder("bash", "-ce", commandLine);
// System.getenv().forEach((k, v) -> LOG.info("SYSTEM ENV: " + k + "=" + v));
Expand Down Expand Up @@ -500,7 +519,8 @@ public void run() {
stopWatch.start();
write("---------- " + context.getTaskAttemptID().toString() + " -----------");
write("Start time : " + TimeUtils.getTimeMillis());
write("Batch start : " + firstKey + " -> " + outputKeyPrefix);
write("Input split : " + firstVariant);
write("Batch start : " + currentChromosome + ":" + currentPosition + " -> " + outputKeyPrefix);
write("sub-process #" + processCount);
write("--- START STDERR ---");
int numRecords = 0;
Expand Down

0 comments on commit 4147d01

Please sign in to comment.