From 9c6c0cc8b50924d3fdb06323b25b38d50cdc4c49 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Mon, 24 Jan 2022 16:24:41 -0500 Subject: [PATCH] Cromwell (#188) --- .dockerignore | 2 + pman/config.py | 3 + pman/cromwell/__init__.py | 0 pman/cromwell/client.py | 94 +++++++ pman/cromwell/models.py | 143 ++++++++++ pman/cromwellmgr.py | 217 ++++++++++++++ pman/e2_wdl.py | 91 ++++++ pman/resources.py | 4 +- requirements/base.txt | 3 +- setup.py | 5 +- tests/cromwell/__init__.py | 0 tests/cromwell/examples/__init__.py | 0 tests/cromwell/examples/metadata.py | 420 ++++++++++++++++++++++++++++ tests/cromwell/examples/query.py | 20 ++ tests/cromwell/helpers.py | 35 +++ tests/cromwell/test_client.py | 29 ++ tests/cromwell/test_cromwellmgr.py | 113 ++++++++ tests/cromwell/test_wdl.py | 21 ++ 18 files changed, 1196 insertions(+), 4 deletions(-) create mode 100644 pman/cromwell/__init__.py create mode 100644 pman/cromwell/client.py create mode 100644 pman/cromwell/models.py create mode 100755 pman/cromwellmgr.py create mode 100644 pman/e2_wdl.py create mode 100644 tests/cromwell/__init__.py create mode 100644 tests/cromwell/examples/__init__.py create mode 100644 tests/cromwell/examples/metadata.py create mode 100644 tests/cromwell/examples/query.py create mode 100644 tests/cromwell/helpers.py create mode 100644 tests/cromwell/test_client.py create mode 100644 tests/cromwell/test_cromwellmgr.py create mode 100644 tests/cromwell/test_wdl.py diff --git a/.dockerignore b/.dockerignore index 4d586b6b..6153dc18 100755 --- a/.dockerignore +++ b/.dockerignore @@ -6,3 +6,5 @@ Dockerfile .git LICENSE CHRIS_REMOTE_FS +venv/ + diff --git a/pman/config.py b/pman/config.py index 4d815ef7..a3f9f67f 100755 --- a/pman/config.py +++ b/pman/config.py @@ -42,6 +42,9 @@ def __init__(self): if self.CONTAINER_ENV == 'kubernetes': self.JOB_NAMESPACE = env('JOB_NAMESPACE', 'default') + if self.CONTAINER_ENV == 'cromwell': + self.CROMWELL_URL = env('CROMWELL_URL') + self.env = env diff --git a/pman/cromwell/__init__.py b/pman/cromwell/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pman/cromwell/client.py b/pman/cromwell/client.py new file mode 100644 index 00000000..499d266a --- /dev/null +++ b/pman/cromwell/client.py @@ -0,0 +1,94 @@ +import io +import json +from dataclasses import dataclass +from typing import Optional, Dict +from .models import ( + WorkflowId, StrWdl, + WorkflowIdAndStatus, WorkflowQueryResponse, WorkflowMetadataResponse +) +from cromwell_tools.cromwell_api import CromwellAPI, CromwellAuth +from serde.json import from_json +from os import path +import requests + + +@dataclass +class CromwellClient: + """ + A wrapper around :mod:`cromwell_tools.cromwell_api` providing a similar + interface but with typed parameters and returns. + """ + auth: CromwellAuth + + def submit(self, wdl: StrWdl, label: Dict[str, str]) -> WorkflowIdAndStatus: + """ + Schedule a WDL file to be executed. + + :param wdl: WDL + :param label: labels to apply to this workflow + :return: response from Cromwell + """ + res = CromwellAPI.submit( + auth=self.auth, + wdl_file=self.__str2bytesio(wdl), + label_file=self.__create_label(label), + raise_for_status=True + ) + return from_json(WorkflowIdAndStatus, res.text) + + def status(self, uuid: WorkflowId) -> Optional[WorkflowIdAndStatus]: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#retrieves-the-current-state-for-a-workflow + + :return: workflow ID and status, or None if workflow not found + """ + res = CromwellAPI.status(uuid=uuid, auth=self.auth, raise_for_status=False) + if res.status_code == 404: + return None + res.raise_for_status() + return from_json(WorkflowIdAndStatus, res.text) + + def query(self, label: Optional[Dict[str, str]] = None) -> WorkflowQueryResponse: + query_dict = {} + if label: + query_dict['label'] = label + res = CromwellAPI.query(query_dict=query_dict, + auth=self.auth, raise_for_status=True) + return from_json(WorkflowQueryResponse, res.text) + + def metadata(self, uuid: WorkflowId) -> Optional[WorkflowMetadataResponse]: + res = CromwellAPI.metadata(uuid=uuid, + auth=self.auth, raise_for_status=False) + if res.status_code == 404: + return None + res.raise_for_status() + return from_json(WorkflowMetadataResponse, res.text) + + def abort(self, uuid: WorkflowId) -> WorkflowIdAndStatus: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#abort-a-running-workflow + """ + res = CromwellAPI.abort(uuid=uuid, auth=self.auth, raise_for_status=True) + return from_json(WorkflowIdAndStatus, res.text) + + def logs_idc(self, uuid: WorkflowId) -> dict: + """ + This method is not available in upstream cromwell-tools. + """ + uri = path.join(self.auth.url, 'api/workflows/v1', uuid, 'logs') + res = requests.get(uri) + res.raise_for_status() + return res.json() + + @classmethod + def __create_label(cls, d: Dict[str, str]) -> io.BytesIO: + """ + Create Cromwell labels from a dictionary of key-value pairs. + + https://cromwell.readthedocs.io/en/stable/cromwell_features/Labels/ + """ + return cls.__str2bytesio(json.dumps(d)) + + @staticmethod + def __str2bytesio(s: str) -> io.BytesIO: + return io.BytesIO(bytes(s, 'utf-8')) diff --git a/pman/cromwell/models.py b/pman/cromwell/models.py new file mode 100644 index 00000000..428fed6c --- /dev/null +++ b/pman/cromwell/models.py @@ -0,0 +1,143 @@ +""" +Data definitions for Cromwell API responses. +""" +from enum import Enum +from serde import deserialize +from typing import NewType, List, Dict, Optional +from pman.abstractmgr import TimeStamp +from pathlib import Path + + +StrWdl = NewType('StrWdl', str) +"""WDL as a :type:`str`.""" +WorkflowName = NewType('WorkflowName', str) +WorkflowId = NewType('WorkflowId', str) +RuntimeAttributes = Dict[str, str] +""" +Custom information about a task call from Cromwell workflow metadata, +defined by how Cromwell's backend is configured. + +p.s. a type alias bc https://github.com/yukinarit/pyserde/issues/192 +""" + + +class WorkflowStatus(Enum): + """ + https://github.com/broadinstitute/cromwell/blob/32d5d0cbf07e46f56d3d070f457eaff0138478d5/wes2cromwell/src/main/scala/wes2cromwell/WesState.scala#L19-L28 + """ + + # btw, the Cromwell documentation is not accurate. It's missing the "On Hold" status. + # https://broadworkbench.atlassian.net/browse/CROM-6869 + + OnHold = 'On Hold' + Submitted = 'Submitted' + Running = 'Running' + Aborting = 'Aborting' + Aborted = 'Aborted' + Succeeded = 'Succeeded' + Failed = 'Failed' + + +@deserialize +class WorkflowIdAndStatus: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowidandstatus + """ + id: WorkflowId + status: WorkflowStatus + + +@deserialize +class WorkflowQueryResult: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresult + """ + end: Optional[TimeStamp] + id: WorkflowId + # name will be undefined for the first few seconds after submission + name: Optional[WorkflowName] + start: Optional[TimeStamp] + status: WorkflowStatus + submission: Optional[TimeStamp] + + +@deserialize +class WorkflowQueryResponse: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresponse + """ + results: List[WorkflowQueryResult] + totalResultsCount: int + + +# doesn't seem correct +# @deserialize +# class FailureMessage: +# """ +# https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#failuremessage +# """ +# failure: str +# timestamp: TimeStamp + +@deserialize +class CausedFailure: + message: str + causedBy: List # is actually a List['CausedFailure'], + # but pyserde does not support circular data definition + + +@deserialize +class CallMetadata: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#callmetadata + """ + # these are the conventional fields + backend: Optional[str] + backendLogs: Optional[dict] + backendStatus: Optional[str] + end: Optional[TimeStamp] + executionStatus: str + failures: Optional[List[CausedFailure]] + inputs: Optional[dict] + jobId: Optional[str] + returnCode: Optional[int] + start: Optional[TimeStamp] + stderr: Optional[Path] + stdout: Optional[Path] + # these fields are not documented, yet they are very important + commandLine: Optional[str] + runtimeAttributes: Optional[RuntimeAttributes] + attempt: Optional[int] + # and these, we don't care about + # compressedDockerSize: int + # callCaching: CallCaching + # shardIndex: int + + +@deserialize +class SubmittedFiles: + workflow: StrWdl + root: str + options: str + inputs: str + workflowUrl: str + labels: str + + +@deserialize +class WorkflowMetadataResponse: + """ + https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowmetadataresponse + """ + calls: Optional[Dict[str, List[CallMetadata]]] + end: Optional[TimeStamp] + failures: Optional[List[CausedFailure]] + id: WorkflowId + inputs: Optional[dict] + outputs: Optional[dict] + start: Optional[TimeStamp] + status: WorkflowStatus + submission: TimeStamp + # these fields are undocumented + labels: Dict[str, str] + submittedFiles: Optional[SubmittedFiles] diff --git a/pman/cromwellmgr.py b/pman/cromwellmgr.py new file mode 100755 index 00000000..9373230e --- /dev/null +++ b/pman/cromwellmgr.py @@ -0,0 +1,217 @@ +""" +TODO: another microservice to fill functionality not provided by Cromwell + +- manager.get_job_logs --> return stdout +- manager.remove_job --> remove files + +""" + +import json +import logging +import time +from typing import Optional +from .abstractmgr import AbstractManager, ManagerException, JobStatus, JobInfo, Image, JobName, TimeStamp +from .cromwell.models import ( + WorkflowId, StrWdl, + WorkflowStatus, WorkflowIdAndStatus, WorkflowQueryResult, + WorkflowMetadataResponse +) +from .cromwell.client import CromwellAuth, CromwellClient +from .e2_wdl import ChRISJob, SlurmRuntimeAttributes + + +STATUS_MAP = { + WorkflowStatus.OnHold: JobStatus.notstarted, + WorkflowStatus.Submitted: JobStatus.notstarted, + WorkflowStatus.Running: JobStatus.started, + WorkflowStatus.Aborted: JobStatus.finishedWithError, + WorkflowStatus.Aborting: JobStatus.finishedWithError, + WorkflowStatus.Succeeded: JobStatus.finishedSuccessfully, + WorkflowStatus.Failed: JobStatus.finishedWithError +} + +logger = logging.getLogger(__name__) + + +class CromwellManager(AbstractManager[WorkflowId]): + """ + A Cromwell shim for ``pman``. + + https://cromwell.readthedocs.io/ + + Instead of defining workflow inputs and outputs, the ``CromwellManager`` + expects for a plugin instance's input files to already exist in the + remote compute environment's filesystem, and instructs Cromwell to also + write output files to its filesystem, same as how "storeBase" works with + the *docker swarm* ``pman`` backend. + + Tip: the workflow name is not the ``pman`` job name! Currently, the + workflow name is hard-coded for every workflow to be ``ChRISJob``. + Instead, the ``pman`` job name is tracked by Cromwell as a label with + the key :const:`PMAN_CROMWELL_LABEL`. + """ + + PMAN_CROMWELL_LABEL = 'org.chrisproject.pman.name' + """The Cromwell label key for pman job names.""" + + def __init__(self, config_dict=None): + super().__init__(config_dict) + auth = CromwellAuth(config_dict['CROMWELL_URL']) + self.__client = CromwellClient(auth) + + def schedule_job(self, image: Image, command: str, name: JobName, + resources_dict: dict, mountdir: Optional[str] = None) -> WorkflowId: + wdl = ChRISJob(image, command, mountdir, resources_dict).to_wdl() + res = self.__submit(wdl, name) + # Submission does not appear in Cromwell immediately, but pman wants to + # get job info, so we need to wait for Cromwell to catch up. + self.__must_be_submitted(res) + return res.id + + def __submit(self, wdl: StrWdl, name: JobName) -> WorkflowIdAndStatus: + """ + Schedule a WDL file to be executed, and then wait for Cromwell to register it. + + :param wdl: WDL + :param name: ID which pman can be queried for to retrieve the submitted workflow + :return: response from Cromwell + """ + + res = self.__client.submit(wdl, label={self.PMAN_CROMWELL_LABEL: name}) + self.__must_be_submitted(res) + if not self.__block_until_metadata_available(res.id): + raise CromwellException('Workflow was submitted, but timed out waiting for ' + f'Cromwell to produce a call on: {res.id}') + return res + + @staticmethod + def __must_be_submitted(res: WorkflowIdAndStatus): + if res.status != WorkflowStatus.Submitted: + raise CromwellException(f'Workflow status is not "Submitted": {res}') + + def __block_until_metadata_available(self, uuid: WorkflowId, tries=20, interval=1) -> bool: + """ + Poll for a workflow's metadata until a call has been produced by Cromwell. + + After submitting a workflow, it take a little while for it to register in + Cromwell, and then it takes a little bit more for it to be parsed, processed, + and then finally scheduled. + + :param uuid: workflow UUID + :param tries: number of metadata request attempts + :param interval: seconds to wait between attempts + :return: True if a call has been made before timeout, otherwise False + """ + if tries <= 0: + return False + time.sleep(interval) + if self._check_job_info(uuid) is not None: + return True + return self.__block_until_metadata_available(uuid, tries - 1, interval) + + def get_job(self, name: JobName) -> WorkflowId: + job = self.__query_by_name(name) + if job: + return job.id + raise CromwellException(f'No job found for name="{name}"', status_code=404) + + def get_job_logs(self, job: WorkflowId) -> str: + # cromwell_tools.utilities.download + data = self.__client.logs_idc(job) + return ( + 'Logs not yet supported for Cromwell backend. ' + 'They can be read manually from here:\n' + + json.dumps(data, indent=2) + ) + + def get_job_info(self, job: WorkflowId) -> JobInfo: + info = self._check_job_info(job) + if info is None: + raise CromwellException(f'Info not available for WorkflowId={job}', status_code=404) + return info + + def _check_job_info(self, uuid: WorkflowId) -> Optional[JobInfo]: + """ + Get job info from Cromwell metadata if available. + """ + res = self.__client.metadata(uuid) + if res is None: + return None + if self.__is_complete_call(res): + return self.__info_from_complete_call(res) + if res.submittedFiles is not None: + return self.__info_from_early_submission(res) + return None + + @staticmethod + def __is_complete_call(res: WorkflowMetadataResponse) -> bool: + """ + :return: True if metadata shows that Cromwell has picked up and processed the workflow + """ + return ( + 'ChRISJob.plugin_instance' in res.calls + and len(res.calls['ChRISJob.plugin_instance']) >= 1 + and res.calls['ChRISJob.plugin_instance'][0].commandLine is not None + ) + + @classmethod + def __info_from_complete_call(cls, res: WorkflowMetadataResponse) -> JobInfo: + """ + Get info from a workflow which was picked up and processed by Cromwell. + """ + if len(res.calls['ChRISJob.plugin_instance']) > 1: + logger.warning('Task "ChRISJob.plugin_instance" has multiple calls: %s', str(res)) + + call = res.calls['ChRISJob.plugin_instance'][0] + attrs = SlurmRuntimeAttributes.deserialize(call.runtimeAttributes) + + return JobInfo( + name=JobName(res.labels[cls.PMAN_CROMWELL_LABEL]), + image=attrs.docker, + cmd=call.commandLine, + timestamp=res.end if res.end is not None else '', + message=str(res.status), # whatever + status=STATUS_MAP[res.status] + ) + + @classmethod + def __info_from_early_submission(cls, res: WorkflowMetadataResponse) -> JobInfo: + """ + Get info from a workflow by parsing its submittedFiles. + """ + job_details = ChRISJob.from_wdl(res.submittedFiles.workflow) + labels = json.loads(res.submittedFiles.labels) + + message = 'Waiting to be picked up by Cromwell' + if 'ChRISJob.plugin_instance' in res.calls and len(res.calls['ChRISJob.plugin_instance']) >= 1: + message = res.calls['ChRISJob.plugin_instance'][0].executionStatus + + return JobInfo( + name=labels[cls.PMAN_CROMWELL_LABEL], + image=job_details.image, + cmd=job_details.command, + timestamp=TimeStamp(''), + message=message, + status=JobStatus.notstarted + ) + + def __query_by_name(self, name: JobName) -> Optional[WorkflowQueryResult]: + """ + Get a single job by name. + + :raises CromwellException: if multiple jobs found by the given name + """ + res = self.__client.query({self.PMAN_CROMWELL_LABEL: name}) + if res.totalResultsCount < 1: + return None + if res.totalResultsCount > 1: + logger.warning('More than one job where name="%s" found in: %s', name, str(res)) + # we will return the first one in the list, which is probably the most recent + return res.results[0] + + def remove_job(self, job: WorkflowId): + self.__client.abort(job) + + +class CromwellException(ManagerException): + pass diff --git a/pman/e2_wdl.py b/pman/e2_wdl.py new file mode 100644 index 00000000..6d37ebba --- /dev/null +++ b/pman/e2_wdl.py @@ -0,0 +1,91 @@ +""" +WDL template for running a *ChRIS* plugin on the BCH *E2* SLURM. + +TODO pass resources_dict (requested CPU, mem, GPU, ...) into the WDL task runtime + +Maybe it would be nice to set a workflow name instead of just "ChrisPlugin" +but it doesn't really matter. +""" + +from typing import Optional, Tuple + +from serde import from_dict, deserialize +from jinja2 import Environment +from .abstractmgr import Image +from pman.cromwell.models import StrWdl, RuntimeAttributes +from dataclasses import dataclass + + +template = Environment().from_string(r""" +version 1.0 + +task plugin_instance { + command { + {{ cmd }} + } #ENDCOMMAND + runtime { + docker: '{{ docker }}' + sharedir: '{{ sharedir }}' + } +} + +workflow ChRISJob { + call plugin_instance +} +""") + + +@dataclass +class ChRISJob: + """ + Represents a ChRIS plugin instance which runs on E2. + """ + image: Image + command: str + sharedir: str + resources_dict: Optional[dict] = None + + def to_wdl(self) -> StrWdl: + """ + :return: a WDL wrapper for a *ChRIS* plugin instance + """ + return StrWdl(template.render( + cmd=self.command, docker=self.image, sharedir=self.sharedir + )) + + @classmethod + def from_wdl(cls, wdl: StrWdl) -> 'ChRISJob': + command, end = cls._get_between(wdl, 'command {\n', ' } #ENDCOMMAND\n', 35) + image, end = cls._get_between(wdl, "docker: '", "'\n", end) + sharedir, _ = cls._get_between(wdl, "sharedir: '", "'\n", end) + return cls(Image(image), command.strip(), sharedir) + + @staticmethod + def _get_between(data: str, lookahead: str, lookbehind: str, start: int = 0) -> Tuple[str, int]: + """ + Some light parsing because miniwdl is not mini at all, and regex is ugly. + """ + beginning = data.index(lookahead, start) + len(lookahead) + end = data.index(lookbehind, beginning) + return data[beginning:end], end + + +@deserialize +class SlurmRuntimeAttributes: + """ + These fields are custom to how Cromwell is configured to speak with BCH E2 SLURM. + """ + runtime_minutes: int + queue: str + requested_memory_mb_per_core: int + failOnStderr: bool + sharedir: str + continueOnReturnCode: int + docker: Image + maxRetries: int + cpus: int + account: str + + @classmethod + def deserialize(cls, _a: RuntimeAttributes) -> 'SlurmRuntimeAttributes': + return from_dict(cls, _a) diff --git a/pman/resources.py b/pman/resources.py index bc592864..67c01b47 100755 --- a/pman/resources.py +++ b/pman/resources.py @@ -10,7 +10,7 @@ from .openshiftmgr import OpenShiftManager from .kubernetesmgr import KubernetesManager from .swarmmgr import SwarmManager - +from .cromwellmgr import CromwellManager logger = logging.getLogger(__name__) @@ -39,6 +39,8 @@ def get_compute_mgr(container_env): compute_mgr = KubernetesManager(app.config) elif container_env == 'openshift': compute_mgr = OpenShiftManager() + elif container_env == 'cromwell': + compute_mgr = CromwellManager(app.config) return compute_mgr diff --git a/requirements/base.txt b/requirements/base.txt index 96337e33..1da458c6 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -7,4 +7,5 @@ python-keystoneclient==4.2.0 pfmisc==2.2.4 environs==9.3.2 emoji==1.2.0 - +cromwell-tools==2.4.1 +pyserde==0.6.0 diff --git a/setup.py b/setup.py index cd48b46c..ec8c2ed8 100755 --- a/setup.py +++ b/setup.py @@ -15,8 +15,9 @@ url = 'https://github.com/FNNDSC/pman', packages = find_packages(), - install_requires = ['pudb', 'pfmisc', 'docker', 'openshift', 'kubernetes', - 'python-keystoneclient', 'Flask', 'Flask_RESTful', 'environs'], + install_requires = ['pudb', 'pfmisc', 'docker', 'openshift', 'kubernetes', 'cromwell-tools', + 'python-keystoneclient', 'Flask', 'Flask_RESTful', 'environs', 'pyserde', + 'jinja2'], test_suite = 'nose.collector', tests_require = ['nose'], scripts = ['bin/pman'], diff --git a/tests/cromwell/__init__.py b/tests/cromwell/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cromwell/examples/__init__.py b/tests/cromwell/examples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cromwell/examples/metadata.py b/tests/cromwell/examples/metadata.py new file mode 100644 index 00000000..d53cc295 --- /dev/null +++ b/tests/cromwell/examples/metadata.py @@ -0,0 +1,420 @@ +from pman.cromwell.models import WorkflowId, WorkflowStatus +from pman.abstractmgr import JobInfo, JobStatus, Image, TimeStamp, JobName +from pman.e2_wdl import ChRISJob + +workflow_uuid = WorkflowId('4165ed81-c121-4a8d-b284-a6dda9ef0aa8') + +expected_running = JobInfo( + name=JobName('example-jid-1234'), + image=Image('docker.io/fnndsc/pl-office-convert:0.0.1'), + cmd='office_convert /share/incoming /share/outgoing', + timestamp=TimeStamp(''), + message=str(WorkflowStatus.Running), + status=JobStatus.started +) + +response_running = r""" +{ + "workflowName": "ChRISJob", + "workflowProcessingEvents": [ + { + "cromwellId": "cromid-fa30812", + "description": "PickedUp", + "timestamp": "2022-01-23T19:03:20.147Z", + "cromwellVersion": "74-10892f4" + } + ], + "actualWorkflowLanguageVersion": "1.0", + "submittedFiles": { + "workflow": "version 1.0\n\ntask plugin_instance {\n command {\n office_convert /share/incoming /share/outgoing\n } #ENDCOMMAND\n runtime {\n docker: 'docker.io/fnndsc/pl-office-convert:0.0.1'\n sharedir: '/mounted/storebase/example-jid-1234'\n }\n}\n\nworkflow ChRISJob {\n call plugin_instance\n}\n", + "workflowType": "WDL", + "root": "", + "workflowTypeVersion": "1.0", + "options": "{\n\n}", + "inputs": "{}", + "workflowUrl": "", + "labels": "{ \"org.chrisproject.pman.name\": \"example-jid-1234\" }\n" + }, + "calls": { + "ChRISJob.plugin_instance": [ + { + "executionStatus": "Running", + "stdout": "/cromwell-executions/ChRISJob/4165ed81-c121-4a8d-b284-a6dda9ef0aa8/call-plugin_instance/execution/stdout", + "backendStatus": "Running", + "compressedDockerSize": 110900053, + "commandLine": "office_convert /share/incoming /share/outgoing", + "shardIndex": -1, + "runtimeAttributes": { + "runtime_minutes": "5", + "queue": "my-slurm-partition", + "requested_memory_mb_per_core": "4000", + "failOnStderr": "false", + "sharedir": "/mounted/storebase/example-jid-1234", + "continueOnReturnCode": "0", + "docker": "docker.io/fnndsc/pl-office-convert:0.0.1", + "maxRetries": "0", + "cpus": "2", + "account": "fnndsc" + }, + "callCaching": { + "allowResultReuse": false, + "effectiveCallCachingMode": "CallCachingOff" + }, + "inputs": {}, + "jobId": "1866268", + "backend": "SLURM", + "stderr": "/cromwell-executions/ChRISJob/4165ed81-c121-4a8d-b284-a6dda9ef0aa8/call-plugin_instance/execution/stderr", + "callRoot": "/cromwell-executions/ChRISJob/4165ed81-c121-4a8d-b284-a6dda9ef0aa8/call-plugin_instance", + "attempt": 1, + "start": "2022-01-23T19:03:21.820Z" + } + ] + }, + "outputs": {}, + "workflowRoot": "/cromwell-executions/ChRISJob/4165ed81-c121-4a8d-b284-a6dda9ef0aa8", + "actualWorkflowLanguage": "WDL", + "id": "4165ed81-c121-4a8d-b284-a6dda9ef0aa8", + "inputs": {}, + "labels": { + "cromwell-workflow-id": "cromwell-4165ed81-c121-4a8d-b284-a6dda9ef0aa8", + "org.chrisproject.pman.name": "example-jid-1234" + }, + "submission": "2022-01-23T18:00:49.346Z", + "status": "Running", + "start": "2022-01-23T19:03:20.171Z" +} +""" + + +expected_failed = JobInfo( + name=JobName('wont-work'), + image=Image('docker.io/fnndsc/pl-office-convert:0.0.1'), + cmd='office_convert /share/incoming /share/outgoing', + timestamp=TimeStamp('2022-01-24T00:19:36.143Z'), + message=str(WorkflowStatus.Failed), + status=JobStatus.finishedWithError +) + + +response_failed = r""" +{ + "workflowName": "ChRISJob", + "workflowProcessingEvents": [ + { + "cromwellId": "cromid-7140408", + "description": "Finished", + "timestamp": "2022-01-24T00:19:36.144Z", + "cromwellVersion": "74-10892f4" + }, + { + "cromwellId": "cromid-7140408", + "description": "PickedUp", + "timestamp": "2022-01-24T00:17:41.861Z", + "cromwellVersion": "74-10892f4" + } + ], + "actualWorkflowLanguageVersion": "1.0", + "submittedFiles": { + "workflow": "\nversion 1.0\n\ntask plugin_instance {\n command {\n /usr/local/bin/python /usr/local/bin/office_convert /share/incoming /share/outgoing\n } #ENDCOMMAND\n runtime {\n docker: 'ghcr.io/fnndsc/pl-office-convert:0.0.2'\n sharedir: '/mounted/storebase/key-wont-work'\n }\n}\n\nworkflow ChRISJob {\n call plugin_instance\n}", + "root": "", + "options": "{\n\n}", + "inputs": "{}", + "workflowUrl": "", + "labels": "{\"org.chrisproject.pman.name\": \"wont-work\"}" + }, + "calls": { + "ChRISJob.plugin_instance": [ + { + "retryableFailure": false, + "executionStatus": "Failed", + "stdout": "/mounted/cromwell-executions/ChRISJob/3f206683-e52c-428d-aaba-d80da957dbdb/call-plugin_instance/execution/stdout", + "backendStatus": "Done", + "commandLine": "/usr/local/bin/python /usr/local/bin/office_convert /share/incoming /share/outgoing", + "shardIndex": -1, + "runtimeAttributes": { + "runtime_minutes": "5", + "queue": "soloq", + "requested_memory_mb_per_core": "4000", + "failOnStderr": "false", + "sharedir": "/mounted/storebase/key-wont-work", + "continueOnReturnCode": "0", + "docker": "ghcr.io/fnndsc/pl-office-convert:0.0.2", + "maxRetries": "0", + "cpus": "2", + "account": "faker" + }, + "callCaching": { + "allowResultReuse": false, + "effectiveCallCachingMode": "CallCachingOff" + }, + "inputs": {}, + "returnCode": 1, + "failures": [ + { + "message": "Job ChRISJob.plugin_instance:NA:1 exited with return code 1 which has not been declared as a valid return code. See 'continueOnReturnCode' runtime attribute for more details.", + "causedBy": [] + } + ], + "jobId": "1866296", + "backend": "SLURM", + "end": "2022-01-24T00:19:35.890Z", + "stderr": "/mounted/cromwell-executions/ChRISJob/3f206683-e52c-428d-aaba-d80da957dbdb/call-plugin_instance/execution/stderr", + "callRoot": "/mounted/cromwell-executions/ChRISJob/3f206683-e52c-428d-aaba-d80da957dbdb/call-plugin_instance", + "attempt": 1, + "executionEvents": [ + { + "startTime": "2022-01-24T00:17:42.922Z", + "description": "Pending", + "endTime": "2022-01-24T00:17:42.923Z" + }, + { + "startTime": "2022-01-24T00:17:44.978Z", + "description": "PreparingJob", + "endTime": "2022-01-24T00:17:44.993Z" + }, + { + "startTime": "2022-01-24T00:17:42.923Z", + "description": "RequestingExecutionToken", + "endTime": "2022-01-24T00:17:44.977Z" + }, + { + "startTime": "2022-01-24T00:17:44.993Z", + "description": "RunningJob", + "endTime": "2022-01-24T00:19:35.549Z" + }, + { + "startTime": "2022-01-24T00:17:44.977Z", + "description": "WaitingForValueStore", + "endTime": "2022-01-24T00:17:44.978Z" + }, + { + "startTime": "2022-01-24T00:19:35.549Z", + "description": "UpdatingJobStore", + "endTime": "2022-01-24T00:19:35.890Z" + } + ], + "start": "2022-01-24T00:17:42.922Z" + } + ] + }, + "outputs": {}, + "workflowRoot": "/mounted/cromwell-executions/ChRISJob/3f206683-e52c-428d-aaba-d80da957dbdb", + "actualWorkflowLanguage": "WDL", + "id": "3f206683-e52c-428d-aaba-d80da957dbdb", + "inputs": {}, + "labels": { + "cromwell-workflow-id": "cromwell-3f206683-e52c-428d-aaba-d80da957dbdb", + "org.chrisproject.pman.name": "wont-work" + }, + "submission": "2022-01-24T00:17:37.151Z", + "status": "Failed", + "failures": [ + { + "causedBy": [ + { + "message": "Job ChRISJob.plugin_instance:NA:1 exited with return code 1 which has not been declared as a valid return code. See 'continueOnReturnCode' runtime attribute for more details.", + "causedBy": [] + } + ], + "message": "Workflow failed" + } + ], + "end": "2022-01-24T00:19:36.143Z", + "start": "2022-01-24T00:17:41.864Z" +} +""" + + +response_done = r""" +{ + "workflowName": "ChRISJob", + "workflowProcessingEvents": [ + { + "cromwellId": "cromid-01f0ee2", + "description": "PickedUp", + "timestamp": "2022-01-24T06:16:35.354Z", + "cromwellVersion": "74-10892f4" + }, + { + "cromwellId": "cromid-01f0ee2", + "description": "Finished", + "timestamp": "2022-01-24T06:16:53.375Z", + "cromwellVersion": "74-10892f4" + } + ], + "actualWorkflowLanguageVersion": "1.0", + "submittedFiles": { + "workflow": "\nversion 1.0\n\ntask plugin_instance {\n command {\n /usr/local/bin/python /usr/local/bin/office_convert /share/incoming /share/outgoing\n } #ENDCOMMAND\n runtime {\n docker: 'ghcr.io/fnndsc/pl-office-convert:0.0.2'\n sharedir: '/mounted/storebase/key-done-and-dusted'\n }\n}\n\nworkflow ChRISJob {\n call plugin_instance\n}", + "root": "", + "options": "{\n\n}", + "inputs": "{}", + "workflowUrl": "", + "labels": "{\"org.chrisproject.pman.name\": \"done-and-dusted\"}" + }, + "calls": { + "ChRISJob.plugin_instance": [ + { + "executionStatus": "Done", + "stdout": "/cromwell-executions/ChRISJob/04e7dec7-b8f1-4408-ae5c-18c69d94b27e/call-plugin_instance/execution/stdout", + "backendStatus": "Done", + "commandLine": "/usr/local/bin/python /usr/local/bin/office_convert /share/incoming /share/outgoing", + "shardIndex": -1, + "outputs": {}, + "runtimeAttributes": { + "runtime_minutes": "5", + "queue": "toplane", + "requested_memory_mb_per_core": "4000", + "failOnStderr": "false", + "sharedir": "/mounted/storebase/key-done-and-dusted", + "continueOnReturnCode": "0", + "docker": "ghcr.io/fnndsc/pl-office-convert:0.0.2", + "maxRetries": "0", + "cpus": "2", + "account": "kled" + }, + "callCaching": { + "allowResultReuse": false, + "effectiveCallCachingMode": "CallCachingOff" + }, + "inputs": {}, + "returnCode": 0, + "jobId": "1866497", + "backend": "SLURM", + "end": "2022-01-24T06:16:53.016Z", + "dockerImageUsed": "ghcr.io/fnndsc/pl-office-convert:0.0.2", + "stderr": "/cromwell-executions/ChRISJob/04e7dec7-b8f1-4408-ae5c-18c69d94b27e/call-plugin_instance/execution/stderr", + "callRoot": "/cromwell-executions/ChRISJob/04e7dec7-b8f1-4408-ae5c-18c69d94b27e/call-plugin_instance", + "attempt": 1, + "executionEvents": [ + { + "startTime": "2022-01-24T06:16:45.110Z", + "description": "PreparingJob", + "endTime": "2022-01-24T06:16:45.154Z" + }, + { + "startTime": "2022-01-24T06:16:45.106Z", + "description": "WaitingForValueStore", + "endTime": "2022-01-24T06:16:45.110Z" + }, + { + "startTime": "2022-01-24T06:16:37.047Z", + "description": "Pending", + "endTime": "2022-01-24T06:16:37.054Z" + }, + { + "startTime": "2022-01-24T06:16:52.395Z", + "description": "UpdatingJobStore", + "endTime": "2022-01-24T06:16:53.017Z" + }, + { + "startTime": "2022-01-24T06:16:37.054Z", + "description": "RequestingExecutionToken", + "endTime": "2022-01-24T06:16:45.106Z" + }, + { + "startTime": "2022-01-24T06:16:45.154Z", + "description": "RunningJob", + "endTime": "2022-01-24T06:16:52.395Z" + } + ], + "start": "2022-01-24T06:16:37.039Z" + } + ] + }, + "outputs": {}, + "workflowRoot": "/cromwell-executions/ChRISJob/04e7dec7-b8f1-4408-ae5c-18c69d94b27e", + "actualWorkflowLanguage": "WDL", + "id": "04e7dec7-b8f1-4408-ae5c-18c69d94b27e", + "inputs": {}, + "labels": { + "cromwell-workflow-id": "cromwell-04e7dec7-b8f1-4408-ae5c-18c69d94b27e", + "org.chrisproject.pman.name": "done-and-dusted" + }, + "submission": "2022-01-24T06:16:33.391Z", + "status": "Succeeded", + "end": "2022-01-24T06:16:53.374Z", + "start": "2022-01-24T06:16:35.381Z" +} +""" + + +expected_notstarted = ChRISJob( + image=Image('ghcr.io/fnndsc/pl-salute-the-sun:latest'), + command='/usr/local/bin/python /usr/local/bin/whatsgood --day sunny /share/incoming /share/outgoing', + sharedir='/storebase/key-vitamin-d' +) + +response_notstarted = r""" +{ + "submittedFiles": { + "workflow": "\nversion 1.0\n\ntask plugin_instance {\n command {\n /usr/local/bin/python /usr/local/bin/whatsgood --day sunny /share/incoming /share/outgoing\n } #ENDCOMMAND\n runtime {\n docker: 'ghcr.io/fnndsc/pl-salute-the-sun:latest'\n sharedir: '/storebase/key-vitamin-d'\n }\n}\n\nworkflow ChRISJob {\n call plugin_instance\n}", + "root": "", + "options": "{\n\n}", + "inputs": "{}", + "workflowUrl": "", + "labels": "{\"org.chrisproject.pman.name\": \"vitamin-d\"}" + }, + "calls": {}, + "outputs": {}, + "id": "70d639fc-d99c-4af9-9d90-519f32a3dc9d", + "inputs": {}, + "labels": { + "cromwell-workflow-id": "cromwell-70d639fc-d99c-4af9-9d90-519f32a3dc9d", + "org.chrisproject.pman.name": "vitamin-d" + }, + "submission": "2022-01-24T07:23:47.397Z", + "status": "Submitted" +} +""" + +expected_queued = ChRISJob( + image=Image('internal.gitlab:5678/fnndsc/pl-fruit:1.2.3'), + command='/usr/local/bin/python /usr/local/bin/fruit_machine --salad orange /share/incoming /share/outgoing', + sharedir='/storebase/key-fruity-fruit' +) + +response_queued = r""" +{ + "workflowName": "ChRISJob", + "workflowProcessingEvents": [ + { + "cromwellId": "cromid-01f0ee2", + "description": "PickedUp", + "timestamp": "2022-01-24T07:23:59.398Z", + "cromwellVersion": "74-10892f4" + } + ], + "actualWorkflowLanguageVersion": "1.0", + "submittedFiles": { + "workflow": "\nversion 1.0\n\ntask plugin_instance {\n command {\n /usr/local/bin/python /usr/local/bin/fruit_machine --salad orange /share/incoming /share/outgoing\n } #ENDCOMMAND\n runtime {\n docker: 'internal.gitlab:5678/fnndsc/pl-fruit:1.2.3'\n sharedir: '/storebase/key-fruity-fruit'\n }\n}\n\nworkflow ChRISJob {\n call plugin_instance\n}", + "root": "", + "options": "{\n\n}", + "inputs": "{}", + "workflowUrl": "", + "labels": "{\"org.chrisproject.pman.name\": \"fruity-fruit\"}" + }, + "calls": { + "ChRISJob.plugin_instance": [ + { + "executionStatus": "QueuedInCromwell", + "shardIndex": -1, + "backend": "SLURM", + "attempt": 1, + "start": "2022-01-24T07:24:00.451Z" + } + ] + }, + "outputs": {}, + "workflowRoot": "/cromwell-executions/ChRISJob/70d639fc-d99c-4af9-9d90-519f32a3dc9d", + "actualWorkflowLanguage": "WDL", + "id": "70d639fc-d99c-4af9-9d90-519f32a3dc9d", + "inputs": {}, + "labels": { + "cromwell-workflow-id": "cromwell-70d639fc-d99c-4af9-9d90-519f32a3dc9d", + "org.chrisproject.pman.name": "fruity-fruit" + }, + "submission": "2022-01-24T07:23:47.397Z", + "status": "Running", + "start": "2022-01-24T07:23:59.400Z" +} +""" diff --git a/tests/cromwell/examples/query.py b/tests/cromwell/examples/query.py new file mode 100644 index 00000000..e4837426 --- /dev/null +++ b/tests/cromwell/examples/query.py @@ -0,0 +1,20 @@ +from pman.cromwell.models import WorkflowId + +expected = WorkflowId('69bec3cb-c2bc-46d3-96a7-8eb15fca2755') + +response_text = r""" +{ + "results": [ + { + "end": "2022-01-23T22:27:58.740Z", + "id": "69bec3cb-c2bc-46d3-96a7-8eb15fca2755", + "metadataArchiveStatus": "Unarchived", + "name": "ChRISJob", + "start": "2022-01-23T22:27:15.279Z", + "status": "Succeeded", + "submission": "2022-01-23T22:27:11.839Z" + } + ], + "totalResultsCount": 1 +} +""" diff --git a/tests/cromwell/helpers.py b/tests/cromwell/helpers.py new file mode 100644 index 00000000..f7563f49 --- /dev/null +++ b/tests/cromwell/helpers.py @@ -0,0 +1,35 @@ +from unittest.mock import patch, Mock +import functools + + +def patch_cromwell_api(method_name: str, response_text: str): + """ + Patch a function of :class:`cromwell_tools.cromwell_api.CromwellAPI` + so that it returns the given data. + + It is not recommended to stack this decorator with the real + :func:`unittest.mock.patch` because the order of operation is + seriously weird. + + :param method_name: the function to patch + :param response_text: the text the mock should respond with + """ + res = Mock() + res.text = response_text + res.status_code = 200 + + def decorator(real_method): + @functools.wraps(real_method) + @patch(f'cromwell_tools.cromwell_api.CromwellAPI.{method_name}') + def wrapper(self, mock_cromwell_method: Mock, *args, **kwargs): + mock_cromwell_method.return_value = res + real_method(self, mock_cromwell_method, *args, **kwargs) + return wrapper + return decorator + + +def create_404_response(uuid) -> Mock: + res = Mock() + res.text = f'{{"message": "Unrecognized workflow ID: {uuid}", "status": "fail"}}' + res.status_code = 404 + return res diff --git a/tests/cromwell/test_client.py b/tests/cromwell/test_client.py new file mode 100644 index 00000000..5ed9b4a5 --- /dev/null +++ b/tests/cromwell/test_client.py @@ -0,0 +1,29 @@ +import unittest +from unittest.mock import Mock +from tests.cromwell.helpers import patch_cromwell_api +from cromwell_tools.cromwell_auth import CromwellAuth +from pman.cromwell.client import CromwellClient +from pman.cromwell.models import WorkflowId, WorkflowStatus +import tests.cromwell.examples.metadata as metadata_example + + +class CromwellClientTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.client = CromwellClient(CromwellAuth('https://example.com')) + + @patch_cromwell_api('metadata', metadata_example.response_failed) + def test_metadata_failed(self, _): + res = self.client.metadata(WorkflowId('wont-work')) + self.assertEqual(WorkflowStatus.Failed, res.status) + self.assertEqual(metadata_example.expected_failed.timestamp, res.end) + + @patch_cromwell_api('metadata', metadata_example.response_done) + def test_metadata_done(self, _): + res = self.client.metadata(WorkflowId('done-and-dusted')) + self.assertEqual(WorkflowStatus.Succeeded, res.status) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/cromwell/test_cromwellmgr.py b/tests/cromwell/test_cromwellmgr.py new file mode 100644 index 00000000..062d6e73 --- /dev/null +++ b/tests/cromwell/test_cromwellmgr.py @@ -0,0 +1,113 @@ +import io +import json +import unittest +from unittest.mock import Mock, patch, ANY, call +from pman.abstractmgr import Image, JobName +from pman.cromwellmgr import CromwellManager, CromwellException, WorkflowId +import tests.cromwell.examples.metadata as metadata_example +import tests.cromwell.examples.query as query_example +from tests.cromwell.helpers import patch_cromwell_api, create_404_response + + +class CromwellTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.manager = CromwellManager({'CROMWELL_URL': 'https://example.com/'}) + + @patch('time.sleep') + @patch('pman.e2_wdl.ChRISJob.to_wdl') + @patch('cromwell_tools.cromwell_api.CromwellAPI.metadata') + @patch('cromwell_tools.cromwell_api.CromwellAPI.submit') + def test_submit(self, mock_submit: Mock, mock_metadata: Mock, + mock_inflate: Mock, mock_sleep: Mock): + # mock WDL template + fake_wdl = 'fake wdl' + mock_inflate.return_value = fake_wdl + + # Workflow does not immediately appear in Cromwell after being submitted, + # but pman wants to get job info, so we need to poll Cromwell a few times. + ok_res = Mock() + ok_res.status_code = 200 + ok_res.text = metadata_example.response_notstarted + status_responses = [ + create_404_response('example-jid-4567'), + create_404_response('example-jid-4567'), + create_404_response('example-jid-4567'), + ok_res + ] + mock_metadata.side_effect = status_responses + + mock_submit.return_value = Mock() + mock_submit.return_value.text = r'{"id": "example-jid-4567", "status": "Submitted"}' + + self.manager.schedule_job( + Image('fnndsc/pl-simpledsapp'), 'simpledsapp /in /out', + JobName('example-jid-4567'), {}, '/storeBase/whatever' + ) + + # assert submitted with correct data + mock_submit.assert_called_once() + self.assertBytesIOEqual(fake_wdl, mock_submit.call_args.kwargs['wdl_file']) + self.assertBytesIOEqualDict( + {CromwellManager.PMAN_CROMWELL_LABEL: 'example-jid-4567'}, + mock_submit.call_args.kwargs['label_file'] + ) + + # assert polling worked + mock_metadata.assert_has_calls( + # check that status was polled + [call(uuid='example-jid-4567', auth=ANY, raise_for_status=False)] * len(status_responses) + ) + mock_sleep.assert_has_calls([call(1)] * 4) + + def assertBytesIOEqual(self, expected: str, actual: io.BytesIO): + self.assertEqual(expected, self.__bytesIO2str(actual)) + + def assertBytesIOEqualDict(self, expected: dict, actual: io.BytesIO): + self.assertDictEqual(expected, json.loads(self.__bytesIO2str(actual))) + + @staticmethod + def __bytesIO2str(_b: io.BytesIO) -> str: + return _b.getvalue().decode('utf-8') + + @patch_cromwell_api('submit', r'{"id": "donut", "status": "On Hold"}') + def test_submit_bad_response(self, _): + with self.assertRaises(CromwellException): + self.manager.schedule_job( + Image('fnndsc/pl-simpledsapp'), 'simpledsapp /in /out', + JobName('example-jid-4567'), {}, '/storeBase/whatever' + ) + + @patch_cromwell_api('metadata', metadata_example.response_running) + def test_get_job_info(self, mock_metadata: Mock): + job_info = self.manager.get_job_info(metadata_example.workflow_uuid) + mock_metadata.assert_called_once_with(uuid=metadata_example.workflow_uuid, + auth=ANY, raise_for_status=False) + self.assertEqual(metadata_example.expected_running, job_info) + + @patch_cromwell_api('query', query_example.response_text) + def test_get_job(self, mock_query: Mock): + job_name = JobName('sushi') + self.assertEqual(query_example.expected, self.manager.get_job(job_name)) + mock_query.assert_called_once_with( + query_dict={'label': {self.manager.PMAN_CROMWELL_LABEL: 'sushi'}}, + auth=ANY, raise_for_status=True + ) + + @patch_cromwell_api('query', r'{"results": [], "totalResultsCount": 0}') + def test_get_job_not_found(self, mock_query: Mock): + with self.assertRaises(CromwellException) as e: + self.manager.get_job(JobName('sushi')) + self.assertEqual(404, e.exception.status_code, + msg='Should have Exception with status_code=404 when job not found') + + @patch_cromwell_api('abort', r'{"id": "tbh didnt actually try this one", "status": "Aborting"}') + def test_abort(self, mock_abort: Mock): + w = WorkflowId('remove-me') + self.manager.remove_job(w) + mock_abort.assert_called_once_with(uuid=w, auth=ANY, raise_for_status=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/cromwell/test_wdl.py b/tests/cromwell/test_wdl.py new file mode 100644 index 00000000..b75ad00d --- /dev/null +++ b/tests/cromwell/test_wdl.py @@ -0,0 +1,21 @@ +import unittest +from pman.e2_wdl import ChRISJob +from pman.cromwell.models import WorkflowMetadataResponse +import tests.cromwell.examples.metadata as examples +from serde.json import from_json + + +class WdlTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.example1: WorkflowMetadataResponse = from_json(WorkflowMetadataResponse, examples.response_notstarted) + cls.example2: WorkflowMetadataResponse = from_json(WorkflowMetadataResponse, examples.response_queued) + + def test_parse_wdl(self): + self.assertEqual(examples.expected_notstarted, ChRISJob.from_wdl(self.example1.submittedFiles.workflow)) + self.assertEqual(examples.expected_queued, ChRISJob.from_wdl(self.example2.submittedFiles.workflow)) + + +if __name__ == '__main__': + unittest.main()