Skip to content

Commit

Permalink
Make cli worker parameter flexible (#606)
Browse files Browse the repository at this point in the history
* make cli command flexible

* add custom worker command to yaml

* add custom config args

* change tests to remove specific argument ordering

* fixes for ci
  • Loading branch information
hmacdope authored Jul 16, 2023
1 parent ff47d71 commit 4bbd0a0
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 31 deletions.
27 changes: 20 additions & 7 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
Seconds to wait for a scheduler before closing workers
extra : list
Deprecated: use ``worker_extra_args`` instead. This parameter will be removed in a future version.
worker_command : str
Python module to run with ``python -m`` when launching a worker. Defaults to "distributed.cli.dask_worker"
worker_extra_args : list
Additional arguments to pass to `dask-worker`
env_extra : list
Expand Down Expand Up @@ -166,6 +168,7 @@ def __init__(
death_timeout=None,
local_directory=None,
extra=None,
worker_command=None,
worker_extra_args=None,
job_extra=None,
job_extra_directives=None,
Expand Down Expand Up @@ -222,6 +225,10 @@ def __init__(
)
if extra is None:
extra = dask.config.get("jobqueue.%s.extra" % self.config_name)
if worker_command is None:
worker_command = dask.config.get(
"jobqueue.%s.worker-command" % self.config_name
)
if worker_extra_args is None:
worker_extra_args = dask.config.get(
"jobqueue.%s.worker-extra-args" % self.config_name
Expand Down Expand Up @@ -332,17 +339,23 @@ def __init__(
self._job_script_prologue = job_script_prologue

# dask-worker command line build
dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict(
python=python
dask_worker_command = "%(python)s -m %(worker_command)s" % dict(
python=python,
worker_command=worker_command
)

command_args = [dask_worker_command, self.scheduler]
command_args += ["--nthreads", self.worker_process_threads]
if processes is not None and processes > 1:
command_args += ["--nworkers", processes]

command_args += ["--memory-limit", self.worker_process_memory]
# common
command_args += ["--name", str(name)]
command_args += ["--nanny" if nanny else "--no-nanny"]
command_args += ["--nthreads", self.worker_process_threads]
command_args += ["--memory-limit", self.worker_process_memory]

# distributed.cli.dask_worker specific
if worker_command == "distributed.cli.dask_worker":
if processes is not None and processes > 1:
command_args += ["--nworkers", processes]
command_args += ["--nanny" if nanny else "--no-nanny"]

if death_timeout is not None:
command_args += ["--death-timeout", death_timeout]
Expand Down
8 changes: 8 additions & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# OAR resource manager options
Expand Down Expand Up @@ -44,6 +45,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# PBS resource manager options
Expand Down Expand Up @@ -75,6 +77,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# SGE resource manager options
Expand Down Expand Up @@ -106,6 +109,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# SLURM resource manager options
Expand Down Expand Up @@ -138,6 +142,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# PBS resource manager options
Expand Down Expand Up @@ -169,6 +174,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# LSF resource manager options
Expand Down Expand Up @@ -203,6 +209,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# HTCondor Resource Manager options
Expand Down Expand Up @@ -232,6 +239,7 @@ jobqueue:
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: null # deprecated: use worker-extra-args
worker-command: "distributed.cli.dask_worker" # Command to launch a worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

env-extra: null
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def test_config_name_htcondor_takes_custom_config():
"interface": None,
"death-timeout": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down
3 changes: 3 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def test_command_template(Cluster):
assert " --local-directory /scratch" in cluster._dummy_job._command_template
assert " --preload mymodule" in cluster._dummy_job._command_template

with Cluster(cores=2, memory="4GB", worker_command="dask_cuda.cli") as cluster:
assert "dask_cuda.cli" in cluster._dummy_job._command_template


def test_shebang_settings(Cluster, request):
if Cluster is HTCondorCluster or Cluster is LocalCluster:
Expand Down
13 changes: 7 additions & 6 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)

with LSFCluster(
queue="general",
Expand All @@ -130,9 +130,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)

with LSFCluster(
walltime="1:00",
Expand Down Expand Up @@ -322,6 +322,7 @@ def test_config_name_lsf_takes_custom_config():
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down
13 changes: 7 additions & 6 deletions dask_jobqueue/tests/test_oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)

with OARCluster(
walltime="00:02:00",
Expand Down Expand Up @@ -115,9 +115,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)


def test_config_name_oar_takes_custom_config():
Expand All @@ -137,6 +137,7 @@ def test_config_name_oar_takes_custom_config():
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down
13 changes: 7 additions & 6 deletions dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def test_job_script(Cluster):
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)

with Cluster(
queue="regular",
Expand All @@ -102,9 +102,9 @@ def test_job_script(Cluster):
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)


@pytest.mark.env("pbs")
Expand Down Expand Up @@ -361,6 +361,7 @@ def test_config_name_pbs_takes_custom_config():
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def test_config_name_sge_takes_custom_config():
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down
13 changes: 7 additions & 6 deletions dask_jobqueue/tests/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)

with SLURMCluster(
walltime="00:02:00",
Expand Down Expand Up @@ -111,9 +111,9 @@ def test_job_script():
in job_script
)
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert (
f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script
)
assert ("--nthreads 2" in job_script)
assert ("--nworkers 4" in job_script)
assert (f"--memory-limit {formatted_bytes}" in job_script)


@pytest.mark.env("slurm")
Expand Down Expand Up @@ -193,6 +193,7 @@ def test_config_name_slurm_takes_custom_config():
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": None,
"worker-command": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
Expand Down

0 comments on commit 4bbd0a0

Please sign in to comment.