Skip to content

Commit

Permalink
storage: Fix VariantLocusKey serialization. #TASK-6722
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Nov 1, 2024
1 parent 0df69dc commit f6fd3d4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
Expand All @@ -19,6 +20,10 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.opencb.commons.datastore.core.ObjectMap;
Expand All @@ -34,11 +39,9 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -193,6 +196,7 @@ public final int run(String[] args) throws Exception {
LOGGER.info(" - Outdir : " + job.getConfiguration().get(FileOutputFormat.OUTDIR));
}
LOGGER.info("=================================================");
reportRunningJobs();
boolean succeed = executeJob(job);
if (!succeed) {
LOGGER.error("error with job!");
Expand All @@ -215,6 +219,32 @@ public final int run(String[] args) throws Exception {
return succeed ? 0 : 1;
}

private void reportRunningJobs() {
// Get the number of pending or running jobs in yarn
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
yarnClient.init(getConf());
yarnClient.start();

List<ApplicationReport> applications = yarnClient.getApplications(EnumSet.of(
YarnApplicationState.NEW,
YarnApplicationState.NEW_SAVING,
YarnApplicationState.SUBMITTED,
YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING));
if (applications.isEmpty()) {
LOGGER.info("No pending or running jobs in yarn");
} else {
LOGGER.info("Found " + applications.size() + " pending or running jobs in yarn");
for (Map.Entry<YarnApplicationState, List<ApplicationReport>> entry : applications.stream()
.collect(Collectors.groupingBy(ApplicationReport::getYarnApplicationState)).entrySet()) {
LOGGER.info(" * " + entry.getKey() + " : " + entry.getValue().size());
}
}
} catch (IOException | YarnException e) {
LOGGER.error("Error getting list of pending jobs from YARN", e);
}
}

private boolean configFromArgs(String[] args) {
int fixedSizeArgs = getFixedSizeArgs();

Expand Down Expand Up @@ -468,6 +498,8 @@ protected List<Path> concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool
}
}
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
if (paths.isEmpty()) {
LOGGER.warn("The MapReduce job didn't produce any output. This may not be expected.");
} else if (paths.size() == 1) {
Expand Down Expand Up @@ -517,6 +549,7 @@ protected List<Path> concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool
}
}
LOGGER.info("File size : " + humanReadableByteCount(Files.size(Paths.get(localOutput.toUri())), false));
LOGGER.info("Time to copy from HDFS and concat : " + TimeUtils.durationToString(stopWatch));
}
return paths;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ public void run(Context context) throws IOException, InterruptedException {
// or if the chromosome changes
if (processedBytes > maxInputBytesPerProcess) {
LOG.info("Processed bytes = " + processedBytes + " > " + maxInputBytesPerProcess + ". Restarting process.");
restartProcess(context, "BYTES_LIMIT");
restartProcess(context, "bytes_limit");
} else if (!currentChromosome.equals(currentValue.getChromosome())) {
// TODO: Should we change only when the chromosome change would produce a partition change?
LOG.info("Chromosome changed from " + currentChromosome + " to " + currentValue.getChromosome()
+ ". Restarting process.");
restartProcess(context, "CHR_CHANGE");
restartProcess(context, "chr_change");
}
map(context.getCurrentKey(), currentValue, context);
} while (!hasExceptions() && context.nextKeyValue());
Expand Down Expand Up @@ -169,14 +169,14 @@ public void run(Context context) throws IOException, InterruptedException {
addException(th);
}
} else {
context.getCounter(COUNTER_GROUP_NAME, "EMPTY_INPUT_SPLIT").increment(1);
context.getCounter(COUNTER_GROUP_NAME, "empty_input_split").increment(1);
}
throwExceptionIfAny();
}

private void restartProcess(Mapper<Object, Variant, VariantLocusKey, Text>.Context context, String reason)
throws IOException, InterruptedException, StorageEngineException {
context.getCounter(COUNTER_GROUP_NAME, "RESTARTED_PROCESS_" + reason).increment(1);
context.getCounter(COUNTER_GROUP_NAME, "restarted_process_" + reason).increment(1);
closeProcess(context);
startProcess(context);
}
Expand Down Expand Up @@ -332,7 +332,7 @@ private void closeProcess(Context context) throws IOException, InterruptedExcept

private void startProcess(Context context) throws IOException, StorageEngineException, InterruptedException {
LOG.info("bash -ce '" + commandLine + "'");
context.getCounter(COUNTER_GROUP_NAME, "START_PROCESS").increment(1);
context.getCounter(COUNTER_GROUP_NAME, "start_process").increment(1);

Variant variant = context.getCurrentValue();
currentChromosome = variant.getChromosome();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ protected void reduce(VariantLocusKey key, Iterable<Text> values,
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "body_records").increment(1);
}
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stdout_records").increment(1);
} else {
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stdout_records_bytes")
.increment(value.getLength());
} else if (key.getOther().startsWith(STDERR_KEY)) {
mos.write("stderr", key, value);
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stderr_records").increment(1);
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stderr_records_bytes")
.increment(value.getLength());
} else {
throw new IllegalStateException("Unknown key " + key);
}
context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "records").increment(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,20 @@ public static boolean isSingleDigitChromosome(String chromosome) {

@Override
public void write(DataOutput out) throws IOException {
out.writeChars(chromosome);
out.writeChars("\n");
out.writeUTF(chromosome);
out.writeInt(position);
if (other != null) {
out.writeChars(other);
out.writeUTF(other);
} else {
out.writeChars("");
out.writeUTF("");
}
}

@Override
public void readFields(DataInput in) throws IOException {
chromosome = in.readLine();
chromosome = in.readUTF();
position = in.readInt();
other = in.readLine();
other = in.readUTF();
}

public String getChromosome() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.junit.experimental.categories.Category;
import org.opencb.opencga.core.testclassification.duration.ShortTests;

import java.io.*;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -149,4 +150,29 @@ public void shouldCompareChromosomesCorrectly() {
}
}
}

@Test
public void testWriteAndRead() throws IOException {
testWriteAndRead(new VariantLocusKey("1_random", 1000, "A"));
testWriteAndRead(new VariantLocusKey("1", 3541316, "O:31231"));
testWriteAndRead(new VariantLocusKey("0", 3541316, "O:31231"));
testWriteAndRead(new VariantLocusKey("", 3541316, ""));
testWriteAndRead(new VariantLocusKey("", -2, ""));
}

private static void testWriteAndRead(VariantLocusKey originalKey) throws IOException {
// Write the object to a byte array output stream
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
originalKey.write(dataOutputStream);

// Read the object from a byte array input stream
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
VariantLocusKey readKey = new VariantLocusKey();
readKey.readFields(dataInputStream);

// Assert that the read object is equal to the original object
assertEquals(originalKey, readKey);
}
}

0 comments on commit f6fd3d4

Please sign in to comment.