Skip to content

Commit

Permalink
storage: Use SNAPPY as intermediate compression algorithm. #TASK-6722
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Nov 8, 2024
1 parent b52ca27 commit ad3521e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -44,8 +49,6 @@
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static org.opencb.opencga.core.common.IOUtils.humanReadableByteCount;
import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.MR_EXECUTOR_SSH_PASSWORD;
Expand Down Expand Up @@ -196,6 +199,11 @@ public final int run(String[] args) throws Exception {
LOGGER.info(" - OutputTable : " + job.getConfiguration().get(TableOutputFormat.OUTPUT_TABLE));
} else if (StringUtils.isNotEmpty(job.getConfiguration().get(FileOutputFormat.OUTDIR))) {
LOGGER.info(" - Outdir : " + job.getConfiguration().get(FileOutputFormat.OUTDIR));

if (TextOutputFormat.getCompressOutput(job)) {
Class<? extends CompressionCodec> compressorClass = TextOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
LOGGER.info(" - Compress : " + compressorClass.getName());
}
}
LOGGER.info("=================================================");
reportRunningJobs();
Expand Down Expand Up @@ -431,18 +439,18 @@ public class MapReduceOutputFile {
/**
 * Creates an output file wrapper with no name generator.
 * <p>
 * Fix: the scraped block contained both the pre-rename {@code getOutputPath()} call and the
 * renamed {@code initOutputPath()} call; only the renamed one must run. Delegates to the
 * main constructor to avoid duplicating field initialization.
 *
 * @param tempFilePrefix prefix used when building the temporary output directory
 * @throws IOException if resolving/creating the output path fails
 */
public MapReduceOutputFile(String tempFilePrefix) throws IOException {
    this(() -> null, tempFilePrefix);
}

/**
 * Creates an output file wrapper.
 * <p>
 * Fix: the scraped block contained both the pre-rename {@code getOutputPath()} call and the
 * renamed {@code initOutputPath()} call; initialization must run exactly once, through the
 * renamed method.
 *
 * @param nameGenerator  supplier of output file names; may supply null
 * @param tempFilePrefix prefix used when building the temporary output directory
 * @throws IOException if resolving/creating the output path fails
 */
public MapReduceOutputFile(Supplier<String> nameGenerator, String tempFilePrefix) throws IOException {
    this.nameGenerator = nameGenerator;
    this.tempFilePrefix = tempFilePrefix;
    initOutputPath();
    namedOutput = null;
}

protected void getOutputPath() throws IOException {
private void initOutputPath() throws IOException {
String outdirStr = getParam(OUTPUT_PARAM);
if (StringUtils.isNotEmpty(outdirStr)) {
outdir = new Path(outdirStr);
Expand All @@ -452,7 +460,7 @@ protected void getOutputPath() throws IOException {
outdir = getTempOutdir(tempFilePrefix, localOutput.getName());
outdir.getFileSystem(getConf()).deleteOnExit(outdir);
}
if (localOutput != null) {
if (hasTempOutput()) {
LOGGER.info(" * Output file : " + toUri(localOutput));
LOGGER.info(" * Temporary outdir : " + toUri(outdir));
} else {
Expand All @@ -463,15 +471,19 @@ protected void getOutputPath() throws IOException {

public void postExecute(boolean succeed) throws IOException {
if (succeed) {
if (localOutput != null) {
if (hasTempOutput()) {
getConcatMrOutputToLocal();
}
}
if (localOutput != null) {
if (hasTempOutput()) {
deleteTemporaryFile(outdir);
}
}

/**
 * Whether this job writes to a temporary directory that must later be merged
 * into a local output file.
 *
 * @return true when a local output file target is set
 */
public boolean hasTempOutput() {
    // A configured local target implies MR output is staged in a temporary outdir first.
    return Objects.nonNull(localOutput);
}

public MapReduceOutputFile setNamedOutput(String partFilePrefix) {
this.namedOutput = partFilePrefix;
return this;
Expand Down Expand Up @@ -556,27 +568,20 @@ protected List<Path> concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool
fileSystem.copyToLocalFile(false, paths.get(0), localOutput);
} else {
LOGGER.info("Concat and copy to local : " + paths.size() + " partial files");
LOGGER.info(" Source : " + mrOutdir.toUri());
LOGGER.info(" Target : " + localOutput.toUri());
LOGGER.info(" Source {}: {}", getCompression(paths.get(0).getName()), mrOutdir.toUri());
LOGGER.info(" Target {}: {}", getCompression(localOutput.getName()), localOutput.toUri());
LOGGER.info(" ---- ");
boolean isGzip = paths.get(0).getName().endsWith(".gz");
try (FSDataOutputStream fsOs = localOutput.getFileSystem(getConf()).create(localOutput);
OutputStream gzOs = isGzip ? new GZIPOutputStream(fsOs) : null) {
OutputStream os = gzOs == null ? fsOs : gzOs;

try (OutputStream os = getOutputStreamPlain(localOutput.getName(), localOutput.getFileSystem(getConf()).create(localOutput))) {
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
LOGGER.info("[{}] Concat {} : '{}' ({}) ",
LOGGER.info("[{}] Concat {} file : '{}' ({}) ",
i,
isGzip ? "gzip file" : "file",
getCompression(path.getName()),
path.toUri(),
humanReadableByteCount(fileSystem.getFileStatus(path).getLen(), false));
try (FSDataInputStream fsIs = fileSystem.open(path)) {
InputStream is;
if (isGzip) {
is = new GZIPInputStream(fsIs);
} else {
is = fsIs;
}
try (InputStream isAux = getInputStream(path.getName(), fileSystem.open(path))) {
InputStream is = isAux;
// Remove extra headers from all files but the first
if (removeExtraHeaders && i != 0) {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
Expand All @@ -600,6 +605,59 @@ protected List<Path> concatMrOutputToLocal(Path mrOutdir, Path localOutput, bool
return paths;
}

/**
 * Derives a compression label from a file name suffix.
 * <p>
 * Fix: dropped the {@code throws IOException} declaration — nothing in the body throws,
 * and no visible caller catches it.
 *
 * @param name file name to inspect
 * @return "gzip", "snappy", "lz4", "ztandard" or "plain"
 */
private static String getCompression(String name) {
    if (name.endsWith(".gz")) {
        return "gzip";
    } else if (name.endsWith(".snappy")) {
        return "snappy";
    } else if (name.endsWith(".lz4")) {
        return "lz4";
    } else if (name.endsWith(".zst")) {
        // NOTE(review): "ztandard" is a misspelling of "zstandard", but this exact
        // string is matched in getCompressionCodec(); keep the two in sync if fixed.
        return "ztandard";
    } else {
        return "plain";
    }
}

/**
 * Wraps the given raw output stream with a compressor when the file name's
 * suffix implies one; otherwise returns the stream unchanged.
 *
 * @param name file name whose suffix selects the codec
 * @param fsOs raw output stream to (possibly) wrap
 * @return a compressing stream, or {@code fsOs} itself for plain files
 * @throws IOException if the codec cannot be resolved or created
 */
private OutputStream getOutputStreamPlain(String name, OutputStream fsOs) throws IOException {
    final CompressionCodec codec = getCompressionCodec(name);
    return (codec != null) ? codec.createOutputStream(fsOs) : fsOs;
}

/**
 * Resolves a Hadoop {@link CompressionCodec} from a file name's compression label.
 * <p>
 * Fix: also accepts the correctly spelled "zstandard" label in addition to the
 * misspelled "ztandard" currently produced by {@code getCompression}, so a future
 * spelling fix there cannot silently break codec resolution here.
 *
 * @param name file name whose suffix selects the codec
 * @return a configured codec instance, or null for plain (uncompressed) files
 * @throws IOException if the label is not recognized
 */
private CompressionCodec getCompressionCodec(String name) throws IOException {
    Class<? extends CompressionCodec> codecClass;
    switch (getCompression(name)) {
        case "gzip":
            codecClass = GzipCodec.class;
            break;
        case "snappy":
            codecClass = SnappyCodec.class;
            break;
        case "lz4":
            codecClass = Lz4Codec.class;
            break;
        case "ztandard": // historical misspelling produced by getCompression
        case "zstandard":
            codecClass = ZStandardCodec.class;
            break;
        case "plain":
            return null;
        default:
            throw new IOException("Unknown compression codec for file " + name);
    }
    // Codecs may be Configurable; instantiate through ReflectionUtils so the
    // job Configuration is injected.
    return ReflectionUtils.newInstance(codecClass, getConf());
}

/**
 * Wraps the given raw input stream with a decompressor when the file name's
 * suffix implies one; otherwise returns the stream unchanged.
 *
 * @param name file name whose suffix selects the codec
 * @param is   raw input stream to (possibly) wrap
 * @return a decompressing stream, or {@code is} itself for plain files
 * @throws IOException if the codec cannot be resolved or created
 */
private InputStream getInputStream(String name, InputStream is) throws IOException {
    final CompressionCodec codec = getCompressionCodec(name);
    return (codec != null) ? codec.createInputStream(is) : is;
}

protected final int getServersSize(String table) throws IOException {
int serversSize;
try (HBaseManager hBaseManager = new HBaseManager(getConf())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,12 @@ protected void setupJob(Job job) throws IOException {
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
outputFormatClass = LazyOutputFormat.class;
}
if (outputFormat.isGzip()) {
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // compression
} else if (outputFormat.isSnappy()) {
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); // compression
if (SnappyCodec.isNativeCodeLoaded()) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
} else {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
job.getConfiguration().set(VariantFileOutputFormat.VARIANT_OUTPUT_FORMAT, outputFormat.name());
job.setOutputKeyClass(Variant.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
Expand Down Expand Up @@ -162,9 +164,13 @@ protected void setupJob(Job job) throws IOException {
outputFormatClass = LazyOutputFormat.class;

job.setOutputFormatClass(ValueOnlyTextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// TextOutputFormat.setOutputCompressorClass(job, DeflateCodec.class);
if (SnappyCodec.isNativeCodeLoaded()) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
} else {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
}
Expand Down

0 comments on commit ad3521e

Please sign in to comment.