Skip to content

Commit

Permalink
Update filtering of vcf file
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanjameskennedy committed Aug 22, 2024
1 parent 1ebe440 commit 0fd8487
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 19 deletions.
7 changes: 5 additions & 2 deletions prp/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,13 @@ def create_bonsai_input(

# parse SNV and SV variants.
if snv_vcf:
results["snv_variants"] = load_variants(snv_vcf)
results["snv_variants"] = load_variants(snv_vcf)["snv_variants"]

if sv_vcf:
results["sv_variants"] = load_variants(sv_vcf)
results["sv_variants"] = load_variants(sv_vcf)["sv_variants"]

if vcf:
results.update(load_variants(vcf))

# entries for reference genome and read mapping
if all([bam, reference_genome_fasta, reference_genome_gff]):
Expand Down
80 changes: 63 additions & 17 deletions prp/parse/variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,58 @@
SOURCE_PATTERN = r"##source=(.+)\n"


def _filter_variants(variant_list):
# Initialize the results dictionary
filetered_variants = {
'sv_variants': [],
'mnv_variants': [],
'snv_variants': []
}

# Iterate through each variant in the list
for variant in variant_list:
variant_type = dict(variant).get('variant_type') # Extract the variant_type
# Append the variant to the appropriate key in the results dictionary
if variant_type == "SV":
filetered_variants['sv_variants'].append(variant)
elif variant_type == "MNV":
filetered_variants['mnv_variants'].append(variant)
elif variant_type == "SNV":
filetered_variants['snv_variants'].append(variant)
return filetered_variants


def _get_variant_type(variant) -> VariantType:
"""Parse variant type."""
match variant.var_type:
case "snp":
var_type = VariantType.SNV
case "mnp":
var_type = VariantType.MNV
case "indel":
var_type = VariantType.SV
case _:
var_type = VariantType(variant.var_type.upper())
return var_type


def _get_variant_subtype(ref_base, alt_base):
# Define purines and pyrimidines
purines = {'A', 'G'}
pyrimidines = {'C', 'T'}

# Check for transition substitution
if (ref_base in purines and alt_base in purines) or (ref_base in pyrimidines and alt_base in pyrimidines):
return "TS"

# If it's not a transition, it must be a transversion
return "TV"


def parse_variant(variant: Variant, var_id: int, caller: str | None = None):
"""Parse variant info from VCF row."""

var_objs = []
# check if variant passed qc filtering
if len(variant.FILTERS) == 0:
passed_qc = None
Expand All @@ -36,20 +74,29 @@ def parse_variant(variant: Variant, var_id: int, caller: str | None = None):

var_type: VariantType = _get_variant_type(variant)

var_obj = VariantBase(
id=var_id,
variant_type=var_type,
variant_subtype=variant.var_subtype.upper(),
gene_symbol=variant.CHROM,
start=variant.start,
end=variant.end,
ref_nt=variant.REF,
alt_nt=variant.ALT[0], # haploid
method=variant.INFO.get("SVMETHOD", caller),
confidence=variant.QUAL,
passed_qc=passed_qc,
)
return var_obj
for alt_idx, alt_var in enumerate(variant.ALT):
possible_minority_var = False
var_subtype = variant.var_subtype.upper()
if var_subtype == "UNKNOWN":
var_subtype = _get_variant_subtype(variant.REF, alt_var)
possible_minority_var = True
var_obj = VariantBase(
id=var_id,
variant_type=var_type,
variant_subtype=var_subtype,
gene_symbol=variant.CHROM,
start=variant.start,
end=variant.end,
ref_nt=variant.REF,
alt_nt=alt_var,
frequency=variant.INFO.get("AF") if type(variant.INFO.get("AF")) != tuple else variant.INFO.get("AF")[alt_idx],
depth=variant.INFO.get("DP") if type(variant.INFO.get("DP")) != tuple else variant.INFO.get("DP")[alt_idx],
method=variant.INFO.get("SVMETHOD", caller),
confidence=variant.QUAL,
passed_qc=passed_qc,
)
var_objs.append(var_obj)
return var_objs


def _get_variant_caller(vcf_obj: VCF) -> str | None:
Expand All @@ -76,9 +123,8 @@ def load_variants(variant_file: str) -> list[VariantBase]:
# parse header from vcf file
variants = []
for var_id, variant in enumerate(vcf_obj, start=1):
variants.append(parse_variant(variant, var_id=var_id, caller=variant_caller))

return variants
variants.extend(parse_variant(variant, var_id=var_id, caller=variant_caller))
return _filter_variants(variants)


def annotate_delly_variants(writer, vcf, annotation, annot_chrom=False):
Expand Down

0 comments on commit 0fd8487

Please sign in to comment.