From 4bbd0a08342926ee8728ccd9a4559be3aa3ec628 Mon Sep 17 00:00:00 2001 From: Hugo MacDermott-Opeskin Date: Sun, 16 Jul 2023 15:58:42 +1000 Subject: [PATCH] Make cli worker parameter flexible (#606) * make cli command flexible * add custom worker command to yaml * add custom config args * change tests to remove specific argument ordering * fixes for ci --- dask_jobqueue/core.py | 27 +++++++++++++++++------ dask_jobqueue/jobqueue.yaml | 8 +++++++ dask_jobqueue/tests/test_htcondor.py | 1 + dask_jobqueue/tests/test_jobqueue_core.py | 3 +++ dask_jobqueue/tests/test_lsf.py | 13 ++++++----- dask_jobqueue/tests/test_oar.py | 13 ++++++----- dask_jobqueue/tests/test_pbs.py | 13 ++++++----- dask_jobqueue/tests/test_sge.py | 1 + dask_jobqueue/tests/test_slurm.py | 13 ++++++----- 9 files changed, 61 insertions(+), 31 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 01c1756d..2883c088 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -48,6 +48,8 @@ Seconds to wait for a scheduler before closing workers extra : list Deprecated: use ``worker_extra_args`` instead. This parameter will be removed in a future version. + worker_command : list + Command to run when launching a worker. Defaults to "distributed.cli.dask_worker" worker_extra_args : list Additional arguments to pass to `dask-worker` env_extra : list @@ -166,6 +168,7 @@ def __init__( death_timeout=None, local_directory=None, extra=None, + worker_command=None, worker_extra_args=None, job_extra=None, job_extra_directives=None, @@ -222,6 +225,10 @@ def __init__( ) if extra is None: extra = dask.config.get("jobqueue.%s.extra" % self.config_name) + if worker_command is None: + worker_command = dask.config.get( + "jobqueue.%s.worker-command" % self.config_name + ) if worker_extra_args is None: worker_extra_args = dask.config.get( "jobqueue.%s.worker-extra-args" % self.config_name @@ -332,17 +339,23 @@ def __init__( self._job_script_prologue = job_script_prologue # dask-worker command line build - dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict( - python=python + dask_worker_command = "%(python)s -m %(worker_command)s" % dict( + python=python, + worker_command=worker_command ) + command_args = [dask_worker_command, self.scheduler] - command_args += ["--nthreads", self.worker_process_threads] - if processes is not None and processes > 1: - command_args += ["--nworkers", processes] - command_args += ["--memory-limit", self.worker_process_memory] + # common command_args += ["--name", str(name)] - command_args += ["--nanny" if nanny else "--no-nanny"] + command_args += ["--nthreads", self.worker_process_threads] + command_args += ["--memory-limit", self.worker_process_memory] + + # distributed.cli.dask_worker specific + if worker_command == "distributed.cli.dask_worker": + if processes is not None and processes > 1: + command_args += ["--nworkers", processes] + command_args += ["--nanny" if nanny else "--no-nanny"] if death_timeout is not None: command_args += ["--death-timeout", death_timeout] diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index bd7b9c5c..3bcb8c58 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -12,6 +12,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # OAR resource manager options @@ -44,6 +45,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # PBS resource manager options @@ -75,6 +77,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # SGE resource manager options @@ -106,6 +109,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # SLURM resource manager options @@ -138,6 +142,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # PBS resource manager options @@ -169,6 +174,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # LSF resource manager options @@ -203,6 +209,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` # HTCondor Resource Manager options @@ -232,6 +239,7 @@ jobqueue: local-directory: null # Location of fast local storage like /scratch or $TMPDIR shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker worker-extra-args: [] # Additional arguments to pass to `dask-worker` env-extra: null diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index c5c5bf6f..653b72df 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -141,6 +141,7 @@ def test_config_name_htcondor_takes_custom_config(): "interface": None, "death-timeout": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 8f743f7e..23d2059f 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -49,6 +49,9 @@ def test_command_template(Cluster): assert " --local-directory /scratch" in cluster._dummy_job._command_template assert " --preload mymodule" in cluster._dummy_job._command_template + with Cluster(cores=2, memory="4GB", worker_command="dask_cuda.cli") as cluster: + assert "dask_cuda.cli" in cluster._dummy_job._command_template + def test_shebang_settings(Cluster, request): if Cluster is HTCondorCluster or Cluster is LocalCluster: diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 40c826cc..134599ba 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -101,9 +101,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) with LSFCluster( queue="general", @@ -130,9 +130,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) with LSFCluster( walltime="1:00", @@ -322,6 +322,7 @@ def test_config_name_lsf_takes_custom_config(): "local-directory": "/foo", "shared-temp-directory": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 52fd51bf..c9384885 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -82,9 +82,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) with OARCluster( walltime="00:02:00", @@ -115,9 +115,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) def test_config_name_oar_takes_custom_config(): @@ -137,6 +137,7 @@ def test_config_name_oar_takes_custom_config(): "local-directory": "/foo", "shared-temp-directory": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 3959c86c..565d9558 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -76,9 +76,9 @@ def test_job_script(Cluster): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) with Cluster( queue="regular", @@ -102,9 +102,9 @@ def test_job_script(Cluster): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) @pytest.mark.env("pbs") @@ -361,6 +361,7 @@ def test_config_name_pbs_takes_custom_config(): "local-directory": "/foo", "shared-temp-directory": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 10e2da2e..d1b5d2f3 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -58,6 +58,7 @@ def test_config_name_sge_takes_custom_config(): "local-directory": "/foo", "shared-temp-directory": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 80769d5f..c2aaed36 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -77,9 +77,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) with SLURMCluster( walltime="00:02:00", @@ -111,9 +111,9 @@ def test_job_script(): in job_script ) formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") - assert ( - f"--nthreads 2 --nworkers 4 --memory-limit {formatted_bytes}" in job_script - ) + assert ("--nthreads 2" in job_script) + assert ("--nworkers 4" in job_script) + assert (f"--memory-limit {formatted_bytes}" in job_script) @pytest.mark.env("slurm") @@ -193,6 +193,7 @@ def test_config_name_slurm_takes_custom_config(): "local-directory": "/foo", "shared-temp-directory": None, "extra": None, + "worker-command": None, "worker-extra-args": [], "env-extra": None, "job-script-prologue": [],