Skip to content

Commit

Permalink
storage: Disable flush on AbfsOutputStream. HADOOP-16548 #TASK-6722
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Nov 11, 2024
1 parent ad3521e commit ab50d6e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public void write(byte[] b, int off, int len) throws IOException {
count += len;
}

@Override
public void close() throws IOException {
out.close();
}

public long getByteCount() {
return count;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.opencb.opencga.storage.hadoop.variant.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
Expand All @@ -37,6 +39,7 @@
import org.opencb.opencga.storage.hadoop.variant.metadata.HBaseVariantStorageMetadataDBAdaptorFactory;

import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

Expand Down Expand Up @@ -69,7 +72,19 @@ public RecordWriter<Variant, NullWritable> getRecordWriter(TaskAttemptContext jo
}
Path file = this.getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
OutputStream out = fs.create(file, false);
FSDataOutputStream fsOs = fs.create(file, false);
OutputStream out;
if (fsOs.getWrappedStream() instanceof AbfsOutputStream) {
// Disable flush on ABFS. See HADOOP-16548
out = new FilterOutputStream(fsOs) {
@Override
public void flush() throws IOException {
// Do nothing
}
};
} else {
out = fsOs;
}
if (isCompressed) {
out = new DataOutputStream(codec.createOutputStream(out));
}
Expand Down

0 comments on commit ab50d6e

Please sign in to comment.