Skip to content

Commit

Permalink
Merge pull request #13 from Clinical-Genomics-Lund/12-handle-mykrobe-…
Browse files Browse the repository at this point in the history
…csv-output

Update prp to handle mykrobe csv format
  • Loading branch information
ryanjameskennedy authored Jan 4, 2024
2 parents 420b073 + 4850c86 commit f113731
Show file tree
Hide file tree
Showing 25 changed files with 2,300 additions and 220 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install pylint
pip install -e .
pip install pytest
pip install -e .[dev]
- name: Analysing the code with pylint
run: |
pylint --fail-under 9 $(git ls-files '*.py')
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

### Added

- Pytest for Mycobacterium tuberculosis

### Fixed

### Changed

- Mykrobe output parser handles csv format instead of json

## [0.2.0]

### Added
Expand Down
14 changes: 9 additions & 5 deletions prp/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import List

import click
import pandas as pd
from pydantic import TypeAdapter, ValidationError

from .models.metadata import SoupType, SoupVersion
Expand Down Expand Up @@ -188,10 +189,13 @@ def create_bonsai_input(
# mykrobe
if mykrobe:
LOG.info("Parse mykrobe results")
pred_res = json.load(mykrobe)
pred_res = pd.read_csv(mykrobe, quotechar='"')
pred_res.columns.values[3] = "variants"
pred_res.columns.values[4] = "genes"
pred_res = pred_res.to_dict(orient="records")

# verify that sample id is in prediction result
if not sample_id in pred_res:
if not sample_id in pred_res[0]["sample"]:
LOG.warning(
"Sample id %s is not in Mykrobe result, possible sample mixup",
sample_id,
Expand All @@ -202,17 +206,17 @@ def create_bonsai_input(
results["run_metadata"]["databases"].append(
SoupVersion(
name="mykrobe-predictor",
version=pred_res[sample_id]["version"]["mykrobe-predictor"],
version=pred_res[0]["mykrobe_version"],
type=SoupType.DB,
)
)
# parse mykrobe result
amr_res = parse_mykrobe_amr_pred(pred_res[sample_id], ElementType.AMR)
amr_res = parse_mykrobe_amr_pred(pred_res, ElementType.AMR)
if amr_res is not None:
results["element_type_result"].append(amr_res)

lin_res: MethodIndex = parse_mykrobe_lineage_results(
pred_res[sample_id], TypingMethod.LINEAGE
pred_res, TypingMethod.LINEAGE
)
results["typing_result"].append(lin_res)

Expand Down
14 changes: 9 additions & 5 deletions prp/models/phenotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, Field

from .base import RWModel

Expand Down Expand Up @@ -90,6 +90,7 @@ class GeneBase(BaseModel):
coverage: Optional[float] = None
ref_start_pos: Optional[int] = None
ref_end_pos: Optional[int] = None
drugs: Optional[List[Union[Dict, str]]] = None
ref_gene_length: Optional[int] = Field(
default=None,
alias="target_length",
Expand Down Expand Up @@ -143,14 +144,17 @@ class VariantBase(DatabaseReference):
"""Container for mutation information"""

variant_type: VariantType
genes: List[str]
position: int
ref_nt: str
alt_nt: str
ref_aa: str
alt_aa: str
ref_aa: Optional[str] = None
alt_aa: Optional[str] = None
# prediction info
conf: Optional[int] = None
alt_kmer_count: Optional[int] = None
ref_kmer_count: Optional[int] = None
depth: Optional[float] = None
freq: Optional[float] = None
contig_id: Optional[str] = None
gene_symbol: Optional[str] = None
sequence_name: Optional[str] = Field(
Expand All @@ -175,7 +179,7 @@ class VariantBase(DatabaseReference):
nucleotide_change: Optional[str] = None
protein_change: Optional[str] = None
annotation: Optional[List[Dict]] = None
drugs: Optional[List[Dict]] = None
drugs: Optional[List[Union[Dict, str]]] = None


class ResistanceVariant(VariantBase):
Expand Down
2 changes: 1 addition & 1 deletion prp/models/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class LineageInformation(RWModel):
rd: str | None = None
fraction: float | None = None
variant: str | None = None
coverage: Dict[str, Any] = None
coverage: Dict[str, Any] | None = None


class ResultMlstBase(RWModel):
Expand Down
78 changes: 27 additions & 51 deletions prp/parse/phenotype/mykrobe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import re
from typing import Any, Dict, Tuple

from ...models.phenotype import ElementAmrSubtype, ElementType, ElementTypeResult
from ...models.phenotype import ElementType, ElementTypeResult
from ...models.phenotype import PredictionSoftware as Software
from ...models.phenotype import ResistanceGene, ResistanceVariant, VariantType
from ...models.phenotype import ResistanceVariant, VariantType
from ...models.sample import MethodIndex
from .utils import is_prediction_result_empty

Expand All @@ -21,37 +21,13 @@ def _get_mykrobe_amr_sr_profie(mykrobe_result):
return {}

for element_type in mykrobe_result:
if mykrobe_result[element_type]["predict"].upper() == "R":
resistant.add(element_type)
if element_type["susceptibility"].upper() == "R":
resistant.add(element_type["drug"])
else:
susceptible.add(element_type)
susceptible.add(element_type["drug"])
return {"susceptible": list(susceptible), "resistant": list(resistant)}


def _parse_mykrobe_amr_genes(mykrobe_result) -> Tuple[ResistanceGene, ...]:
"""Get resistance genes from mykrobe result."""
results = []
for element_type in mykrobe_result:
# skip non-resistance yeilding
if not mykrobe_result[element_type]["predict"].upper() == "R":
continue

hits = mykrobe_result[element_type]["called_by"]
for hit_name, hit in hits.items():
gene = ResistanceGene(
gene_symbol=hit_name.split("_")[0],
accession=None,
depth=hit["info"]["coverage"]["alternate"]["median_depth"],
identity=None,
coverage=hit["info"]["coverage"]["alternate"]["percent_coverage"],
phenotypes=[element_type.lower()],
element_type=ElementType.AMR,
element_subtype=ElementAmrSubtype.AMR,
)
results.append(gene)
return results


def get_mutation_type(var_nom: str) -> Tuple[VariantType, str, str, int]:
"""Extract mutation type from Mykrobe mutation description.
Expand Down Expand Up @@ -90,34 +66,35 @@ def _parse_mykrobe_amr_variants(mykrobe_result) -> Tuple[ResistanceVariant, ...]

for element_type in mykrobe_result:
# skip non-resistance yeilding
if not mykrobe_result[element_type]["predict"].upper() == "R":
if not element_type["susceptibility"].upper() == "R":
continue

hits = mykrobe_result[element_type]["called_by"]
for hit in hits:
if hits[hit]["variant"] is not None:
continue
if element_type["variants"] is None:
continue

var_info = hit.split("-")[1]
variants = element_type["variants"].split(";")
for var in variants:
var_info = var.split("-")[1].split(":")[0]
_, ref_nt, alt_nt, position = get_mutation_type(var_info)
var_nom = hit.split("-")[0].split("_")[1]
var_type, *_ = get_mutation_type(var_nom)
var_nom = var.split("-")[0].split("_")[1]
var_type, ref_aa, alt_aa, _ = get_mutation_type(var_nom)
variant = ResistanceVariant(
variant_type=var_type,
genes=[hit.split("_")[0]],
phenotypes=[element_type],
gene_symbol=var.split("_")[0],
position=position,
ref_nt=ref_nt,
alt_nt=alt_nt,
depth=hits[hit]["info"]["coverage"]["alternate"]["median_depth"],
ref_database=None,
ref_id=None,
type=None,
ref_aa=ref_aa if len(ref_aa) == 1 and len(alt_aa) == 1 else None,
alt_aa=alt_aa if len(ref_aa) == 1 and len(alt_aa) == 1 else None,
conf=float(var.split(":")[-1]),
alt_kmer_count=float(var.split(":")[-2]),
ref_kmer_count=float(var.split(":")[-3]),
change=var_nom,
nucleotide_change=None,
protein_change=None,
annotation=None,
drugs=None,
nucleotide_change="c." + var_info,
protein_change="p." + var_nom,
element_type=ElementType.AMR,
method=element_type["genotype_model"],
drugs=[element_type["drug"].lower()],
)
results.append(variant)
return results
Expand All @@ -128,11 +105,10 @@ def parse_mykrobe_amr_pred(
) -> ElementTypeResult | None:
"""Parse mykrobe resistance prediction results."""
LOG.info("Parsing mykrobe prediction")
pred = prediction["susceptibility"]
resistance = ElementTypeResult(
phenotypes=_get_mykrobe_amr_sr_profie(pred),
genes=_parse_mykrobe_amr_genes(pred),
mutations=_parse_mykrobe_amr_variants(pred),
phenotypes=_get_mykrobe_amr_sr_profie(prediction),
genes=[],
mutations=_parse_mykrobe_amr_variants(prediction),
)

# verify prediction result
Expand Down
61 changes: 2 additions & 59 deletions prp/parse/phenotype/resfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ...models.phenotype import PredictionSoftware as Software
from ...models.phenotype import ResistanceGene, ResistanceVariant, VariantType
from ...models.sample import MethodIndex
from .utils import _default_resistance
from .utils import format_nt_change, get_nt_change

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -221,7 +221,7 @@ def _parse_resfinder_amr_genes(
"""Get resistance genes from resfinder result."""
results = []
if not "seq_regions" in resfinder_result:
return _default_resistance().genes
return [ResistanceGene()]

for info in resfinder_result["seq_regions"].values():
# Get only acquired resistance genes
Expand Down Expand Up @@ -270,64 +270,11 @@ def _parse_resfinder_amr_genes(
return results


def get_nt_change(ref_codon: str, alt_codon: str) -> Tuple[str, str]:
"""Get nucleotide change from codons
Ref: TCG, Alt: TTG => Tuple[C, T]
:param ref_codon: Reference codeon
:type ref_codon: str
:param str: Alternatve codon
:type str: str
:return: Returns nucleotide changed from the reference.
:rtype: Tuple[str, str]
"""
ref_nt = ""
alt_nt = ""
for ref, alt in zip(ref_codon, alt_codon):
if not ref == alt:
ref_nt += ref
alt_nt += alt
return ref_nt.upper(), alt_nt.upper()


def format_nt_change(
ref: str,
alt: str,
var_type: VariantType,
start_pos: int,
end_pos: int = None,
) -> str:
"""Format nucleotide change
:param ref: Reference sequence
:type ref: str
:param alt: Alternate sequence
:type alt: str
:param pos: Position
:type pos: int
:param var_type: Type of change
:type var_type: VariantType
:return: Formatted nucleotide
:rtype: str
"""
fmt_change = ""
match var_type:
case VariantType.SUBSTITUTION:
f"g.{start_pos}{ref}>{alt}"
case VariantType.DELETION:
f"g.{start_pos}_{end_pos}del"
case VariantType.INSERTION:
f"g.{start_pos}_{end_pos}ins{alt}"
return fmt_change


def _parse_resfinder_amr_variants(
resfinder_result, limit_to_phenotypes=None
) -> Tuple[ResistanceVariant, ...]:
"""Get resistance genes from resfinder result."""
results = []
igenes = []
for info in resfinder_result["seq_variations"].values():
# Get only variants from desired phenotypes
if limit_to_phenotypes is not None:
Expand All @@ -350,9 +297,6 @@ def _parse_resfinder_amr_variants(
var_type = VariantType.DELETION
else:
raise ValueError("Output has no known mutation type")
if not "seq_regions" in info:
# igenes = _default_resistance().genes
igenes = [""]

# get gene symbol and accession nr
gene_symbol, _, gene_accnr = info["seq_regions"][0].split(";;")
Expand All @@ -376,7 +320,6 @@ def _parse_resfinder_amr_variants(
gene_symbol=gene_symbol,
accession=gene_accnr,
close_seq_name=gene_accnr,
genes=igenes,
phenotypes=phenotype,
position=info["ref_start_pos"],
ref_nt=ref_nt,
Expand Down
9 changes: 5 additions & 4 deletions prp/parse/phenotype/tbprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from ...models.phenotype import PredictionSoftware as Software
from ...models.phenotype import ResistanceVariant
from ...models.sample import MethodIndex
from .utils import _default_variant

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,14 +49,16 @@ def _parse_tbprofiler_amr_variants(tbprofiler_result) -> Tuple[ResistanceVariant

for hit in tbprofiler_result["dr_variants"]:
var_type = "substitution"

variant = ResistanceVariant(
variant_type=var_type,
genes=[hit["gene"]],
phenotypes=hit["gene_associated_drugs"],
gene_symbol=hit["gene"],
phenotypes=[],
position=int(hit["genome_pos"]),
ref_nt=hit["ref"],
alt_nt=hit["alt"],
depth=hit["depth"],
freq=float(hit["freq"]),
ref_database=tbprofiler_result["db_version"]["name"],
type=hit["type"],
nucleotide_change=hit["nucleotide_change"],
Expand All @@ -68,7 +69,7 @@ def _parse_tbprofiler_amr_variants(tbprofiler_result) -> Tuple[ResistanceVariant
results.append(variant)

if not results:
results = _default_variant().mutations
results = [ResistanceVariant()]
return results

return results
Expand Down
Loading

0 comments on commit f113731

Please sign in to comment.