Commit cda66f6: update helm tests

nebfield committed Nov 4, 2024
1 parent 45e9331 commit cda66f6
Showing 5 changed files with 116 additions and 40 deletions.
pyvatti/src/pyvatti/cli/render.py: 6 changes (4 additions, 2 deletions)

@@ -19,6 +19,8 @@ def main() -> None:
     )
     parser.add_argument("--message_path", required=True)
     parser.add_argument("--env_path", required=True)
+    parser.add_argument("--bucket_name", required=True)
+
     parser.add_argument("--out_path", required=True)
 
     args = parser.parse_args()
@@ -32,8 +34,8 @@ def main() -> None:
     job: JobRequest = JobRequest(**msg)
     template: dict = render_template(
         job=job,
-        work_bucket_path="testpathwork/",
-        results_bucket_path="testpathresults",
+        work_bucket_path=args.bucket_name,
+        results_bucket_path=args.bucket_name,
         settings=settings,
     )
     logger.info("Rendered helm values file OK")
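
The render CLI now takes the bucket name from the command line instead of the hard-coded test paths, with both the work and results paths pointing at the same bucket. A minimal invocation sketch, assuming the message and env files exist (file names here are illustrative, not from the commit):

```python
# Sketch: driving the updated CLI via argv (file names are placeholders).
import sys

from pyvatti.cli.render import main

sys.argv = [
    "render",
    "--message_path", "message.json",
    "--env_path", ".env",
    "--bucket_name", "test-bucket",  # new required argument
    "--out_path", "values.yaml",
]
main()
```
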
pyvatti/src/pyvatti/config.py: 11 changes (10 additions, 1 deletion)

@@ -5,7 +5,7 @@
 from tempfile import NamedTemporaryFile
 from typing import Optional, Self
 
-from pydantic import Field, DirectoryPath, KafkaDsn, model_validator
+from pydantic import Field, DirectoryPath, KafkaDsn, model_validator, HttpUrl
 from pydantic_settings import BaseSettings
 
 
@@ -54,6 +54,15 @@ class Settings(BaseSettings):
     KAFKA_BOOTSTRAP_SERVER: Optional[KafkaDsn] = Field(default=None)
     KAFKA_CONSUMER_TOPIC: Optional[str] = Field(default="pipeline-launch")
     KAFKA_PRODUCER_TOPIC: Optional[str] = Field(default="pipeline-status")
+    KEY_HANDLER_TOKEN: str = Field(
+        description="Token to authenticate with the key handler service"
+    )
+    KEY_HANDLER_PASSWORD: str = Field(
+        description="Password used by the globus file handler to decrypt the secret key"
+    )
+    KEY_HANDLER_URL: Optional[HttpUrl] = Field(
+        description="URL used to contact the key handler service"
+    )
 
     @model_validator(mode="after")
     def check_mandatory_settings(self) -> Self:
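
The three new fields make the key handler configurable from the environment, and KEY_HANDLER_URL is validated as a pydantic HttpUrl. A minimal sketch, assuming the other mandatory settings are also set (all values are placeholders):

```python
# Sketch: pydantic-settings picks the new fields up from the environment.
import os

from pyvatti.config import Settings

os.environ["KEY_HANDLER_TOKEN"] = "example-token"
os.environ["KEY_HANDLER_PASSWORD"] = "example-password"
os.environ["KEY_HANDLER_URL"] = "https://keyhandler.example.com"

settings = Settings()  # assumes the remaining mandatory settings are also set
print(settings.KEY_HANDLER_URL)  # https://keyhandler.example.com/ (validated HttpUrl)
```
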
pyvatti/src/pyvatti/helm.py: 58 changes (45 additions, 13 deletions)

@@ -4,14 +4,22 @@
 It's assumed input parameters are validated by JobModels. This module aims to model and
 validate generated job configuration, like work bucket names.
 """
 
 import pathlib
 from typing import Optional
 
 import yaml
-from pydantic import BaseModel, Field, field_validator, ConfigDict
+from pydantic import (
+    BaseModel,
+    Field,
+    field_validator,
+    ConfigDict,
+    UUID4,
+    field_serializer,
+    HttpUrl,
+)
 
 from pyvatti.config import Settings
-from pyvatti.messagemodels import JobRequest
+from pyvatti.messagemodels import JobRequest, GlobusConfig, TargetGenome
 
 
 def parse_value_template(helm_chart_path: pathlib.Path) -> dict:
@@ -74,24 +82,41 @@ class JobInput(BaseModel):
 class GlobflowParams(BaseModel):
     input: str
     outdir: str
-    config_secrets: str
+    config_application: str
+    config_crypt4gh: str
 
 
 class Secrets(BaseModel):
     """These secrets must be templated with pyvatti environment variables"""
 
     globusDomain: str
     globusClientId: str
     globusClientSecret: str
     globusScopes: str
     towerToken: str
     towerId: str
+    keyHandlerToken: str
+    keyHandlerPassword: str
+    keyHandlerURL: HttpUrl
+
+    @field_serializer("keyHandlerURL")
+    @classmethod
+    def url_to_string(cls, url: HttpUrl) -> str:
+        return str(url)
+
+
+class KeyHandlerDetails(BaseModel):
+    secretId: UUID4
+    secretIdVersion: str
+
+    @field_serializer("secretId")
+    @classmethod
+    def uuid_to_str(cls, uuid: UUID4) -> str:
+        return str(uuid).upper()
 
 
 class HelmValues(BaseModel):
     """Represents all fields in the helm chart that can be templated"""
 
-    model_config = ConfigDict(validate_assignment=True)
+    model_config = ConfigDict(validate_assignment=True, use_enum_values=True)
 
     baseImage: str
     dockerTag: str
@@ -101,12 +126,12 @@
     serviceAccount: dict
 
     nxfParams: NextflowParams
-    # a JSON string
-    calcWorkflowInput: str
+
+    calcWorkflowInput: list[TargetGenome]
 
     calcJobParams: CalcJobParams
-    # a JSON string
-    globflowInput: str
+    keyHandlerSecret: KeyHandlerDetails
+    globflowInput: GlobusConfig
     globflowParams: GlobflowParams
     secrets: Secrets

@@ -119,6 +144,9 @@ def _add_secrets(job: HelmValues, settings: Settings) -> None:
     job.secrets.globusClientId = settings.GLOBUS_CLIENT_ID
     job.secrets.globusClientSecret = settings.GLOBUS_CLIENT_SECRET
     job.secrets.globusScopes = settings.GLOBUS_SCOPES
+    job.secrets.keyHandlerToken = settings.KEY_HANDLER_TOKEN
+    job.secrets.keyHandlerPassword = settings.KEY_HANDLER_PASSWORD
+    job.secrets.keyHandlerURL = settings.KEY_HANDLER_URL
 
 
 def _add_bucket_path(job: JobRequest, bucketPath: str) -> None:
@@ -152,12 +180,16 @@ def render_template(
     job_values.nxfParams.location = settings.GCP_LOCATION
     job_values.calcJobParams.outdir = f"gs://{results_bucket_path}/results"
 
-    job_values.calcWorkflowInput = job.pipeline_param.target_genomes.model_dump_json()
-    job_values.globflowInput = job.globus_details.model_dump_json()
+    job_values.calcWorkflowInput = job.pipeline_param.target_genomes
+    job_values.globflowInput = job.globus_details
 
     for x in ("pgs_id", "pgp_id", "trait_efo", "target_build"):
         setattr(
             job_values.calcJobParams, x, getattr(job.pipeline_param.nxf_params_file, x)
         )
+
+    job_values.keyHandlerSecret.secretId = job.secret_key_details.secret_id
+    job_values.keyHandlerSecret.secretIdVersion = (
+        job.secret_key_details.secret_id_version
+    )
     return job_values.model_dump()
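
The new serializers keep the rendered values dict YAML-friendly: HttpUrl and UUID4 objects are dumped as plain strings, with the secret UUID upper-cased. A small sketch of the expected behaviour (the UUID is the one from the SecretKeyDetails doctest):

```python
# Sketch: KeyHandlerDetails dumps its UUID as an upper-cased string.
from pyvatti.helm import KeyHandlerDetails

details = KeyHandlerDetails(
    secretId="81d5c400-21b4-4e88-8208-8d64c9920283",
    secretIdVersion="1",
)
print(details.model_dump())
# {'secretId': '81D5C400-21B4-4E88-8208-8D64C9920283', 'secretIdVersion': '1'}
```
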
pyvatti/src/pyvatti/messagemodels.py: 68 changes (44 additions, 24 deletions)

@@ -10,8 +10,8 @@
     field_validator,
     Field,
     UUID4,
-    field_serializer,
     RootModel,
+    ConfigDict,
 )


@@ -79,7 +79,7 @@ class TargetGenome(BaseModel):
     >>> data = {"sampleset": "test", "chrom": None, "geno": "hi.pgen", "pheno": "hi.psam", "variants": "hi.pvar.zst", "format": "pfile"}
     >>> TargetGenome(**data)
-    TargetGenome(sampleset='test', chrom=None, vcf_import_dosage=False, geno=PosixPath('hi.pgen'), pheno=PosixPath('hi.psam'), variants=PosixPath('hi.pvar.zst'), format=<TargetFormat.PFILE: 'pfile'>)
+    TargetGenome(sampleset='test', chrom=None, vcf_import_dosage=False, geno='hi.pgen', pheno='hi.psam', variants='hi.pvar.zst', format='pfile')
     >>> TargetGenome(**data).model_dump_json()
     '{"sampleset":"test","chrom":null,"vcf_import_dosage":false,"geno":"hi.pgen","pheno":"hi.psam","variants":"hi.pvar.zst","format":"pfile"}'
@@ -88,9 +88,11 @@
     >>> data = {"sampleset": "test", "chrom": None, "geno": "hi.vcf.gz", "pheno": "hi.vcf.gz", "variants": "hi.vcf.gz", "format": "vcf"}
     >>> TargetGenome(**data)
-    TargetGenome(sampleset='test', chrom=None, vcf_import_dosage=False, geno=PosixPath('hi.vcf.gz'), pheno=PosixPath('hi.vcf.gz'), variants=PosixPath('hi.vcf.gz'), format=<TargetFormat.VCF: 'vcf'>)
+    TargetGenome(sampleset='test', chrom=None, vcf_import_dosage=False, geno='hi.vcf.gz', pheno='hi.vcf.gz', variants='hi.vcf.gz', format='vcf')
     """
 
+    model_config = ConfigDict(validate_assignment=True, use_enum_values=True)
+
     sampleset: Annotated[str, Field(description="A human label for a cohort / dataset")]
     chrom: Annotated[
         Optional[str],
@@ -107,21 +109,29 @@
             default=False,
         ),
     ]
+    # not pathlib.Path because it messes up gs:// prefix
     geno: Annotated[
-        pathlib.Path,
+        str,
         Field(description="Path to a genotype file (e.g. pgen / bed / vcf)"),
     ]
     pheno: Annotated[
-        pathlib.Path, Field(description="Path to a phenotype file (e.g. psam / fam)")
+        str, Field(description="Path to a phenotype file (e.g. psam / fam)")
     ]
     variants: Annotated[
-        pathlib.Path,
+        str,
         Field(description="Path to a variant information file (e.g. bim / pvar"),
     ]
     format: Annotated[
         TargetFormat, Field(description="What format are the target genomes in?")
     ]
 
+    @field_validator("geno", "pheno", "variants")
+    @classmethod
+    def check_file_suffix(cls, name: str) -> str:
+        if name.endswith(".c4gh"):
+            raise ValueError("Calculation workflow can't handle encrypted files")
+        return name
+
     @field_validator("sampleset")  # type: ignore
     @classmethod
     def check_sampleset_name(cls, value: str) -> str:
@@ -133,25 +143,28 @@ def check_sampleset_name(cls, value: str) -> str:

@field_validator("geno") # type: ignore
@classmethod
def check_geno_suffix(cls, value: pathlib.Path) -> pathlib.Path:
match suffix := value.suffix:
def check_geno_suffix(cls, value: str) -> str:
path = pathlib.Path(value)
match suffix := path.suffix:
case ".pgen" | ".bed":
pass
case ".gz" if ".vcf" in value.suffixes:
case ".gz" if ".vcf" in path.suffixes:
pass
case _:
raise ValueError(f"Genotype file {suffix=} is not a supported format")
return value

@field_validator("variants") # type: ignore
@classmethod
def check_variant_suffix(cls, value: pathlib.Path) -> pathlib.Path:
match suffix := value.suffix:
def check_variant_suffix(cls, value: str) -> str:
path = pathlib.Path(value)

match suffix := path.suffix:
case ".pvar" | ".bim":
pass
case ".zst" if ".pvar" in value.suffixes or ".bim" in value.suffixes:
case ".zst" if ".pvar" in path.suffixes or ".bim" in path.suffixes:
pass
case ".gz" if ".bim" in value.suffixes or ".vcf" in value.suffixes:
case ".gz" if ".bim" in path.suffixes or ".vcf" in path.suffixes:
pass
case _:
raise ValueError(
@@ -162,11 +175,13 @@ def check_variant_suffix(cls, value: pathlib.Path) -> pathlib.Path:

@field_validator("pheno") # type: ignore
@classmethod
def check_pheno_suffix(cls, value: pathlib.Path) -> pathlib.Path:
match suffix := value.suffix:
def check_pheno_suffix(cls, value: str) -> str:
path = pathlib.Path(value)

match suffix := path.suffix:
case ".psam" | ".fam":
pass
case ".gz" if ".vcf" in value.suffixes:
case ".gz" if ".vcf" in path.suffixes:
pass
case _:
raise ValueError(
@@ -178,8 +193,8 @@ def check_pheno_suffix(cls, value: pathlib.Path) -> pathlib.Path:
@model_validator(mode="after")
def check_format_and_filenames(self) -> Self:
"""Checks the declared format aligns with the file list"""
paths: list[pathlib.Path] = [self.geno, self.pheno, self.variants]
suffixes: list[list[str]] = [x.suffixes for x in paths]
paths: list[str] = [self.geno, self.pheno, self.variants]
suffixes: list[list[str]] = [pathlib.Path(x).suffixes for x in paths]
extensions: set[str] = {item for sublist in suffixes for item in sublist}

# PLINK1/2 files are a triplet of variant information file (text), genotype (binary), and sample information file (text)
@@ -212,10 +227,6 @@ def check_format_and_filenames(self) -> Self:

         return self
 
-    @field_serializer("geno", "pheno", "variants")
-    def serialise_path_to_str(self, path: pathlib.Path) -> str:
-        return str(path)
-
 
 class SamplesheetFormat(str, enum.Enum):
     """Nextflow samplesheet format. The API only accepts json, currently.
@@ -301,9 +312,16 @@ class SecretKeyDetails(BaseModel):
     SecretKeyDetails(secret_id=UUID('81d5c400-21b4-4e88-8208-8d64c9920283'), secret_id_version='1')
     """
 
-    secret_id: Annotated[UUID4, Field(description="UUIDv4 of secret key")]
+    secret_id: Annotated[
+        UUID4, Field(description="UUIDv4 of secret key", serialization_alias="secretId")
+    ]
     secret_id_version: Annotated[
-        str, Field(description="Version of secret key", coerce_numbers_to_str=True)
+        str,
+        Field(
+            description="Version of secret key",
+            serialization_alias="secretIdVersion",
+            coerce_numbers_to_str=True,
+        ),
     ]


@@ -339,6 +357,8 @@ class JobRequest(BaseModel):
     JobRequest(globus_details=GlobusConfig(dir_path_on_guest_collection...
     """
 
+    model_config = ConfigDict(validate_assignment=True)
+
     globus_details: Annotated[
         GlobusConfig,
         Field(description="Globus file handler parameters for data transfer"),
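
Switching geno / pheno / variants from pathlib.Path to str matters for object-store URIs: Path normalisation collapses the double slash in a gs:// prefix, which is exactly what the new inline comment warns about. A short sketch of the difference:

```python
# Sketch: why the path fields are now str, not pathlib.Path.
import pathlib

from pyvatti.messagemodels import TargetGenome

print(pathlib.Path("gs://bucket/hi.pgen"))  # gs:/bucket/hi.pgen (prefix mangled)

tg = TargetGenome(
    sampleset="test",
    chrom=None,
    geno="gs://bucket/hi.pgen",
    pheno="gs://bucket/hi.psam",
    variants="gs://bucket/hi.pvar.zst",
    format="pfile",
)
print(tg.geno)  # gs://bucket/hi.pgen (left untouched)
# A ".c4gh" suffix would raise: the calculation workflow can't read encrypted files.
```
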
pyvatti/tests/test_render.py: 13 changes (13 additions, 0 deletions)

@@ -1,6 +1,9 @@
 import json
+import subprocess
+import tempfile
 
 import pytest
+import yaml
 
 from pyvatti.helm import render_template
 from pyvatti.messagemodels import JobRequest
@@ -24,6 +27,9 @@ def test_render(message):
         KAFKA_BOOTSTRAP_SERVER="kafka://localhost:9092",
         GCP_PROJECT="testproject",
         GCP_LOCATION="europe-west2",
+        KEY_HANDLER_TOKEN="test",
+        KEY_HANDLER_URL="https://test.example.com/keyhandler",
+        KEY_HANDLER_PASSWORD="<PASSWORD>",
     )
 
     with open(message) as f:
@@ -43,3 +49,10 @@ def test_render(message):

     # check parameters have been set in the template
     assert template["nxfParams"]["gcpProject"] == "testproject"
+
+    # test that the values file can create a valid template using the helm CLI
+    with tempfile.NamedTemporaryFile(mode="wt") as temp_f:
+        yaml.dump(template, temp_f)
+        cmd = ["helm", "template", settings.HELM_CHART_PATH, "--values", temp_f.name]
+        helm: subprocess.CompletedProcess = subprocess.run(cmd)
+        assert helm.returncode == 0
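
The test shells out to the helm CLI, so a failure currently surfaces only as a non-zero exit code. One possible refinement (not part of this commit) is to capture helm's stderr so the pytest report shows why templating failed:

```python
# Sketch: surface helm's stderr in the assertion message.
helm = subprocess.run(cmd, capture_output=True, text=True)
assert helm.returncode == 0, helm.stderr
```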
