From ef45c0e2210d7d4b08e0747e5551723e5b8c3e2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20R=C3=BCbenach?= Date: Wed, 30 Aug 2023 21:59:08 +0200 Subject: [PATCH] Asynchronous job submission and removal (#610) * Asynchronous job submission and removal This solution is using asyncio.create_subprocess_exec * Code style for black --- dask_jobqueue/core.py | 30 ++++++++-------- dask_jobqueue/local.py | 2 +- dask_jobqueue/oar.py | 1 - dask_jobqueue/tests/test_htcondor.py | 1 - dask_jobqueue/tests/test_job.py | 42 +++++++++++------------ dask_jobqueue/tests/test_jobqueue_core.py | 2 -- dask_jobqueue/tests/test_lsf.py | 8 ----- dask_jobqueue/tests/test_pbs.py | 10 ------ dask_jobqueue/tests/test_sge.py | 1 - dask_jobqueue/tests/test_slurm.py | 5 --- 10 files changed, 36 insertions(+), 66 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 2883c088..b265ac87 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -4,9 +4,8 @@ import os import re import shlex -import subprocess +import asyncio import sys -import weakref import abc import tempfile import copy @@ -403,8 +402,7 @@ def job_file(self): yield fn async def _submit_job(self, script_filename): - # Should we make this async friendly? - return self._call(shlex.split(self.submit_command) + [script_filename]) + return await self._call(shlex.split(self.submit_command) + [script_filename]) @property def worker_process_threads(self): @@ -424,8 +422,6 @@ async def start(self): out = await self._submit_job(fn) self.job_id = self._job_id_from_submit_output(out) - weakref.finalize(self, self._close_job, self.job_id, self.cancel_command) - logger.debug("Starting job: %s", self.job_id) await super().start() @@ -452,18 +448,18 @@ def _job_id_from_submit_output(self, out): async def close(self): logger.debug("Stopping worker: %s job: %s", self.name, self.job_id) - self._close_job(self.job_id, self.cancel_command) + await self._close_job(self.job_id, self.cancel_command) @classmethod - def _close_job(cls, job_id, cancel_command): + async def _close_job(cls, job_id, cancel_command): if job_id: with suppress(RuntimeError): # deleting job when job already gone - cls._call(shlex.split(cancel_command) + [job_id]) + await cls._call(shlex.split(cancel_command) + [job_id]) logger.debug("Closed job %s", job_id) @staticmethod - def _call(cmd, **kwargs): - """Call a command using subprocess.Popen. + async def _call(cmd, **kwargs): + """Call a command using asyncio.create_subprocess_exec. This centralizes calls out to the command line, providing consistent outputs, logging, and an opportunity to go asynchronous in the future. @@ -472,7 +468,7 @@ def _call(cmd, **kwargs): ---------- cmd: List(str)) A command, each of which is a list of strings to hand to - subprocess.Popen + asyncio.create_subprocess_exec Examples -------- @@ -491,11 +487,14 @@ def _call(cmd, **kwargs): "Executing the following command to command line\n{}".format(cmd_str) ) - proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + **kwargs ) - out, err = proc.communicate() + out, err = await proc.communicate() out, err = out.decode(), err.decode() if proc.returncode != 0: @@ -758,7 +757,6 @@ def _get_worker_security(self, security): for key, value in worker_security_dict.items(): # dump worker in-memory keys for use in job_script if value is not None and "\n" in value: - try: f = tempfile.NamedTemporaryFile( mode="wt", diff --git a/dask_jobqueue/local.py b/dask_jobqueue/local.py index e7b0aa67..68e98c71 100644 --- a/dask_jobqueue/local.py +++ b/dask_jobqueue/local.py @@ -71,7 +71,7 @@ async def _submit_job(self, script_filename): return str(self.process.pid) @classmethod - def _close_job(self, job_id, cancel_command): + async def _close_job(self, job_id, cancel_command): os.kill(int(job_id), 9) # from distributed.utils_test import terminate_process # terminate_process(self.process) diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 8dba580e..0a23f4b0 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -11,7 +11,6 @@ class OARJob(Job): - # Override class variables submit_command = "oarsub" cancel_command = "oardel" diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 653b72df..ea57e004 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -106,7 +106,6 @@ def test_extra_args_broken_cancel(loop): cancel_command_extra=["-name", "wrong.docker"], ) as cluster: with Client(cluster) as client: - cluster.scale(2) client.wait_for_workers(2, timeout=QUEUE_WAIT) diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index f24e1148..497ceeb5 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -432,25 +432,25 @@ def test_deprecation_job_extra(Cluster): assert "old_param" in job_script -def test_jobqueue_job_call(tmpdir, Cluster): - cluster = Cluster(cores=1, memory="1GB") - - path = tmpdir.join("test.py") - path.write('print("this is the stdout")') - - out = cluster.job_cls._call([sys.executable, path.strpath]) - assert out == "this is the stdout\n" - - path_with_error = tmpdir.join("non-zero-exit-code.py") - path_with_error.write('print("this is the stdout")\n1/0') - - match = ( - "Command exited with non-zero exit code.+" - "Exit code: 1.+" - "stdout:\nthis is the stdout.+" - "stderr:.+ZeroDivisionError" - ) +@pytest.mark.asyncio +async def test_jobqueue_job_call(tmpdir, Cluster): + async with Cluster(cores=1, memory="1GB", asynchronous=True) as cluster: + path = tmpdir.join("test.py") + path.write('print("this is the stdout")') + + out = await cluster.job_cls._call([sys.executable, path.strpath]) + assert out == "this is the stdout\n" + + path_with_error = tmpdir.join("non-zero-exit-code.py") + path_with_error.write('print("this is the stdout")\n1/0') + + match = ( + "Command exited with non-zero exit code.+" + "Exit code: 1.+" + "stdout:\nthis is the stdout.+" + "stderr:.+ZeroDivisionError" + ) - match = re.compile(match, re.DOTALL) - with pytest.raises(RuntimeError, match=match): - cluster.job_cls._call([sys.executable, path_with_error.strpath]) + match = re.compile(match, re.DOTALL) + with pytest.raises(RuntimeError, match=match): + await cluster.job_cls._call([sys.executable, path_with_error.strpath]) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 23d2059f..ceadd126 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -247,7 +247,6 @@ def get_interface_and_port(index=0): def test_scheduler_options(Cluster): - interface, port = get_interface_and_port() with Cluster( @@ -324,7 +323,6 @@ def test_import_scheduler_options_from_config(Cluster): with dask.config.set( {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} ): - with Cluster(cores=2, memory="2GB") as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == config_scheduler_interface diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 134599ba..87e2cd67 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -17,7 +17,6 @@ def test_header(): with LSFCluster(walltime="00:02", processes=4, cores=8, memory="8GB") as cluster: - assert "#BSUB" in cluster.job_header assert "#BSUB -J dask-worker" in cluster.job_header assert "#BSUB -n 8" in cluster.job_header @@ -35,7 +34,6 @@ def test_header(): ncpus=24, mem=100000000000, ) as cluster: - assert "#BSUB -q general" in cluster.job_header assert "#BSUB -J dask-worker" in cluster.job_header assert "#BSUB -n 24" in cluster.job_header @@ -54,7 +52,6 @@ def test_header(): ncpus=24, mem=100000000000, ) as cluster: - assert "#BSUB -q general" in cluster.job_header assert "#BSUB -J dask-worker" in cluster.job_header assert "#BSUB -n 24" in cluster.job_header @@ -65,7 +62,6 @@ def test_header(): assert '#BSUB -P "Dask On LSF"' in cluster.job_header with LSFCluster(cores=4, memory="8GB") as cluster: - assert "#BSUB -n" in cluster.job_header assert "#BSUB -W" in cluster.job_header assert "#BSUB -M" in cluster.job_header @@ -75,7 +71,6 @@ def test_header(): with LSFCluster( cores=4, memory="8GB", job_extra_directives=["-u email@domain.com"] ) as cluster: - assert "#BSUB -u email@domain.com" in cluster.job_header assert "#BSUB -n" in cluster.job_header assert "#BSUB -W" in cluster.job_header @@ -86,7 +81,6 @@ def test_header(): def test_job_script(): with LSFCluster(walltime="00:02", processes=4, cores=8, memory="28GB") as cluster: - job_script = cluster.job_script() assert "#BSUB" in job_script assert "#BSUB -J dask-worker" in job_script @@ -114,7 +108,6 @@ def test_job_script(): ncpus=24, mem=100000000000, ) as cluster: - job_script = cluster.job_script() assert "#BSUB -q general" in cluster.job_header assert "#BSUB -J dask-worker" in cluster.job_header @@ -141,7 +134,6 @@ def test_job_script(): project="Dask On LSF", job_extra_directives=["-R rusage[mem=16GB]"], ) as cluster: - job_script = cluster.job_script() assert "#BSUB -J dask-worker" in cluster.job_header diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 565d9558..8ec3a019 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -18,7 +18,6 @@ def test_header(Cluster): with Cluster( walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker" ) as cluster: - assert "#PBS" in cluster.job_header assert "#PBS -N dask-worker" in cluster.job_header assert "#PBS -l select=1:ncpus=8:mem=27GB" in cluster.job_header @@ -34,7 +33,6 @@ def test_header(Cluster): resource_spec="select=1:ncpus=24:mem=100GB", memory="28GB", ) as cluster: - assert "#PBS -q regular" in cluster.job_header assert "#PBS -N dask-worker" in cluster.job_header assert "#PBS -l select=1:ncpus=24:mem=100GB" in cluster.job_header @@ -43,7 +41,6 @@ def test_header(Cluster): assert "#PBS -A DaskOnPBS" in cluster.job_header with Cluster(cores=4, memory="8GB") as cluster: - assert "#PBS -j oe" not in cluster.job_header assert "#PBS -N" in cluster.job_header assert "#PBS -l walltime=" in cluster.job_header @@ -51,7 +48,6 @@ def test_header(Cluster): assert "#PBS -q" not in cluster.job_header with Cluster(cores=4, memory="8GB", job_extra_directives=["-j oe"]) as cluster: - assert "#PBS -j oe" in cluster.job_header assert "#PBS -N" in cluster.job_header assert "#PBS -l walltime=" in cluster.job_header @@ -62,7 +58,6 @@ def test_header(Cluster): @pytest.mark.parametrize("Cluster", [PBSCluster, MoabCluster]) def test_job_script(Cluster): with Cluster(walltime="00:02:00", processes=4, cores=8, memory="28GB") as cluster: - job_script = cluster.job_script() assert "#PBS" in job_script assert "#PBS -N dask-worker" in job_script @@ -88,7 +83,6 @@ def test_job_script(Cluster): resource_spec="select=1:ncpus=24:mem=100GB", memory="28GB", ) as cluster: - job_script = cluster.job_script() assert "#PBS -q regular" in job_script assert "#PBS -N dask-worker" in job_script @@ -119,7 +113,6 @@ def test_basic(loop): loop=loop, ) as cluster: with Client(cluster) as client: - cluster.scale(2) client.wait_for_workers(2, timeout=QUEUE_WAIT) @@ -154,7 +147,6 @@ def test_scale_cores_memory(loop): loop=loop, ) as cluster: with Client(cluster) as client: - cluster.scale(cores=2) client.wait_for_workers(1, timeout=QUEUE_WAIT) @@ -188,7 +180,6 @@ def test_basic_scale_edge_cases(loop): job_extra_directives=["-V"], loop=loop, ) as cluster: - cluster.scale(2) cluster.scale(0) @@ -299,7 +290,6 @@ def test_scale_grouped(loop): loop=loop, ) as cluster: with Client(cluster) as client: - cluster.scale(4) # Start 2 jobs start = time() diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index d1b5d2f3..ab761223 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -16,7 +16,6 @@ def test_basic(loop): walltime="00:02:00", cores=8, processes=4, memory="2GiB", loop=loop ) as cluster: with Client(cluster, loop=loop) as client: - cluster.scale(2) start = time() diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index c2aaed36..51dbbfb2 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -16,7 +16,6 @@ def test_header(): with SLURMCluster( walltime="00:02:00", processes=4, cores=8, memory="28GB" ) as cluster: - assert "#SBATCH" in cluster.job_header assert "#SBATCH -J dask-worker" in cluster.job_header assert "#SBATCH -n 1" in cluster.job_header @@ -35,7 +34,6 @@ def test_header(): job_cpu=16, job_mem="100G", ) as cluster: - assert "#SBATCH --cpus-per-task=16" in cluster.job_header assert "#SBATCH --cpus-per-task=8" not in cluster.job_header assert "#SBATCH --mem=100G" in cluster.job_header @@ -44,7 +42,6 @@ def test_header(): assert "#SBATCH -p regular" in cluster.job_header with SLURMCluster(cores=4, memory="8GB") as cluster: - assert "#SBATCH" in cluster.job_header assert "#SBATCH -J " in cluster.job_header assert "#SBATCH -n 1" in cluster.job_header @@ -57,7 +54,6 @@ def test_job_script(): with SLURMCluster( walltime="00:02:00", processes=4, cores=8, memory="28GB" ) as cluster: - job_script = cluster.job_script() assert "#SBATCH" in job_script assert "#SBATCH -J dask-worker" in job_script @@ -127,7 +123,6 @@ def test_basic(loop): loop=loop, ) as cluster: with Client(cluster) as client: - cluster.scale(2) start = time()