add better error for already written files/duplicate names
RoriCremer committed Jun 17, 2024
1 parent ec9e551 commit de9c85c
Showing 8 changed files with 261 additions and 176 deletions.

One file's diff is not shown (large diffs are not rendered by default).

RefCreator.java
@@ -3,6 +3,7 @@
import com.google.protobuf.Descriptors;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -82,6 +83,8 @@ public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
break;
}
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This reference parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create reference range outputs", ioex);
}
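Hadoop's FileAlreadyExistsException is a subclass of java.io.IOException, so the new catch clause has to come before the broader IOException clause (the reverse order would not compile). A minimal standalone sketch of the pattern, with a hypothetical createOutput stand-in for the Parquet writer construction in RefCreator:

import org.apache.hadoop.fs.FileAlreadyExistsException;

import java.io.IOException;

public class CatchOrderSketch {
    // Hypothetical stand-in for the writer-creating code guarded above.
    static void createOutput(boolean alreadyWritten) throws IOException {
        if (alreadyWritten) {
            throw new FileAlreadyExistsException("ref_ranges_001.parquet");
        }
    }

    public static void main(String[] args) {
        try {
            createOutput(true);
        } catch (final FileAlreadyExistsException fs) {
            // Most specific first: the duplicate-name case gets its own message.
            System.err.println("This reference parquet file already exists: " + fs.getMessage());
        } catch (final IOException ioex) {
            // Any other I/O failure falls through to the generic message.
            System.err.println("Could not create reference range outputs: " + ioex.getMessage());
        }
    }
}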
VcfHeaderLineScratchCreator.java
@@ -1,12 +1,21 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.protobuf.Descriptors;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.IngestConstants;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.PendingBQWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsHeaderParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsVariantParquetFileWriter;
import org.broadinstitute.hellbender.utils.tsv.SimpleXSVWriter;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -17,9 +26,13 @@ public class VcfHeaderLineScratchCreator {
private final String datasetName;

private PendingBQWriter vcfHeaderBQJsonWriter = null;
private GvsHeaderParquetFileWriter vcfHeaderParquetFileWriter = null;
private static final String NON_SCRATCH_TABLE_NAME = "vcf_header_lines";
private static final String SCRATCH_TABLE_NAME = "vcf_header_lines_scratch";

private static final String HEADER_FILETYPE_PREFIX = "header_";


public static boolean doScratchRowsExistFor(String projectId, String datasetName, Long sampleId) {
return BigQueryUtils.doRowsExistFor(projectId, datasetName, "vcf_header_lines_scratch", "sample_id", sampleId);
}
@@ -36,16 +49,33 @@ private static boolean doNonScratchRowsExistFor(String projectId, String dataset
return BigQueryUtils.doRowsExistFor(projectId, datasetName, NON_SCRATCH_TABLE_NAME, "vcf_header_lines_hash", headerLineHash);
}

public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datasetName) {
public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datasetName, File outputDirectory, CommonCode.OutputType outputType, MessageType headersRowSchema) {
try {
this.sampleId = sampleId;
this.projectId = projectId;
this.datasetName = datasetName;

String PREFIX_SEPARATOR = "_"; // TODO should this be moved to a common place?

if (projectId == null || datasetName == null) {
throw new UserException("Must specify project-id and dataset-name.");
}
vcfHeaderBQJsonWriter = new PendingBQWriter(projectId, datasetName, SCRATCH_TABLE_NAME);

switch (outputType) {

case BQ:
if (projectId == null || datasetName == null) {
throw new UserException("Must specify project-id and dataset-name when using BQ output mode.");
}
vcfHeaderBQJsonWriter = new PendingBQWriter(projectId, datasetName, SCRATCH_TABLE_NAME);
break;
case PARQUET:
// TODO ensure that there doesn't need to be a table_number or sampleIdentifierForOutputFileName--it's all tables/samples, yes?
final File parquetOutputFile = new File(outputDirectory, HEADER_FILETYPE_PREFIX + ".parquet");
vcfHeaderParquetFileWriter = new GvsHeaderParquetFileWriter(new Path(parquetOutputFile.toURI()), headersRowSchema, false, CompressionCodecName.SNAPPY);
break;

}
}
catch (Exception e) {
throw new UserException("Could not create VCF Header Scratch Table Writer", e);
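In PARQUET mode this constructor writes every sample's header lines to the same fixed name under the output directory, so two writers pointed at one directory will collide on header_.parquet and surface the new duplicate-file error (the TODO above asks whether a table number or sample identifier belongs in the name). A small sketch of the naming and the File-to-Hadoop-Path conversion, assuming local-filesystem output and an example directory:

import org.apache.hadoop.fs.Path;

import java.io.File;

public class HeaderPathSketch {
    private static final String HEADER_FILETYPE_PREFIX = "header_";

    public static void main(String[] args) {
        File outputDirectory = new File("quickstart/output"); // example path
        // Mirrors the constructor above: the file name is fixed per directory.
        File parquetOutputFile = new File(outputDirectory, HEADER_FILETYPE_PREFIX + ".parquet");
        System.out.println(parquetOutputFile.getPath()); // quickstart/output/header_.parquet
        // ParquetWriter wants a Hadoop Path, built from the file's URI.
        Path hadoopPath = new Path(parquetOutputFile.toURI());
        System.out.println(hadoopPath); // file:/.../quickstart/output/header_.parquet
    }
}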
VetCreator.java
@@ -3,6 +3,7 @@
import com.google.protobuf.Descriptors;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
@@ -68,6 +69,8 @@ public VetCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
vetParquetFileWriter = new GvsVariantParquetFileWriter(new Path(parquetOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);
break;
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This variants parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create vet outputs", ioex);
}
GvsHeaderParquetFileWriter.java
@@ -0,0 +1,92 @@
package org.broadinstitute.hellbender.utils.gvs.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import org.json.JSONObject;

import java.io.IOException;

public class GvsHeaderParquetFileWriter extends ParquetWriter<JSONObject> {

/**
* This is very deprecated, and we'll need to figure out how to do this from a builder once it works!
* @param file
* @param schema
* @param enableDictionary
* @param codecName
* @throws IOException
*/
public GvsHeaderParquetFileWriter(
Path file,
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws FileAlreadyExistsException, IOException {
super(file, new GvsReferenceWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}

GvsHeaderParquetFileWriter(
Path file,
GvsVariantWriteSupport writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
boolean enableDictionary,
boolean enableValidation,
ParquetProperties.WriterVersion writerVersion,
Configuration conf)
throws IOException {
super(
file,
writeSupport,
compressionCodecName,
blockSize,
pageSize,
pageSize,
enableDictionary,
enableValidation,
writerVersion,
conf);
}

public static JSONObject writeJson(Long sampleId, String headerLineHash) {
JSONObject record = new JSONObject();
record.put("sample_id", sampleId);
record.put("headerLineHash", headerLineHash);
return record;
}

public static class Builder extends ParquetWriter.Builder<JSONObject, Builder> {
private MessageType schema = null;

private Builder(Path file) {
super(file);
}

private Builder(OutputFile file) {
super(file);
}

public Builder withType(MessageType type) {
this.schema = type;
return this;
}

@Override
protected Builder self() {
return this;
}

@Override
protected GvsVariantWriteSupport getWriteSupport(Configuration conf) {
return new GvsVariantWriteSupport(schema);
}
}

}
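A minimal usage sketch of the new writer: open it on a local path, write one scratch row built by writeJson, and close. The two-column schema below is an assumption (the committed code receives headersRowSchema from its caller), and whether GvsReferenceWriteSupport, which this constructor wires in, accepts these columns depends on that support class. Note that writeJson emits the camelCase key "headerLineHash" while the BigQuery column elsewhere is vcf_header_lines_hash.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.File;
import java.io.IOException;

public class HeaderWriterSketch {
    public static void main(String[] args) throws IOException {
        // Assumed schema; the real one is supplied by the caller as headersRowSchema.
        MessageType schema = MessageTypeParser.parseMessageType(
                "message HeaderRow { required int64 sample_id; required binary headerLineHash (UTF8); }");
        File out = new File("quickstart/output", "header_.parquet");
        // A second run against the same path now fails with FileAlreadyExistsException.
        try (GvsHeaderParquetFileWriter writer = new GvsHeaderParquetFileWriter(
                new Path(out.toURI()), schema, false, CompressionCodecName.SNAPPY)) {
            writer.write(GvsHeaderParquetFileWriter.writeJson(100L, "d41d8cd98f00b204e9800998ecf8427e"));
        }
    }
}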
GvsReferenceParquetFileWriter.java
@@ -1,6 +1,7 @@
package org.broadinstitute.hellbender.utils.gvs.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -26,7 +27,7 @@ public GvsReferenceParquetFileWriter(
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws IOException {
) throws FileAlreadyExistsException, IOException {
super(file, new GvsReferenceWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}

GvsVariantParquetFileWriter.java
@@ -1,6 +1,7 @@
package org.broadinstitute.hellbender.utils.gvs.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -41,7 +42,7 @@ public GvsVariantParquetFileWriter(
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws IOException {
) throws FileAlreadyExistsException, IOException {
super(file, new GvsVariantWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}

@@ -90,37 +91,10 @@ protected Builder self() {
return this;
}

// @Override
// protected GVSVariantWriteSupport getWriteSupport(Configuration conf) {
// return getWriteSupport((Configuration) null);
// }

@Override
protected GvsVariantWriteSupport getWriteSupport(Configuration conf) {
return new GvsVariantWriteSupport(schema);
}
}

// @Override
// public Builder withExtraMetaData(Map<String, String> extraMetaData) {
// return super.withExtraMetaData(extraMetaData);
// }



}

}

/*
public class GvsVariantParquetFileWriter extends ParquetWriter<JSONObject> {
@SuppressWarnings("deprecation")
public GvsVariantParquetFileWriter(
Path file,
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws IOException {
// update this to the new constructor soon
super(file, new GVSVariantWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}
}*/
}
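The deprecated super-constructor these writers call is hard-wired to create-only mode, which is why an existing file now surfaces as FileAlreadyExistsException. With Parquet's builder API, overwrite behavior becomes an explicit choice via withWriteMode. A sketch, assuming the Builder gains a public builder(Path) factory (hypothetical; the Builder constructors in this commit are private) and a withType method like the one shown above:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.json.JSONObject;

import java.io.IOException;

public class WriterModeSketch {
    // Hypothetical: assumes GvsVariantParquetFileWriter.Builder exposes a public factory.
    static ParquetWriter<JSONObject> open(Path path, MessageType schema, boolean overwrite) throws IOException {
        return GvsVariantParquetFileWriter.builder(path)
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                // CREATE fails fast with FileAlreadyExistsException; OVERWRITE replaces the file.
                .withWriteMode(overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
                .build();
    }
}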
VetCreatorUnitTest.java
@@ -0,0 +1,117 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import htsjdk.variant.variantcontext.*;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.IngestConstants;
import org.broadinstitute.hellbender.tools.gvs.common.IngestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static htsjdk.variant.vcf.VCFConstants.DEPTH_KEY;
import static org.broadinstitute.hellbender.utils.variant.GATKVCFConstants.*;

public class VetCreatorUnitTest {
private static final long SAMPLE_ID = 100;
private static final String SAMPLE_NAME = "NA1";
private static final String PROJECT_ID = "test";
private static final String DATASET_NAME = "test";
private File outputDirectory = new File("quickstart/output/");
Path currentRelativePath = Paths.get("");
private final CommonCode.OutputType outputType = CommonCode.OutputType.PARQUET;
private static final String VET_FILETYPE_PREFIX = "vet_"; // should this live somewhere else--check out IngestConstants for instance--why is that a tsv?!?!
String PREFIX_SEPARATOR = "_"; // should this live somewhere else?

int sampleTableNumber = IngestUtils.getTableNumber(SAMPLE_ID, IngestConstants.partitionPerTable);
String tableNumber = String.format("%03d", sampleTableNumber);
String sampleIdentifierForOutputFileName = "parquet";
public final MessageType PARQUET_SCHEMA = MessageTypeParser // do we want this in a utils file? or as part of a method?
.parseMessageType("""
message VariantRow {
required int64 sample_id;
required int64 location;
required binary ref (UTF8);
required binary alt (UTF8);
optional binary AS_RAW_MQ (UTF8);
optional binary AS_RAW_MQRankSum (UTF8);
optional binary AS_QUALapprox (UTF8);
optional binary AS_RAW_ReadPosRankSum (UTF8);
optional binary AS_SB_TABLE (UTF8);
optional binary AS_VarDP (UTF8);
required binary call_GT (UTF8);
optional binary call_AD (UTF8);
optional binary call_DP (UTF8);
required int64 call_GQ;
optional binary call_PGT (UTF8);
optional binary call_PID (UTF8);
optional binary call_PS (UTF8);
optional binary call_PL (UTF8);
}
""");



@Test
public void testParquetOutputFile() throws IOException {
String fullPath = String.join(currentRelativePath.toAbsolutePath().toString(), outputDirectory.toString());
final File parquetOutputFile = new File(fullPath, VET_FILETYPE_PREFIX + tableNumber + PREFIX_SEPARATOR + sampleIdentifierForOutputFileName + ".parquet");

String expected = String.join(fullPath, "vet_001_parquet.parquet");
Assert.assertEquals(parquetOutputFile.getAbsoluteFile(), expected);
Files.deleteIfExists(parquetOutputFile.toPath());
}

//@Test(expected = FileAlreadyExistsException.class)
@Test
public void testErrorFile() throws IOException {
VariantContextBuilder builderA =
new VariantContextBuilder("a","1",10329,10329,
Arrays.asList(Allele.REF_C,Allele.ALT_A,Allele.NON_REF_ALLELE));


Genotype g = new GenotypeBuilder(SAMPLE_NAME)
.alleles(Arrays.asList(Allele.REF_C, Allele.ALT_A))
.PL(new int[]{74,0,34,707,390,467})
.DP(64)
.GQ(36)
.AD(new int[]{22,42,0})
.attribute(STRAND_BIAS_BY_SAMPLE_KEY, "1,21,6,50")
.make();

builderA.attribute(AS_RAW_RMS_MAPPING_QUALITY_KEY,"29707.00|39366.00|2405.00")
.attribute(AS_RAW_MAP_QUAL_RANK_SUM_KEY,"|-0.2,1|-2.5,1")
.attribute(RAW_QUAL_APPROX_KEY,"74")
.attribute(AS_RAW_QUAL_APPROX_KEY,"|74|0")
.attribute(AS_RAW_READ_POS_RANK_SUM_KEY,"|2.4,1|1.5,1")
.attribute(AS_SB_TABLE_KEY,"1,21|3,39|3,11")
.attribute(AS_VARIANT_DEPTH_KEY,"22|42|0")
.genotypes(Arrays.asList(g));

VariantContext vc = builderA.make();

final File parquetOutputFile = new File(outputDirectory, VET_FILETYPE_PREFIX + tableNumber + PREFIX_SEPARATOR + sampleIdentifierForOutputFileName + ".parquet");
// Path tempFile = Files.createTempFile(parquetOutputFile.getAbsolutePath());
// Files.createTempFile("vet_001_parquet", ".parquet");
String sampleIdentifierForOutputFileName = "bleh";

VetCreator vetCreator = new VetCreator(parquetOutputFile.getName(), SAMPLE_ID, tableNumber, outputDirectory, outputType, PROJECT_ID, DATASET_NAME, true, false, PARQUET_SCHEMA);
List<String> row = vetCreator.createRow(10329,vc, SAMPLE_NAME);

Assert.assertEquals("/ by zero", row.get(0));
Files.deleteIfExists(parquetOutputFile.toPath());

// Assert.assertEquals("/ by zero", );
}

}
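The commented-out annotation above testErrorFile uses JUnit 4's expected= syntax; this class is TestNG, where the equivalent attribute is expectedExceptions. A sketch of how the duplicate-file path could be asserted directly, reusing this test class's fields (VetCreator wraps the underlying FileAlreadyExistsException in a UserException, so that is the type to expect; org.broadinstitute.hellbender.exceptions.UserException would need to be imported):

// Sketch: assumes the first construction leaves vet_001_parquet.parquet behind
// so the second construction collides with it.
@Test(expectedExceptions = UserException.class)
public void testDuplicateParquetFileRejected() throws IOException {
    final File parquetOutputFile = new File(outputDirectory,
            VET_FILETYPE_PREFIX + tableNumber + PREFIX_SEPARATOR + sampleIdentifierForOutputFileName + ".parquet");
    try {
        new VetCreator(parquetOutputFile.getName(), SAMPLE_ID, tableNumber, outputDirectory, outputType,
                PROJECT_ID, DATASET_NAME, true, false, PARQUET_SCHEMA);
        // Second writer targets the same path and should throw.
        new VetCreator(parquetOutputFile.getName(), SAMPLE_ID, tableNumber, outputDirectory, outputType,
                PROJECT_ID, DATASET_NAME, true, false, PARQUET_SCHEMA);
    } finally {
        Files.deleteIfExists(parquetOutputFile.toPath());
    }
}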
