diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java index 685ece6c45..8fbd131b72 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java @@ -7,16 +7,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.*; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,8 +49,6 @@ import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import static org.opencb.opencga.core.common.IOUtils.humanReadableByteCount; import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.MR_EXECUTOR_SSH_PASSWORD; @@ -196,6 +199,11 @@ public final int run(String[] args) throws Exception { LOGGER.info(" - OutputTable : " + job.getConfiguration().get(TableOutputFormat.OUTPUT_TABLE)); } else if (StringUtils.isNotEmpty(job.getConfiguration().get(FileOutputFormat.OUTDIR))) { LOGGER.info(" - Outdir : " + job.getConfiguration().get(FileOutputFormat.OUTDIR)); + + if (TextOutputFormat.getCompressOutput(job)) { + Class compressorClass = TextOutputFormat.getOutputCompressorClass(job, GzipCodec.class); + LOGGER.info(" - Compress : " + compressorClass.getName()); + } } LOGGER.info("================================================="); reportRunningJobs(); @@ -431,18 +439,18 @@ public class MapReduceOutputFile { public MapReduceOutputFile(String tempFilePrefix) throws IOException { this.nameGenerator = () -> null; this.tempFilePrefix = tempFilePrefix; - getOutputPath(); + initOutputPath(); namedOutput = null; } public MapReduceOutputFile(Supplier nameGenerator, String tempFilePrefix) throws IOException { this.nameGenerator = nameGenerator; this.tempFilePrefix = tempFilePrefix; - getOutputPath(); + initOutputPath(); namedOutput = null; } - protected void getOutputPath() throws IOException { + private void initOutputPath() throws IOException { String outdirStr = getParam(OUTPUT_PARAM); if (StringUtils.isNotEmpty(outdirStr)) { outdir = new Path(outdirStr); @@ -452,7 +460,7 @@ protected void getOutputPath() throws IOException { outdir = getTempOutdir(tempFilePrefix, localOutput.getName()); outdir.getFileSystem(getConf()).deleteOnExit(outdir); } - if (localOutput != null) { + if (hasTempOutput()) { LOGGER.info(" * Output file : " + toUri(localOutput)); LOGGER.info(" * Temporary outdir : " + toUri(outdir)); } else { @@ -463,15 +471,19 @@ protected void getOutputPath() throws IOException { public void postExecute(boolean succeed) throws IOException { if (succeed) { - if (localOutput != null) { + if (hasTempOutput()) { getConcatMrOutputToLocal(); } } - if (localOutput != null) { + if (hasTempOutput()) { deleteTemporaryFile(outdir); } } + public boolean hasTempOutput() { + return localOutput != null; + } + public MapReduceOutputFile setNamedOutput(String partFilePrefix) { this.namedOutput = partFilePrefix; return this; @@ -556,27 +568,20 @@ protected List concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool fileSystem.copyToLocalFile(false, paths.get(0), localOutput); } else { LOGGER.info("Concat and copy to local : " + paths.size() + " partial files"); - LOGGER.info(" Source : " + mrOutdir.toUri()); - LOGGER.info(" Target : " + localOutput.toUri()); + LOGGER.info(" Source {}: {}", getCompression(paths.get(0).getName()), mrOutdir.toUri()); + LOGGER.info(" Target {}: {}", getCompression(localOutput.getName()), localOutput.toUri()); LOGGER.info(" ---- "); - boolean isGzip = paths.get(0).getName().endsWith(".gz"); - try (FSDataOutputStream fsOs = localOutput.getFileSystem(getConf()).create(localOutput); - OutputStream gzOs = isGzip ? new GZIPOutputStream(fsOs) : null) { - OutputStream os = gzOs == null ? fsOs : gzOs; + + try (OutputStream os = getOutputStreamPlain(localOutput.getName(), localOutput.getFileSystem(getConf()).create(localOutput))) { for (int i = 0; i < paths.size(); i++) { Path path = paths.get(i); - LOGGER.info("[{}] Concat {} : '{}' ({}) ", + LOGGER.info("[{}] Concat {} file : '{}' ({}) ", i, - isGzip ? "gzip file" : "file", + getCompression(path.getName()), path.toUri(), humanReadableByteCount(fileSystem.getFileStatus(path).getLen(), false)); - try (FSDataInputStream fsIs = fileSystem.open(path)) { - InputStream is; - if (isGzip) { - is = new GZIPInputStream(fsIs); - } else { - is = fsIs; - } + try (InputStream isAux = getInputStream(path.getName(), fileSystem.open(path))) { + InputStream is = isAux; // Remove extra headers from all files but the first if (removeExtraHeaders && i != 0) { BufferedReader br = new BufferedReader(new InputStreamReader(is)); @@ -600,6 +605,59 @@ protected List concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool return paths; } + private static String getCompression(String name) throws IOException { + if (name.endsWith(".gz")) { + return "gzip"; + } else if (name.endsWith(".snappy")) { + return "snappy"; + } else if (name.endsWith(".lz4")) { + return "lz4"; + } else if (name.endsWith(".zst")) { + return "ztandard"; + } else { + return "plain"; + } + } + + private OutputStream getOutputStreamPlain(String name, OutputStream fsOs) throws IOException { + CompressionCodec codec = getCompressionCodec(name); + if (codec == null) { + return fsOs; + } + return codec.createOutputStream(fsOs); + } + + private CompressionCodec getCompressionCodec(String name) throws IOException { + Class codecClass; + switch (getCompression(name)) { + case "gzip": + codecClass = GzipCodec.class; + break; + case "snappy": + codecClass = SnappyCodec.class; + break; + case "lz4": + codecClass = Lz4Codec.class; + break; + case "ztandard": + codecClass = ZStandardCodec.class; + break; + case "plain": + return null; + default: + throw new IOException("Unknown compression codec for file " + name); + } + return ReflectionUtils.newInstance(codecClass, getConf()); + } + + private InputStream getInputStream(String name, InputStream is) throws IOException { + CompressionCodec codec = getCompressionCodec(name); + if (codec == null) { + return is; + } + return codec.createInputStream(is); + } + protected final int getServersSize(String table) throws IOException { int serversSize; try (HBaseManager hBaseManager = new HBaseManager(getConf())) { diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java index 7b1e96f22e..93a75006fb 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java @@ -140,10 +140,12 @@ protected void setupJob(Job job) throws IOException { LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); outputFormatClass = LazyOutputFormat.class; } - if (outputFormat.isGzip()) { - FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // compression - } else if (outputFormat.isSnappy()) { - FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); // compression + if (SnappyCodec.isNativeCodeLoaded()) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); + } else { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } job.getConfiguration().set(VariantFileOutputFormat.VARIANT_OUTPUT_FORMAT, outputFormat.name()); job.setOutputKeyClass(Variant.class); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java index 0985b6c0f6..91ac57391d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java @@ -5,11 +5,13 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DeflateCodec; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; @@ -162,9 +164,13 @@ protected void setupJob(Job job) throws IOException { outputFormatClass = LazyOutputFormat.class; job.setOutputFormatClass(ValueOnlyTextOutputFormat.class); - TextOutputFormat.setCompressOutput(job, true); - TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class); -// TextOutputFormat.setOutputCompressorClass(job, DeflateCodec.class); + if (SnappyCodec.isNativeCodeLoaded()) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); + } else { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); + } job.setOutputKeyClass(keyClass); job.setOutputValueClass(valueClass); }