Skip to content

Commit

Permalink
tb results pass output validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mhkc committed Dec 5, 2023
1 parent 0e63196 commit 0407c89
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 169 deletions.
15 changes: 8 additions & 7 deletions prp/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import ValidationError

from .models.metadata import SoupType, SoupVersion
from .models.phenotype import ElementStressSubtype, ElementType
from .models.phenotype import ElementType
from .models.qc import QcMethodIndex
from .models.sample import MethodIndex, PipelineResult
from .models.typing import TypingMethod
Expand Down Expand Up @@ -189,17 +189,19 @@ def create_output(
)
raise click.Abort()

# add mykrobe db version
results["run_metadata"]["databases"].append(
SoupVersion(
name="mykrobe-predictor",
version=pred_res[sample_id]["version"]["mykrobe-predictor"],
type=SoupType.DB,
)
)
amr_res: MethodIndex = parse_mykrobe_amr_pred(
pred_res[sample_id], ElementType.AMR
)
results["element_type_result"].append(amr_res)
# parse mykrobe result
amr_res = parse_mykrobe_amr_pred(pred_res[sample_id], ElementType.AMR)
if amr_res is not None:
results["element_type_result"].append(amr_res)

lin_res: MethodIndex = parse_mykrobe_lineage_results(
pred_res[sample_id], TypingMethod.LINEAGE
)
Expand Down Expand Up @@ -239,8 +241,7 @@ def create_output(


@cli.command()
@click.argument("output", type=click.File("w"), default="-")
def print_schema(output):
def print_schema():
"""Print Pipeline result output format schema."""
click.secho(PipelineResult.schema_json(indent=2))

Expand Down
11 changes: 6 additions & 5 deletions prp/models/phenotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class GeneBase(BaseModel):
alias="target_length",
description="The length of the query protein or gene.",
)
alignment_length: Optional[int]
alignment_length: Optional[int] = None
# amrfinder extra info
contig_id: Optional[str] = None
gene_symbol: Optional[str] = None
Expand All @@ -100,17 +100,18 @@ class GeneBase(BaseModel):
element_type: ElementType = Field(
description="The predominant function fo the gene."
)
element_subtype: ElementStressSubtype | ElementAmrSubtype | ElementVirulenceSubtype = Field(
description="Further functional categorization of the genes."
)
element_subtype: Union[
ElementStressSubtype, ElementAmrSubtype, ElementVirulenceSubtype
] = Field(description="Further functional categorization of the genes.")
res_class: Optional[str] = None
res_subclass: Optional[str] = None
method: Optional[str] = Field(
default=None, description="Generic description of the prediction method"
)
close_seq_name: Optional[str] = Field(
default=None,
description="Name of the closest competing hit if there are multiple equaly good hits",
description=("Name of the closest competing hit if there "
"are multiple equaly good hits"),
)


Expand Down
16 changes: 8 additions & 8 deletions prp/models/typing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Typing related data models"""

from enum import Enum
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Union, Any

from pydantic import Field

Expand Down Expand Up @@ -48,13 +48,13 @@ class MlstErrors(str, Enum):
class LineageInformation(RWModel):
"""Base class for storing lineage information typing results"""

lin: Optional[str]
family: Optional[str]
spoligotype: Optional[str]
rd: Optional[str]
frac: Optional[str]
variant: Optional[str]
coverage: Optional[Dict]
lin: str | None = None
family: str | None = None
spoligotype: str | None = None
rd: str | None = None
fraction: float | None = None
variant: str | None = None
coverage: Dict[str, Any] = None


class ResultMlstBase(RWModel):
Expand Down
2 changes: 1 addition & 1 deletion prp/parse/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from typing import List, TextIO

from ..models.metadata import RunInformation, RunMetadata, SoupVersion
from ..models.metadata import RunInformation, SoupVersion

LOG = logging.getLogger(__name__)

Expand Down
1 change: 0 additions & 1 deletion prp/parse/phenotype/amrfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ...models.phenotype import PredictionSoftware as Software
from ...models.phenotype import ResistanceGene, VirulenceGene
from ...models.sample import MethodIndex
from .utils import _default_resistance

LOG = logging.getLogger(__name__)

Expand Down
189 changes: 94 additions & 95 deletions prp/parse/phenotype/mykrobe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import re
from typing import Any, Dict, Tuple

from ...models.metadata import SoupVersions
from ...models.phenotype import ElementTypeResult
from ...models.phenotype import ElementTypeResult, ElementType, ElementAmrSubtype
from ...models.phenotype import PredictionSoftware as Software
from ...models.phenotype import ResistanceGene, ResistanceVariant
from ...models.phenotype import ResistanceGene, ResistanceVariant, VariantType
from ...models.sample import MethodIndex
from .utils import _default_resistance, _default_variant
from .utils import is_prediction_result_empty

LOG = logging.getLogger(__name__)

Expand All @@ -32,115 +31,115 @@ def _get_mykrobe_amr_sr_profie(mykrobe_result):
def _parse_mykrobe_amr_genes(mykrobe_result) -> Tuple[ResistanceGene, ...]:
"""Get resistance genes from mykrobe result."""
results = []

if not mykrobe_result:
results = _default_resistance().genes
return results

for element_type in mykrobe_result:
if mykrobe_result[element_type]["predict"].upper() == "R":
hits = mykrobe_result[element_type]["called_by"]
for hit in hits:
gene = ResistanceGene(
gene_symbol=hit.split("_")[0],
accession=None,
depth=hits[hit]["info"]["coverage"]["alternate"]["median_depth"],
identity=None,
coverage=hits[hit]["info"]["coverage"]["alternate"][
"percent_coverage"
],
ref_start_pos=None,
ref_end_pos=None,
ref_gene_length=None,
alignment_length=None,
phenotypes=element_type,
ref_database=None,
ref_id=None,
contig_id=None,
sequence_name=None,
ass_start_pos=None,
ass_end_pos=None,
strand=None,
element_type=None,
element_subtype=None,
target_length=None,
res_class=None,
res_subclass=None,
method=None,
close_seq_name=None,
)
results.append(gene)
# skip non-resistance yielding
if not mykrobe_result[element_type]["predict"].upper() == "R":
continue

hits = mykrobe_result[element_type]["called_by"]
for hit_name, hit in hits.items():
gene = ResistanceGene(
gene_symbol=hit_name.split("_")[0],
accession=None,
depth=hit["info"]["coverage"]["alternate"]["median_depth"],
identity=None,
coverage=hit["info"]["coverage"]["alternate"]["percent_coverage"],
phenotypes=[element_type.lower()],
element_type=ElementType.AMR,
element_subtype=ElementAmrSubtype.AMR,
)
results.append(gene)
return results


def get_mutation_type(
    var_nom: str,
) -> Tuple[VariantType | None, str | None, str | None, int | None]:
    """Extract mutation type from a Mykrobe mutation description.

    The description is split at its first and last digit, e.g.
    ``GCG7569GTG`` -> ref ``GCG``, position ``7569``, alt ``GTG``. The
    variant type is derived by comparing the lengths of ref and alt:
    a longer ref is a deletion, a longer alt is an insertion and equal
    lengths are a substitution.

    :param var_nom: Mykrobe mutation description
    :type var_nom: str
    :return: Variant type, ref codon, alt codon and position. All four
        values are None if the description does not contain a digit.
    :rtype: Tuple[VariantType | None, str | None, str | None, int | None]
    """
    # re.search's third positional argument is `flags`, not a start offset,
    # so no extra argument is passed here.
    first_digit = re.search(r"\d", var_nom)
    # last digit = a digit followed only by non-digits until the end
    last_digit = re.search(r"\d(?=[^\d]*$)", var_nom)
    if first_digit is None or last_digit is None:
        return None, None, None, None

    ref_idx = first_digit.start()
    alt_idx = last_digit.start() + 1
    ref_codon = var_nom[:ref_idx]
    alt_codon = var_nom[alt_idx:]
    position = int(var_nom[ref_idx:alt_idx])

    if len(ref_codon) > len(alt_codon):
        mut_type = VariantType.DELETION
    elif len(ref_codon) < len(alt_codon):
        mut_type = VariantType.INSERTION
    else:
        mut_type = VariantType.SUBSTITUTION
    return mut_type, ref_codon, alt_codon, position


def _parse_mykrobe_amr_variants(mykrobe_result) -> Tuple[ResistanceVariant, ...]:
"""Get resistance genes from mykrobe result."""
results = []

def get_mutation_type(var_nom):
try:
ref_idx = re.search(r"\d", var_nom, 1).start()
alt_idx = re.search(r"\d(?=[^\d]*$)", var_nom).start() + 1
except AttributeError:
return [None] * 4
ref = var_nom[:ref_idx]
alt = var_nom[alt_idx:]
position = int(var_nom[ref_idx:alt_idx])
if len(ref) > len(alt):
mut_type = "deletion"
elif len(ref) < len(alt):
mut_type = "insertion"
else:
mut_type = "substitution"
return mut_type, ref, alt, position

for element_type in mykrobe_result:
if mykrobe_result[element_type]["predict"].upper() == "R":
hits = mykrobe_result[element_type]["called_by"]
for hit in hits:
if hits[hit]["variant"] is None:
var_info = hit.split("-")[1]
_, ref_nt, alt_nt, position = get_mutation_type(var_info)
var_nom = hit.split("-")[0].split("_")[1]
var_type, _, _, _ = get_mutation_type(var_nom)
variant = ResistanceVariant(
variant_type=var_type,
genes=[hit.split("_")[0]],
phenotypes=[element_type],
position=position,
ref_nt=ref_nt,
alt_nt=alt_nt,
depth=hits[hit]["info"]["coverage"]["alternate"][
"median_depth"
],
ref_database=None,
ref_id=None,
type=None,
change=var_nom,
nucleotide_change=None,
protein_change=None,
annotation=None,
drugs=None,
)
results.append(variant)
if not results:
results = _default_variant().mutations
return results

# skip non-resistance yielding
if not mykrobe_result[element_type]["predict"].upper() == "R":
continue

hits = mykrobe_result[element_type]["called_by"]
for hit in hits:
if hits[hit]["variant"] is not None:
continue

var_info = hit.split("-")[1]
_, ref_nt, alt_nt, position = get_mutation_type(var_info)
var_nom = hit.split("-")[0].split("_")[1]
var_type, *_ = get_mutation_type(var_nom)
variant = ResistanceVariant(
variant_type=var_type,
genes=[hit.split("_")[0]],
phenotypes=[element_type],
position=position,
ref_nt=ref_nt,
alt_nt=alt_nt,
depth=hits[hit]["info"]["coverage"]["alternate"]["median_depth"],
ref_database=None,
ref_id=None,
type=None,
change=var_nom,
nucleotide_change=None,
protein_change=None,
annotation=None,
drugs=None,
)
results.append(variant)
return results


def parse_mykrobe_amr_pred(
prediction: Dict[str, Any], resistance_category
) -> Tuple[SoupVersions, ElementTypeResult]:
) -> ElementTypeResult | None:
"""Parse mykrobe resistance prediction results."""
LOG.info("Parsing mykrobe prediction")
pred = prediction["susceptibility"]
resistance = ElementTypeResult(
phenotypes=_get_mykrobe_amr_sr_profie(pred),
genes=[], # _parse_mykrobe_amr_genes(pred),
genes=_parse_mykrobe_amr_genes(pred),
mutations=_parse_mykrobe_amr_variants(pred),
)
return MethodIndex(
type=resistance_category, software=Software.MYKROBE, result=resistance
)

# verify prediction result
if is_prediction_result_empty(resistance):
result = None
else:
result = MethodIndex(
type=resistance_category, software=Software.MYKROBE, result=resistance
)
return result
2 changes: 1 addition & 1 deletion prp/parse/phenotype/resfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _assign_res_subtype(
elif element_type == ElementType.AMR:
assigned_subtype = ElementAmrSubtype.AMR
else:
LOG.warning(f"Dont know how to assign subtype for {element_type}")
LOG.warning("Dont know how to assign subtype for %s", element_type)
return assigned_subtype


Expand Down
14 changes: 13 additions & 1 deletion prp/parse/phenotype/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Shared utility functions."""
from ...models.phenotype import ElementTypeResult, ResistanceGene, VirulenceGene
from ...models.phenotype import ElementTypeResult, ResistanceGene


def _default_resistance() -> ElementTypeResult:
Expand Down Expand Up @@ -48,3 +48,15 @@ def _default_variant() -> ElementTypeResult:
)
mutations = [mutation]
return ElementTypeResult(phenotypes=[], genes=[], mutations=mutations)


def is_prediction_result_empty(result: ElementTypeResult) -> bool:
    """Check if prediction result is empty.

    A result counts as empty when it holds neither genes nor mutations.

    :param result: Prediction result
    :type result: ElementTypeResult
    :return: Return True if no resistance was predicted.
    :rtype: bool
    """
    n_entries = len(result.genes) + len(result.mutations)
    # empty means zero entries; comparing against 1 would flag a single
    # real hit as "empty" and let a truly empty result through
    return n_entries == 0
10 changes: 9 additions & 1 deletion prp/parse/phenotype/virulencefinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,19 @@ def _parse_virulencefinder_vir_results(pred: str) -> ElementTypeResult:


def parse_virulencefinder_vir_pred(file: str) -> MethodIndex | None:
    """Parse virulencefinder virulence prediction results.

    :param file: Virulencefinder JSON result. NOTE(review): annotated as
        ``str`` but passed straight to ``json.load``, which needs a
        readable file handle -- confirm against the caller.
    :type file: str
    :return: Method index wrapping the parsed virulence result, or None
        if the JSON has no ``virulencefinder`` section.
    :rtype: MethodIndex | None
    """
    LOG.info("Parsing virulencefinder virulence prediction")
    pred = json.load(file)
    # the top-level key is "virulencefinder"; the previous check looked for
    # the literal key "not virulencefinder" and therefore never matched
    if "virulencefinder" not in pred:
        return None

    results: ElementTypeResult = _parse_virulencefinder_vir_results(pred)
    return MethodIndex(
        type=ElementType.VIR, software=Software.VIRFINDER, result=results
    )
Loading

0 comments on commit 0407c89

Please sign in to comment.