Commit e852dcf

add reference and header parquet writers
RoriCremer committed Jun 26, 2024
1 parent 7a9d105 commit e852dcf
Showing 10 changed files with 596 additions and 224 deletions.

Large diffs are not rendered by default.

src/main/java/org/broadinstitute/hellbender/tools/gvs/ingest/RefCreator.java
@@ -1,9 +1,14 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.protobuf.Descriptors;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.GQStateEnum;
@@ -14,12 +19,16 @@
import org.broadinstitute.hellbender.utils.GenomeLocSortedSet;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.gvs.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsReferenceParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsVariantParquetFileWriter;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;


public final class RefCreator {
@@ -31,6 +40,7 @@ public final class RefCreator {

private final boolean writeReferenceRanges;
private final Long sampleId;
private GvsReferenceParquetFileWriter refRangesParquetFileWriter = null;
private SimpleInterval previousInterval;
private final Set<GQStateEnum> gqStatesToIgnore;
private final GenomeLocSortedSet coverageLocSortedSet;
@@ -43,7 +53,7 @@ public static boolean doRowsExistFor(CommonCode.OutputType outputType, String pr
return BigQueryUtils.doRowsExistFor(projectId, datasetName, REF_RANGES_FILETYPE_PREFIX + tableNumber, SchemaUtils.SAMPLE_ID_FIELD_NAME, sampleId);
}

public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, String tableNumber, SAMSequenceDictionary seqDictionary, Set<GQStateEnum> gqStatesToIgnore, final File outputDirectory, final CommonCode.OutputType outputType, final boolean writeReferenceRanges, final String projectId, final String datasetName, final boolean storeCompressedReferences, final MessageType parquetSchema) {
this.sampleId = sampleId;
this.outputType = outputType;
this.writeReferenceRanges = writeReferenceRanges;
@@ -65,11 +75,16 @@ public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
case TSV:
refRangesWriter = new RefRangesTsvWriter(refOutputFile.getCanonicalPath());
break;
case AVRO:
case AVRO: // TODO: when do we use this?
refRangesWriter = new RefRangesAvroWriter(refOutputFile.getCanonicalPath());
break;
case PARQUET:
refRangesParquetFileWriter = new GvsReferenceParquetFileWriter(new Path(refOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);
break;
}
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This reference parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create reference range outputs", ioex);
}
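
The parquetSchema passed into this constructor is built by the caller and is not rendered on this page. A minimal sketch of what such a schema could look like, with column names and types assumed from the four values the reference-range writer packs per row (location, sample_id, length, state), not taken from the PR itself:

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RefRangesSchemaSketch {
    public static void main(String[] args) {
        // Hypothetical ref_ranges schema; field names and types are assumptions,
        // not the schema this PR actually constructs elsewhere.
        MessageType parquetSchema = MessageTypeParser.parseMessageType(
                "message ref_ranges {\n"
              + "  required int64 location;\n"
              + "  required int64 sample_id;\n"
              + "  required int32 length;\n"
              + "  required binary state (UTF8);\n"
              + "}");
        System.out.println(parquetSchema);
    }
}
```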
@@ -104,20 +119,44 @@ public void apply(VariantContext variant, List<GenomeLoc> intervalsToWrite) thro
int localStart = start;
while ( localStart <= end ) {
int length = Math.min(end - localStart + 1, IngestConstants.MAX_REFERENCE_BLOCK_BASES);
switch (outputType) {
    case BQ:
        try {
            if (storeCompressedReferences) {
                refRangesWriter.writeCompressed(
                        SchemaUtils.encodeCompressedRefBlock(variantChr, localStart, length,
                                getGQStateEnum(variant.getGenotype(0).getGQ()).getCompressedValue()),
                        sampleId
                );
            } else {
                refRangesWriter.write(SchemaUtils.encodeLocation(variantChr, localStart),
                        sampleId,
                        length,
                        getGQStateEnum(variant.getGenotype(0).getGQ()).getValue()
                );
            }
        } catch (IOException ex) {
            throw new IOException("BQ exception", ex);
        }
        break;
    case PARQUET:
        if (storeCompressedReferences) {
            JSONObject record = GvsReferenceParquetFileWriter.writeCompressed(
                    SchemaUtils.encodeCompressedRefBlock(variantChr, localStart, length,
                            getGQStateEnum(variant.getGenotype(0).getGQ()).getCompressedValue()),
                    sampleId
            );
            refRangesParquetFileWriter.write(record);
        } else {
            JSONObject record = GvsReferenceParquetFileWriter.writeJson(
                    SchemaUtils.encodeLocation(variantChr, localStart),
                    sampleId,
                    length,
                    getGQStateEnum(variant.getGenotype(0).getGQ()).getValue());
            refRangesParquetFileWriter.write(record);
        }
        break;
}
localStart = localStart + length;
}
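
The writeJson and writeCompressed helpers used in the PARQUET case above return one JSONObject per row; they live in GvsReferenceParquetFileWriter, which this page does not render. A plausible sketch of writeJson under that assumption (the key names are guesses matched to the schema sketch earlier):

```java
import org.json.JSONObject;

final class RefRangesRowSketch {
    // Hypothetical stand-in for GvsReferenceParquetFileWriter.writeJson;
    // the real method is not shown in this diff.
    static JSONObject writeJson(long location, long sampleId, int length, String state) {
        JSONObject record = new JSONObject();
        record.put("location", location);   // packed contig+position from SchemaUtils.encodeLocation
        record.put("sample_id", sampleId);
        record.put("length", length);       // reference block length in bases
        record.put("state", state);         // GQ state band from GQStateEnum
        return record;
    }
}
```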

@@ -267,6 +306,13 @@ public void commitData() {
if (writeReferenceRanges && refRangesWriter != null) {
refRangesWriter.commitData();
}
} else if (outputType == CommonCode.OutputType.PARQUET && refRangesParquetFileWriter != null) {
try {
refRangesParquetFileWriter.close();
} catch (IOException exception) {
System.out.println("ERROR CLOSING PARQUET FILE: ");
exception.printStackTrace();
}
}
}
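
The Gvs*ParquetFileWriter classes themselves are among the diffs not rendered here. As a rough sketch of the pattern suggested by their constructor signature (Path, MessageType, boolean, CompressionCodecName), here is a minimal JSONObject-backed writer built on parquet-mr's WriteSupport API; this is an assumed shape, not the PR's actual code:

```java
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.json.JSONObject;

/** Writes one Parquet column per matching JSONObject key. */
class JsonWriteSupport extends WriteSupport<JSONObject> {
    private final MessageType schema;
    private RecordConsumer consumer;

    JsonWriteSupport(MessageType schema) { this.schema = schema; }

    @Override
    public WriteContext init(Configuration configuration) {
        return new WriteContext(schema, new HashMap<>());
    }

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
        this.consumer = recordConsumer;
    }

    @Override
    public void write(JSONObject record) {
        consumer.startMessage();
        for (int i = 0; i < schema.getFieldCount(); i++) {
            String field = schema.getFieldName(i);
            if (!record.has(field)) continue; // skip absent optional columns
            consumer.startField(field, i);
            Object value = record.get(field);
            if (value instanceof Long)         consumer.addLong((Long) value);
            else if (value instanceof Integer) consumer.addInteger((Integer) value);
            else                               consumer.addBinary(Binary.fromString(value.toString()));
            consumer.endField(field, i);
        }
        consumer.endMessage();
    }
}

/** Sketch mirroring the constructor shape used above. */
class JsonParquetFileWriter extends ParquetWriter<JSONObject> {
    JsonParquetFileWriter(Path file, MessageType schema, boolean enableDictionary,
                          CompressionCodecName codec) throws IOException {
        super(file, new JsonWriteSupport(schema), codec,
              DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
    }
}
```

Note that ParquetWriter only writes the file footer in close(), which is why the commitData() paths above must close the writer before the output file is readable.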

src/main/java/org/broadinstitute/hellbender/tools/gvs/ingest/VcfHeaderLineScratchCreator.java
@@ -1,25 +1,44 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.protobuf.Descriptors;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.IngestConstants;
import org.broadinstitute.hellbender.tools.gvs.common.SchemaUtils;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.PendingBQWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsHeaderParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsReferenceParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsVariantParquetFileWriter;
import org.broadinstitute.hellbender.utils.tsv.SimpleXSVWriter;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class VcfHeaderLineScratchCreator {
private final CommonCode.OutputType outputType;
private final Long sampleId;
private final String projectId;
private final String datasetName;

private PendingBQWriter vcfHeaderBQJsonWriter = null;
private GvsHeaderParquetFileWriter vcfHeaderParquetFileWriter = null;
private static final String NON_SCRATCH_TABLE_NAME = "vcf_header_lines";
private static final String SCRATCH_TABLE_NAME = "vcf_header_lines_scratch";

private static final String HEADER_FILETYPE_PREFIX = "header_file";


public static boolean doScratchRowsExistFor(String projectId, String datasetName, Long sampleId) {
return BigQueryUtils.doRowsExistFor(projectId, datasetName, "vcf_header_lines_scratch", "sample_id", sampleId);
}
@@ -36,16 +55,34 @@ private static boolean doNonScratchRowsExistFor(String projectId, String dataset
return BigQueryUtils.doRowsExistFor(projectId, datasetName, NON_SCRATCH_TABLE_NAME, "vcf_header_lines_hash", headerLineHash);
}

public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datasetName, File outputDirectory, CommonCode.OutputType outputType, MessageType headersRowSchema) {
try {
this.sampleId = sampleId;
this.outputType = outputType;
this.projectId = projectId;
this.datasetName = datasetName;

// String PREFIX_SEPARATOR = "_"; // TODO should this be moved to a common place?

switch (outputType) {
case BQ:
if (projectId == null || datasetName == null) {
throw new UserException("Must specify project-id and dataset-name when using BQ output mode.");
}
vcfHeaderBQJsonWriter = new PendingBQWriter(projectId, datasetName, SCRATCH_TABLE_NAME);
break;
case PARQUET:
// TODO: confirm that no table_number or sampleIdentifierForOutputFileName is needed here; header lines span all tables/samples.
final File parquetOutputFile = new File(outputDirectory, HEADER_FILETYPE_PREFIX + ".parquet");
vcfHeaderParquetFileWriter = new GvsHeaderParquetFileWriter(new Path(parquetOutputFile.toURI()), headersRowSchema, false, CompressionCodecName.SNAPPY);
break;

}
}
catch (Exception e) {
throw new UserException("Could not create VCF Header Scratch Table Writer", e);
Expand All @@ -55,21 +92,41 @@ public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datas
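
With the new constructor, PARQUET mode no longer requires a BigQuery project or dataset, since the null check moved inside the BQ case. A hypothetical call (the schema's field names are assumptions, not taken from this PR):

```java
import java.io.File;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;

public class HeaderScratchParquetDemo {
    public static void main(String[] args) {
        MessageType headersRowSchema = MessageTypeParser.parseMessageType(
                "message vcf_header_lines_scratch {\n"
              + "  required int64 sample_id;\n"
              + "  required binary vcf_header_lines_hash (UTF8);\n"
              + "}");
        // projectId/datasetName may be null here: the null check now applies only in BQ mode.
        VcfHeaderLineScratchCreator creator = new VcfHeaderLineScratchCreator(
                1234L, null, null,
                new File("/tmp/gvs-headers"),
                CommonCode.OutputType.PARQUET,
                headersRowSchema);
        // ... creator.apply(headerLines); creator.commitData();
    }
}
```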

public void apply(Map<String, Boolean> allLineHeaders) throws IOException {
for (final Map.Entry<String, Boolean> headerChunk : allLineHeaders.entrySet()) {
switch (outputType) {
case BQ:
try {
// if this header chunk has already been added to the scratch table, only add an association between the
// sample_id and the hash, no need to rewrite the header chunk to the DB
String chunkHash = Utils.calcMD5(headerChunk.getKey());
Boolean isExpectedUnique = headerChunk.getValue();
boolean vcfScratchHeaderRowsExist = doScratchRowsExistFor(this.projectId, this.datasetName, chunkHash);
boolean vcfNonScratchHeaderRowsExist = doNonScratchRowsExistFor(this.projectId, this.datasetName, chunkHash);
if (vcfScratchHeaderRowsExist || vcfNonScratchHeaderRowsExist) {
vcfHeaderBQJsonWriter.addJsonRow(createJson(this.sampleId, null, chunkHash, isExpectedUnique));
}
else {
vcfHeaderBQJsonWriter.addJsonRow(createJson(this.sampleId, headerChunk.getKey(), chunkHash, isExpectedUnique));
}
} catch (Descriptors.DescriptorValidationException | ExecutionException | InterruptedException ex) {
throw new IOException("BQ exception", ex);
}
break;
case PARQUET:
String chunkHash = Utils.calcMD5(headerChunk.getKey());
Boolean isExpectedUnique = headerChunk.getValue();
JSONObject record = vcfHeaderParquetFileWriter.writeJson(this.sampleId, chunkHash);
vcfHeaderParquetFileWriter.write(record);

//boolean vcfScratchHeaderRowsExist = doScratchRowsExistFor(this.projectId, this.datasetName, chunkHash);
//boolean vcfNonScratchHeaderRowsExist = doNonScratchRowsExistFor(this.projectId, this.datasetName, chunkHash);
// why is there no isExpectedUnique check here?
//if (vcfScratchHeaderRowsExist || vcfNonScratchHeaderRowsExist) {
//vcfHeaderParquetFileWriter.writeJson(this.sampleId, chunkHash);
//}
//else {
//vcfHeaderParquetFileWriter.writeJson(this.sampleId, chunkHash);
//}
break;
}
}
}
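
GvsHeaderParquetFileWriter.writeJson is likewise not rendered on this page; given the two arguments passed above, a plausible sketch packs just the sample id and the header-chunk hash (column names are guesses):

```java
import org.json.JSONObject;

final class HeaderRowSketch {
    // Hypothetical stand-in for GvsHeaderParquetFileWriter.writeJson;
    // the real method is not shown in this diff.
    static JSONObject writeJson(Long sampleId, String headerLineHash) {
        JSONObject record = new JSONObject();
        record.put("sample_id", sampleId);
        record.put("vcf_header_lines_hash", headerLineHash);
        return record;
    }
}
```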
@@ -87,9 +144,16 @@ public JSONObject createJson(Long sampleId, String headerChunk, String headerHas
}

public void commitData() {
if (outputType == CommonCode.OutputType.BQ && vcfHeaderBQJsonWriter != null) {
vcfHeaderBQJsonWriter.flushBuffer();
vcfHeaderBQJsonWriter.commitWriteStreams();
} else if (outputType == CommonCode.OutputType.PARQUET && vcfHeaderParquetFileWriter != null) {
try {
vcfHeaderParquetFileWriter.close();
} catch (IOException exception) {
System.out.println("ERROR CLOSING PARQUET FILE: ");
exception.printStackTrace();
}
}
}

@@ -105,5 +169,4 @@ public void closeTool() {
vcfHeaderBQJsonWriter.close();
}
}

}
src/main/java/org/broadinstitute/hellbender/tools/gvs/ingest/VetCreator.java
@@ -3,6 +3,7 @@
import com.google.protobuf.Descriptors;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
@@ -68,6 +69,8 @@ public VetCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
vetParquetFileWriter = new GvsVariantParquetFileWriter(new Path(parquetOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);
break;
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This variants parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create vet outputs", ioex);
}
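
The new FileAlreadyExistsException catch here (and its twin in RefCreator) reflects parquet-mr's default create-only write mode: opening an existing path fails rather than overwriting. A small, self-contained illustration with the stock example writer (path and schema are placeholders):

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetOverwriteDemo {
    public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
                "message demo { required int64 id; }");
        Path path = new Path("/tmp/demo.parquet");
        // Mode.CREATE is the default: running this twice throws
        // org.apache.hadoop.fs.FileAlreadyExistsException on the second run,
        // which VetCreator/RefCreator wrap in a UserException.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
                .withType(schema)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .build()) {
            // no rows written; an empty but valid Parquet file is still produced
        }
    }
}
```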
@@ -149,8 +152,6 @@ public void commitData() {
exception.printStackTrace();
}
}


}

public void closeTool() {
