Skip to content

Commit

Permalink
Cromwell (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
jennydaman authored Jan 24, 2022
1 parent 4259951 commit 9c6c0cc
Show file tree
Hide file tree
Showing 18 changed files with 1,196 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Dockerfile
.git
LICENSE
CHRIS_REMOTE_FS
venv/

3 changes: 3 additions & 0 deletions pman/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __init__(self):
if self.CONTAINER_ENV == 'kubernetes':
self.JOB_NAMESPACE = env('JOB_NAMESPACE', 'default')

if self.CONTAINER_ENV == 'cromwell':
self.CROMWELL_URL = env('CROMWELL_URL')

self.env = env


Expand Down
Empty file added pman/cromwell/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions pman/cromwell/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import io
import json
from dataclasses import dataclass
from typing import Optional, Dict
from .models import (
WorkflowId, StrWdl,
WorkflowIdAndStatus, WorkflowQueryResponse, WorkflowMetadataResponse
)
from cromwell_tools.cromwell_api import CromwellAPI, CromwellAuth
from serde.json import from_json
from os import path
import requests


@dataclass
class CromwellClient:
    """
    A wrapper around :mod:`cromwell_tools.cromwell_api` providing a similar
    interface but with typed parameters and returns.

    Every method performs one HTTP request against the Cromwell server
    described by ``auth`` and deserializes the JSON response body.
    """
    # Server URL and credentials used for every request.
    auth: CromwellAuth

    def submit(self, wdl: StrWdl, label: Dict[str, str]) -> WorkflowIdAndStatus:
        """
        Schedule a WDL file to be executed.

        :param wdl: WDL
        :param label: labels to apply to this workflow
        :return: response from Cromwell
        """
        # cromwell_tools is handed file-like objects, so both the WDL and
        # the labels are wrapped in in-memory byte buffers.
        res = CromwellAPI.submit(
            auth=self.auth,
            wdl_file=self.__str2bytesio(wdl),
            label_file=self.__create_label(label),
            raise_for_status=True
        )
        return from_json(WorkflowIdAndStatus, res.text)

    def status(self, uuid: WorkflowId) -> Optional[WorkflowIdAndStatus]:
        """
        https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#retrieves-the-current-state-for-a-workflow

        :param uuid: ID of the workflow
        :return: workflow ID and status, or None if workflow not found
        """
        # raise_for_status=False so that a 404 can be mapped to None here;
        # any other error status is still raised below.
        res = CromwellAPI.status(uuid=uuid, auth=self.auth, raise_for_status=False)
        if res.status_code == 404:
            return None
        res.raise_for_status()
        return from_json(WorkflowIdAndStatus, res.text)

    def query(self, label: Optional[Dict[str, str]] = None) -> WorkflowQueryResponse:
        """
        Search for workflows.

        :param label: if given (and non-empty), sent as the ``label`` filter
                      of the query; otherwise the query has no filters
        :return: response from Cromwell
        """
        query_dict = {}
        if label:
            query_dict['label'] = label
        res = CromwellAPI.query(query_dict=query_dict,
                                auth=self.auth, raise_for_status=True)
        return from_json(WorkflowQueryResponse, res.text)

    def metadata(self, uuid: WorkflowId) -> Optional[WorkflowMetadataResponse]:
        """
        Get the metadata of a workflow.

        :param uuid: ID of the workflow
        :return: workflow metadata, or None if workflow not found
        """
        # Same 404 handling as in status().
        res = CromwellAPI.metadata(uuid=uuid,
                                   auth=self.auth, raise_for_status=False)
        if res.status_code == 404:
            return None
        res.raise_for_status()
        return from_json(WorkflowMetadataResponse, res.text)

    def abort(self, uuid: WorkflowId) -> WorkflowIdAndStatus:
        """
        https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#abort-a-running-workflow

        :param uuid: ID of the workflow
        :return: response from Cromwell
        """
        res = CromwellAPI.abort(uuid=uuid, auth=self.auth, raise_for_status=True)
        return from_json(WorkflowIdAndStatus, res.text)

    def logs_idc(self, uuid: WorkflowId) -> dict:
        """
        Request the ``logs`` endpoint of a workflow directly from Cromwell.

        This method is not available in upstream cromwell-tools.

        :param uuid: ID of the workflow
        :return: decoded JSON body of the response
        :raises requests.HTTPError: if Cromwell responds with an error status
        """
        # URLs are always "/"-separated, so the URI is assembled by hand:
        # os.path.join would use the OS path separator and would silently
        # drop the base URL if a later component started with "/".
        uri = f"{self.auth.url.rstrip('/')}/api/workflows/v1/{uuid}/logs"
        # Send the credentials stored in the CromwellAuth object; without
        # them this request fails against a server that requires
        # authentication while every other method succeeds.
        res = requests.get(uri, auth=self.auth.auth, headers=self.auth.header)
        res.raise_for_status()
        return res.json()

    @classmethod
    def __create_label(cls, d: Dict[str, str]) -> io.BytesIO:
        """
        Create Cromwell labels from a dictionary of key-value pairs.
        https://cromwell.readthedocs.io/en/stable/cromwell_features/Labels/

        :param d: label names mapped to label values
        :return: the dictionary serialized as JSON, in an in-memory buffer
        """
        return cls.__str2bytesio(json.dumps(d))

    @staticmethod
    def __str2bytesio(s: str) -> io.BytesIO:
        """
        Wrap a string, encoded as UTF-8, in an in-memory byte buffer.
        """
        return io.BytesIO(bytes(s, 'utf-8'))
143 changes: 143 additions & 0 deletions pman/cromwell/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""
Data definitions for Cromwell API responses.
"""
from enum import Enum
from serde import deserialize
from typing import NewType, List, Dict, Optional
from pman.abstractmgr import TimeStamp
from pathlib import Path


# Distinct ``str`` subtypes, so that static type checkers can tell WDL source,
# workflow names and workflow IDs apart from ordinary strings.
StrWdl = NewType('StrWdl', str)
"""WDL as a :type:`str`."""
# Name of a workflow (see ``WorkflowQueryResult.name``).
WorkflowName = NewType('WorkflowName', str)
# Identifier of a workflow, as found in the ``id`` field of Cromwell responses.
WorkflowId = NewType('WorkflowId', str)
# Plain alias (not a NewType) from attribute name to string value.
RuntimeAttributes = Dict[str, str]
"""
Custom information about a task call from Cromwell workflow metadata,
defined by how Cromwell's backend is configured.
p.s. a type alias bc https://github.com/yukinarit/pyserde/issues/192
"""


class WorkflowStatus(Enum):
    """
    Status of a Cromwell workflow.

    Each member's value is the status string as it appears in Cromwell
    responses. Reference:
    https://github.com/broadinstitute/cromwell/blob/32d5d0cbf07e46f56d3d070f457eaff0138478d5/wes2cromwell/src/main/scala/wes2cromwell/WesState.scala#L19-L28
    """

    # NOTE: "On Hold" is missing from the Cromwell documentation, which is
    # therefore not accurate. See
    # https://broadworkbench.atlassian.net/browse/CROM-6869
    OnHold = 'On Hold'

    Submitted = 'Submitted'
    Running = 'Running'
    Aborting = 'Aborting'
    Aborted = 'Aborted'
    Succeeded = 'Succeeded'
    Failed = 'Failed'


@deserialize
class WorkflowIdAndStatus:
    """
    A workflow's ID paired with its status.

    https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowidandstatus
    """
    id: WorkflowId            # which workflow
    status: WorkflowStatus    # the status reported for it


@deserialize
class WorkflowQueryResult:
    """
    One entry of the ``results`` list in a workflow query response.

    https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresult
    """
    end: Optional[TimeStamp]
    id: WorkflowId
    # For the first few seconds after submission, name is undefined.
    name: Optional[WorkflowName]
    start: Optional[TimeStamp]
    status: WorkflowStatus
    submission: Optional[TimeStamp]


@deserialize
class WorkflowQueryResponse:
    """
    Top-level body of a workflow query response.

    https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresponse
    """
    results: List[WorkflowQueryResult]    # the workflows found
    totalResultsCount: int                # count field sent alongside the results


# The documented FailureMessage schema (below) doesn't seem correct,
# so it is left commented out.
# @deserialize
# class FailureMessage:
# """
# https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#failuremessage
# """
# failure: str
# timestamp: TimeStamp

@deserialize
class CausedFailure:
    """
    A failure message together with the failures which caused it.
    """
    message: str
    # The elements are really of type 'CausedFailure' themselves, but pyserde
    # does not support circular data definition, hence the bare List.
    causedBy: List


@deserialize
class CallMetadata:
    """
    Metadata about one call in a workflow.

    https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#callmetadata
    """

    # --- conventional fields ---
    backend: Optional[str]
    backendLogs: Optional[dict]
    backendStatus: Optional[str]
    end: Optional[TimeStamp]
    executionStatus: str
    failures: Optional[List[CausedFailure]]
    inputs: Optional[dict]
    jobId: Optional[str]
    returnCode: Optional[int]
    start: Optional[TimeStamp]
    stderr: Optional[Path]
    stdout: Optional[Path]

    # --- fields which are not documented, yet are very important ---
    commandLine: Optional[str]
    runtimeAttributes: Optional[RuntimeAttributes]
    attempt: Optional[int]

    # --- fields we don't care about, so they are not declared ---
    # compressedDockerSize: int
    # callCaching: CallCaching
    # shardIndex: int


@deserialize
class SubmittedFiles:
    """
    Type of the ``submittedFiles`` field of :class:`WorkflowMetadataResponse`.

    Every field is a required string.
    """
    workflow: StrWdl    # the WDL itself
    root: str
    options: str
    inputs: str
    workflowUrl: str
    labels: str


@deserialize
class WorkflowMetadataResponse:
    """
    Top-level body of a workflow metadata response.

    https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowmetadataresponse
    """

    # --- documented fields ---
    calls: Optional[Dict[str, List[CallMetadata]]]
    end: Optional[TimeStamp]
    failures: Optional[List[CausedFailure]]
    id: WorkflowId
    inputs: Optional[dict]
    outputs: Optional[dict]
    start: Optional[TimeStamp]
    status: WorkflowStatus
    submission: TimeStamp

    # --- undocumented fields ---
    labels: Dict[str, str]
    submittedFiles: Optional[SubmittedFiles]
Loading

0 comments on commit 9c6c0cc

Please sign in to comment.