Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sequencing artifact manager #407

Merged
merged 31 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3354978
#397: Add sequencing artifact manager
seallard Jul 27, 2023
155b54c
Fix tests
seallard Jul 27, 2023
17a8c9d
Sort inputs
seallard Jul 27, 2023
025a08d
Fix error message
seallard Jul 27, 2023
09fe078
Fix formatting
seallard Jul 27, 2023
040c2a3
Extract SampleArtifacts class
seallard Jul 27, 2023
0781a41
Add test for get_sample_artifacts
seallard Jul 27, 2023
c0ed230
Extract lane artifacts
seallard Jul 28, 2023
7c86c5f
Fix sample artifact extraction
seallard Jul 28, 2023
1b4265f
Fix tests
seallard Jul 28, 2023
26e9664
Fix naming
seallard Jul 28, 2023
7b7935c
Fix readability
seallard Jul 28, 2023
223df95
Fix return type
seallard Jul 28, 2023
7882681
Improve readability
seallard Jul 28, 2023
3a85ffb
Merge branch 'master' into 397-add-sequencing-artifact-manager
seallard Jul 28, 2023
5678325
Use enums
seallard Jul 28, 2023
5ed46f6
Use udf enum
seallard Jul 28, 2023
7ec7e99
Fix indentation
seallard Jul 28, 2023
572304a
Assert artifacts are returned
seallard Jul 28, 2023
bc3326a
Remove doc string
seallard Jul 28, 2023
942fe15
Move enum
seallard Jul 28, 2023
683cec9
Move extraction of sample id
seallard Jul 28, 2023
f8910b5
Fix formatting
seallard Jul 28, 2023
8580caf
Add class doc strings
seallard Jul 28, 2023
7523d8e
Fix parameter
seallard Jul 28, 2023
06f64c3
Merge branch 'master' into 397-add-sequencing-artifact-manager
seallard Aug 1, 2023
bfa79fc
Fix test
seallard Aug 1, 2023
5a41d07
linting with black
Karl-Svard Aug 2, 2023
3874533
refactored some function and variable names
Karl-Svard Aug 2, 2023
9a55388
moved test
Karl-Svard Aug 2, 2023
675561f
Add constant for artifact key
seallard Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cg_lims/EPPs/arnold/flow_cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def flow_cell(ctx):
lims=lims,
output_type=OutputType.RESULT_FILE,
)
flow_cell_document: FlowCell = build_flow_cell_document(
process=process, lanes=lanes
)
flow_cell_document: FlowCell = build_flow_cell_document(process=process, lanes=lanes)
response: Response = requests.post(
url=f"{arnold_host}/flow_cell",
headers={"Content-Type": "application/json"},
Expand Down
7 changes: 1 addition & 6 deletions cg_lims/EPPs/files/sample_sheet/create_sample_sheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from genologics.entities import Artifact, Process, ReagentType
from cg_lims import options
from cg_lims.exceptions import LimsError, InvalidValueError
from cg_lims.get.artifacts import get_artifacts
from cg_lims.get.artifacts import get_artifact_lane, get_artifacts
seallard marked this conversation as resolved.
Show resolved Hide resolved
from cg_lims.EPPs.files.sample_sheet.models import (
IndexSetup,
IndexType,
Expand All @@ -22,11 +22,6 @@
LOG = logging.getLogger(__name__)


def get_artifact_lane(artifact: Artifact) -> int:
"""Return the lane where an artifact is placed"""
return int(artifact.location[1].split(":")[0])


def get_non_pooled_artifacts(artifact: Artifact) -> List[Artifact]:
"""Return the parent artifact of the sample. Should hold the reagent_label"""
artifacts: List[Artifact] = []
Expand Down
89 changes: 89 additions & 0 deletions cg_lims/EPPs/qc/sequencing_artifact_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
from collections import defaultdict
from typing import Dict, Optional

from genologics.entities import Artifact, Process
from genologics.lims import Lims

from cg_lims.exceptions import LimsError
from cg_lims.get.artifacts import get_lane_sample_artifacts
from cg_lims.get.fields import get_artifact_sample_id, get_flow_cell_name
from cg_lims.get.udfs import get_q30_threshold
from cg_lims.set.qc import set_quality_control_flag
from cg_lims.set.udfs import set_q30_score, set_reads_count

LOG = logging.getLogger(__name__)


class SampleLaneArtifacts:
"""
Responsible for easily storing and retrieving artifacts per sample id and lane.
"""

def __init__(self):
self._sample_lane_artifacts: Dict[str, Dict[int, Artifact]] = defaultdict(dict)

def add(self, artifact: Artifact, sample_id: str, lane: int) -> None:
self._sample_lane_artifacts[sample_id][lane] = artifact

def get(self, sample_id: str, lane: int) -> Optional[Artifact]:
return self._sample_lane_artifacts.get(sample_id, {}).get(lane)


class SequencingArtifactManager:
"""
Responsible for providing a high level interface for updating sample artifacts
with sequencing metrics and retrieving the flow cell name and q30 threshold.
"""

def __init__(self, process: Process, lims: Lims):
self.process: Process = process
self.lims: Lims = lims

self._sample_lane_artifacts: SampleLaneArtifacts = SampleLaneArtifacts()
self._populate_sample_lane_artifacts()

def _populate_sample_lane_artifacts(self) -> None:
for lane, artifact in get_lane_sample_artifacts(self.process):
sample_id: Optional[str] = get_artifact_sample_id(artifact)

if not sample_id:
LOG.warning(f"Failed to extract sample id from artifact: {artifact}")
continue

self._sample_lane_artifacts.add(artifact=artifact, sample_id=sample_id, lane=lane)

@property
def flow_cell_name(self) -> str:
flow_cell_name: Optional[str] = get_flow_cell_name(self.process)
if not flow_cell_name:
raise LimsError("Flow cell name not set")
return flow_cell_name

@property
def q30_threshold(self) -> int:
q30_threshold: Optional[str] = get_q30_threshold(self.process)
if not q30_threshold:
raise LimsError("Q30 threshold not set")
return int(q30_threshold)

def update_sample(
self,
sample_id: str,
lane: int,
reads: int,
q30_score: float,
passed_quality_control: bool,
) -> None:
artifact: Optional[Artifact] = self._sample_lane_artifacts.get(
sample_id=sample_id, lane=lane
)

if not artifact:
LOG.warning(f"Sample artifact not found for {sample_id} in lane {lane}. Skipping.")
return

set_reads_count(artifact=artifact, reads=reads)
set_q30_score(artifact=artifact, q30_score=q30_score)
set_quality_control_flag(artifact=artifact, passed=passed_quality_control)
artifact.put()
13 changes: 9 additions & 4 deletions cg_lims/EPPs/udf/calculate/get_missing_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def find_reruns(artifacts: list, status_db: StatusDBAPI) -> None:
"""
Looking for artifacts to rerun.
Negative control samples are never sent for rerun.
A pool with any sample that is not a negative control will be sent for rerun if reads are missing."""
A pool with any sample that is not a negative control will be sent for rerun if reads are missing.
"""
failed_arts = 0
for artifact in artifacts:
if check_control(artifact):
Expand All @@ -61,7 +62,9 @@ def find_reruns(artifacts: list, status_db: StatusDBAPI) -> None:
continue

try:
target_amount_reads = status_db.get_application_tag(tag_name=app_tag, key="target_reads")
target_amount_reads = status_db.get_application_tag(
tag_name=app_tag, key="target_reads"
)
guaranteed_fraction = 0.01 * status_db.get_application_tag(
tag_name=app_tag, key="percent_reads_guaranteed"
)
Expand All @@ -81,9 +84,11 @@ def find_reruns(artifacts: list, status_db: StatusDBAPI) -> None:
@click.command()
@click.pass_context
def get_missing_reads(ctx):
"""Script to calculate missing reads and decide on reruns.
"""
Script to calculate missing reads and decide on reruns.
Negative control samples are never sent for rerun.
A pool with any sample that is not a negative control will be sent for rerun if reads are missing."""
A pool with any sample that is not a negative control will be sent for rerun if reads are missing.
"""

LOG.info(f"Running {ctx.command_path} with params: {ctx.params}")

Expand Down
32 changes: 31 additions & 1 deletion cg_lims/get/artifacts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional, Literal
from typing import Dict, List, Optional, Literal, Set, Tuple

from genologics.entities import Artifact, Process, Sample
from genologics.lims import Lims
Expand All @@ -22,6 +22,36 @@ class OutputGenerationType(str, Enum):
PER_ALL_INPUTS = "PerAllInputs"


def get_artifact_lane(artifact: Artifact) -> int:
"""Return the lane where an artifact is placed"""
return int(artifact.location[1].split(":")[0])

seallard marked this conversation as resolved.
Show resolved Hide resolved

def get_lane_sample_artifacts(process: Process) -> List[Tuple[int, Artifact]]:
lane_sample_artifacts = set()

for input_map, output_map in process.input_output_maps:
try:
if is_output_type_per_reagent(output_map):
output_artifact: Artifact = get_artifact_from_map(output_map)
input_artifact: Artifact = get_artifact_from_map(input_map)
lane: int = get_artifact_lane(input_artifact)

lane_sample_artifacts.add((lane, output_artifact))
except KeyError:
continue

return list(lane_sample_artifacts)


def is_output_type_per_reagent(output_map: Dict) -> bool:
return output_map["output-generation-type"] == OutputGenerationType.PER_REAGENT


def get_artifact_from_map(map: Dict) -> Artifact:
return map["uri"]

seallard marked this conversation as resolved.
Show resolved Hide resolved

def get_sample_artifact(lims: Lims, sample: Sample) -> Artifact:
"""Returning the initial artifact related to a sample.
Assuming first artifact is allways named sample.id + 'PA1."""
Expand Down
37 changes: 28 additions & 9 deletions cg_lims/get/fields.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import datetime as dt
import logging
from typing import Optional, Tuple
from typing import List, Optional, Tuple

from genologics.entities import Artifact, Sample
from genologics.entities import Artifact, Process, Sample
from requests.exceptions import HTTPError

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,7 +86,7 @@ def get_index_well(artifact: Artifact):


def get_barcode(artifact: Artifact):
"""Central script for generation of barcode. Looks at container type and
"""Central script for generation of barcode. Looks at container type and
assign barcode according to Atlas document 'Barcodes at Clinical Genomics'"""

artifact_container_type = artifact.container.type.name.lower()
Expand All @@ -98,14 +98,33 @@ def get_barcode(artifact: Artifact):
# Barcode for pool placed in tube.
elif len(artifact.samples) > 1 and artifact_container_type == "tube":
return artifact.name

# Barcode for sample in tube.
elif artifact_container_type == "tube":
return artifact.samples[0].id[3:]

else:
LOG.info(
f"Sample {str(artifact.samples[0].id)} could not be assigned a barcode."
)
LOG.info(f"Sample {str(artifact.samples[0].id)} could not be assigned a barcode.")
return None


def get_artifact_sample_id(artifact: Artifact) -> Optional[str]:
"""Return the sample ID belonging to an artifact if it isn't a pool."""
samples = artifact.samples if artifact else None
if not (samples and samples[0].id):
return None
if len(samples) > 1:
return None
return samples[0].id


def get_flow_cell_name(process: Process) -> Optional[str]:
artifacts: Optional[List[Artifact]] = process.all_inputs()
if not artifacts:
return None
artifact: Artifact = artifacts[0]
if not artifact.container:
return None
if not artifact.container.name:
return None

return artifact.container.name
14 changes: 14 additions & 0 deletions cg_lims/get/udfs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date
from enum import Enum
from typing import Optional
from genologics.entities import Entity
from genologics.lims import Lims
Expand All @@ -9,6 +10,12 @@
LOG = logging.getLogger(__name__)


class UserDefinedFields(str, Enum):
READS = "# Reads"
Q30 = "% Bases >=Q30"
Q30_THRESHOLD = "Threshold for % bases >= Q30"


def get_udf_type(lims: Lims, udf_name: str, attach_to_name: str) -> Optional:
"""Get udf type.

Expand Down Expand Up @@ -36,3 +43,10 @@ def get_udf(entity: Entity, udf: str) -> str:
message = f"UDF {udf} not found on {entity._TAG} {entity.id}!"
LOG.error(message)
raise MissingUDFsError(message)


def get_q30_threshold(entity: Entity) -> Optional[str]:
try:
return get_udf(entity, UserDefinedFields.Q30_THRESHOLD.value)
except MissingUDFsError:
return None
15 changes: 15 additions & 0 deletions cg_lims/set/qc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from typing import Literal
from genologics.entities import Artifact
from enum import Enum


class QualityCheck(str, Enum):
PASSED = "PASSED"
FAILED = "FAILED"


def set_qc_fail(
Expand All @@ -20,3 +26,12 @@ def set_qc_fail(
artifact.qc_flag = "FAILED"
elif criteria == "!=" and value != threshold:
artifact.qc_flag = "FAILED"


def set_quality_control_flag(passed: bool, artifact: Artifact) -> None:
qc_flag: str = _get_quality_check_flag(passed)
artifact.qc_flag = qc_flag


def _get_quality_check_flag(quality_check_passed: bool) -> str:
return QualityCheck.PASSED.value if quality_check_passed else QualityCheck.FAILED.value
20 changes: 15 additions & 5 deletions cg_lims/set/udfs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List, Tuple, Iterator
import logging
from typing import List, Tuple

from genologics.entities import Artifact, Process

import logging

from cg_lims.exceptions import MissingUDFsError
from cg_lims.get.udfs import UserDefinedFields

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,8 +33,10 @@ def copy_artifact_to_artifact(

if qc_flag:
if keep_failed_flags and destination_artifact.qc_flag == "FAILED":
message = f"QC for destination artifact {destination_artifact.id} is failed, " \
f"flag not copied over from source artifact {source_artifact.id}"
message = (
f"QC for destination artifact {destination_artifact.id} is failed, "
f"flag not copied over from source artifact {source_artifact.id}"
)
LOG.error(message)
else:
destination_artifact.qc_flag = source_artifact.qc_flag
Expand Down Expand Up @@ -63,3 +65,11 @@ def copy_udf_process_to_artifact(
message = f"{artifact_udf} doesn't seem to be a valid artifact udf."
LOG.error(message)
raise MissingUDFsError(message=message)


def set_reads_count(artifact: Artifact, reads: int) -> None:
artifact.udf[UserDefinedFields.READS] = reads


def set_q30_score(artifact: Artifact, q30_score: float) -> None:
artifact.udf[UserDefinedFields.Q30] = q30_score
Empty file added tests/EPPs/qc/__init__.py
Empty file.
Loading
Loading