Skip to content

Commit

Permalink
Detect and flatten nested WDL directories (#268)
Browse files Browse the repository at this point in the history
* Detect and flatten nested WDL directories to make execution compatible with Cromwell.

Co-authored-by: bshifaw <[email protected]>
  • Loading branch information
kvg and bshifaw authored Sep 27, 2023
1 parent 66ff06d commit 9473930
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 2 deletions.
25 changes: 24 additions & 1 deletion src/cromshell/submit/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import csv
import json
import logging
import tempfile
from datetime import datetime
from pathlib import Path, PurePath

Expand Down Expand Up @@ -61,14 +62,36 @@ class ValidationFailedError(Exception):
default=False,
help="Do not check womtool for validation before submitting.",
)
@click.option(
"--do-not-flatten-wdls",
is_flag=True,
default=False,
help=".",
)
@click.pass_obj
def main(config, wdl, wdl_json, options_json, dependencies_zip, no_validation):
def main(
config,
wdl,
wdl_json,
options_json,
dependencies_zip,
no_validation,
do_not_flatten_wdls,
):
"""Submit a workflow and arguments to the Cromwell Server"""

LOGGER.info("submit")

http_utils.assert_can_communicate_with_server(config=config)

if not do_not_flatten_wdls and io_utils.has_nested_dependencies(wdl):
tempdir = tempfile.TemporaryDirectory(prefix="cromshell_")

LOGGER.info(f"Flattening WDL structure to {tempdir.name}.")

wdl = io_utils.flatten_nested_dependencies(tempdir, wdl)
dependencies_zip = tempdir.name

if no_validation:
LOGGER.info("Skipping WDL validation")
else:
Expand Down
77 changes: 77 additions & 0 deletions src/cromshell/utilities/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import re
import shutil
import tempfile
from contextlib import nullcontext
from io import BytesIO
from pathlib import Path
Expand Down Expand Up @@ -101,6 +102,82 @@ def assert_path_is_not_empty(path: Union[str, Path], description: str) -> None:
raise EOFError(f"ERROR: {description} is empty: {path}.")


def has_nested_dependencies(wdl_path: str or Path) -> bool:
"""Determine if a WDL has any nested imports."""

with open(wdl_path, "r") as rf:
for line in rf:
if line.startswith("import"):
m = re.match(r'import "(.+)"', line)

imported_wdl_name = m.group(1)
if "../" in imported_wdl_name:
return True

return False


def get_flattened_filename(tempdir: str, wdl_path: str or Path) -> Path:
"""Generate hyphen-separated path to use for flattened WDL file path.
For example:
tempdir: /path/2/tempdir/ and wdl_path: /dir/path/2/wdl.wdl
returns: /path/2/tempdir/dir-path-2-wdl.wdl
"""

p = Path(wdl_path)

return Path(
tempdir
+ "/"
+ re.sub("^-", "", re.sub("/", "-", str(p.parent)))
+ "-"
+ str(p.name)
)


def flatten_nested_dependencies(
tempdir: tempfile.TemporaryDirectory, wdl_path: str
) -> Path:
"""Flatten a WDL directory structure and rewrite imports accordingly.
Return string representing the filesystem location of the rewritten WDL.
tempdir: /path/2/tempdir/
wdl_path: /dir/path/2/wdl.wdl
returns: /path/2/tempdir/dir-path-2-wdl.wdl
"""

p = Path(wdl_path)
wdl_dir = p.parent

new_wdl_path = get_flattened_filename(tempdir.name, wdl_path)

with open(wdl_path, "r") as rf, open(new_wdl_path, "w") as wf:
for line in rf:
if line.startswith("import"):
m = re.match(r'import "(.+)"', line)
imported_wdl_name = m.group(1)
imported_wdl_path = (Path(wdl_dir) / imported_wdl_name).resolve()
import_line = re.sub(
imported_wdl_name,
Path(get_flattened_filename(tempdir.name, imported_wdl_path)).name,
line,
)

if " as " in line:
wf.write(import_line)
else:
wf.write(
f'{import_line.strip()} as {re.sub(".wdl", "", Path(imported_wdl_path).name)}\n'
)

flatten_nested_dependencies(tempdir, imported_wdl_path)
else:
wf.write(line)

return new_wdl_path


def open_or_zip(path: Union[str, Path, None]) -> Union[nullcontext, BytesIO, BinaryIO]:
"""Return a context that may be used for reading the contents from the path.
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def local_cromshell_config_json(local_hidden_cromshell_folder):

@pytest.fixture
def test_workflows_path():
return Path(__file__).joinpath("workflows/")
return Path(__file__).parent.joinpath("workflows/")


@pytest.fixture
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/test_io_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import csv
import io
import os
import re
import shutil
import tempfile
from contextlib import redirect_stdout
from pathlib import Path
from tempfile import NamedTemporaryFile
from zipfile import ZipFile

import pytest
Expand Down Expand Up @@ -353,6 +356,96 @@ def test_update_all_workflow_database_tsv(
):
assert row[column_to_update] == update_value

@pytest.mark.parametrize(
"wdl_content, expected_result",
[
('import "other.wdl"', False), # No nested import
('import "../nested/other.wdl"', True), # Nested import
('import "nested/other.wdl"', False), # Relative path, but not nested
("task my_task { command { echo 'Hello, World!' } }", False), # No import
(
'import "../nested/other.wdl"\nimport "nested/another.wdl"',
True,
), # Multiple imports, one nested
],
)
def test_has_nested_dependencies(self, wdl_content, expected_result):
# Create a temporary file with the provided WDL content
with NamedTemporaryFile(mode="w", delete=False) as temp_file:
temp_file.write(wdl_content)

wdl_path = Path(temp_file.name)

# Call the function with the temporary file path
result = io_utils.has_nested_dependencies(wdl_path)

# Check if the result matches the expected outcome
assert result == expected_result

# Clean up the temporary file
wdl_path.unlink()

@pytest.mark.parametrize(
"wdl_path, flattened_wdl_file",
[
("/dir/path/2/wdl.wdl", "dir-path-2-wdl.wdl"),
("/another/wdl.wdl", "another-wdl.wdl"),
],
)
def test_get_flattened_filename(self, wdl_path, flattened_wdl_file):
# Create a TemporaryDirectory to simulate tempdir
with tempfile.TemporaryDirectory() as tempdir:
# tempdir = Path(tempdir_name)
wdl_path = Path(wdl_path)

# Call the function with the simulated tempdir and wdl_path
result = io_utils.get_flattened_filename(tempdir, wdl_path)

# Check if the result matches the expected outcome
assert result == Path(tempdir).joinpath(flattened_wdl_file)

# Define test cases using @pytest.mark.parametrize
@pytest.mark.parametrize(
"wdl_path, expected_file_content",
[
(
"wdl_with_imports/helloWorld_with_imports.wdl",
["-helloWorld.wdl", "-wdl_with_imports-hello_world_task.wdl"],
),
],
)
def test_flatten_nested_dependencies(
self, wdl_path, expected_file_content, test_workflows_path
):
# Create a temporary directory to simulate tempdir

tempdir = tempfile.TemporaryDirectory()
abs_wdl_path = test_workflows_path.joinpath(wdl_path)

abs_wdl_path_str = str(abs_wdl_path.absolute())

# Call the function with the simulated tempdir and wdl_path
result_path = io_utils.flatten_nested_dependencies(
tempdir=tempdir, wdl_path=abs_wdl_path_str
)

# Check if the result matches the expected outcome
expected_result_path = Path(tempdir.name).joinpath(
re.sub("^-", "", re.sub("/", "-", str(abs_wdl_path)))
)
assert result_path == expected_result_path

# Check if the expected file content is in the result file
for expected_file_content_line in expected_file_content:
parsed_line = (
re.sub("^-", "", re.sub("/", "-", str(abs_wdl_path.parents[1])))
+ expected_file_content_line
)
assert parsed_line in result_path.read_text()

# Clean up the temporary directory
tempdir.cleanup()

@pytest.fixture
def mock_data_path(self):
return Path(__file__).parent.joinpath("mock_data/")
Expand Down
42 changes: 42 additions & 0 deletions tests/workflows/wdl_with_imports/helloWorld_with_imports.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import "../helloWorld.wdl" as HelloWorldWorkflow
import "hello_world_task.wdl" as helloWorldTask

workflow HelloWorld {
meta {
workflow_description: "echos hello world"
}
parameter_meta {
# Description of inputs:
# Required:
docker: "Docker image in which to run"
# Optional:
mem: "Amount of memory to give to the machine running each task in this workflow."
preemptible_attempts: "Number of times to allow each task in this workflow to be preempted."
disk_space_gb: "Amount of storage disk space (in Gb) to give to each machine running each task in this workflow."
cpu: "Number of CPU cores to give to each machine running each task in this workflow."
boot_disk_size_gb: "Amount of boot disk space (in Gb) to give to each machine running each task in this workflow."
}
String docker

Int? mem
Int? preemptible_attempts
Int? disk_space_gb
Int? cpu
Int? boot_disk_size_gb

call helloWorldTask.HelloWorldTask {
input:
docker = docker,
mem = mem,
preemptible_attempts = preemptible_attempts,
disk_space_gb = disk_space_gb,
cpu = cpu,
boot_disk_size_gb = boot_disk_size_gb
}

output {
File output_file = HelloWorldTask.output_file
}
}


57 changes: 57 additions & 0 deletions tests/workflows/wdl_with_imports/hello_world_task.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
task HelloWorldTask {

# ------------------------------------------------
# Input args:
# Required:
# Runtime Options:
String docker
Int? mem
Int? preemptible_attempts
Int? disk_space_gb
Int? cpu
Int? boot_disk_size_gb

# ------------------------------------------------
# Process input args:
# ------------------------------------------------
# Get machine settings:
Boolean use_ssd = false

# You may have to change the following two parameter values depending on the task requirements
Int default_ram_mb = 3 * 1024
# WARNING: In the workflow, you should calculate the disk space as an input to this task (disk_space_gb). Please see [TODO: Link from Jose] for examples.
Int default_disk_space_gb = 100

Int default_boot_disk_size_gb = 15

# Mem is in units of GB but our command and memory runtime values are in MB
Int machine_mem = if defined(mem) then mem * 1024 else default_ram_mb
Int command_mem = machine_mem - 1024

# ------------------------------------------------
# Run our command:
command <<<
set -e
echo 'Hello World!'
>>>

# ------------------------------------------------
# Runtime settings:
# runtime {
# docker: docker
# memory: machine_mem + " MB"
# disks: "local-disk " + select_first([disk_space_gb, default_disk_space_gb]) + if use_ssd then " SSD" else " HDD"
# bootDiskSizeGb: select_first([boot_disk_size_gb, default_boot_disk_size_gb])
# preemptible: 0
# cpu: select_first([cpu, 1])
# }
# ------------------------------------------------
# Outputs:
output {
File output_file = stdout()
}
}

0 comments on commit 9473930

Please sign in to comment.