From e8856a03f39db6b225e999469b2fc0d4ee0a1e4a Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Thu, 19 Dec 2024 01:18:55 +0100 Subject: [PATCH 01/25] ProcessFuture may okay --- src/aiida/engine/processes/process.py | 16 +++++++++------- src/aiida/engine/runners.py | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index e25d1b7c23..c684a4801b 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -39,9 +39,9 @@ import plumpy.futures import plumpy.persistence import plumpy.processes -from kiwipy.communications import UnroutableError +# from kiwipy.communications import UnroutableError from plumpy.process_states import Finished, ProcessState -from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] +# from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess from plumpy.utils import AttributesFrozendict @@ -178,7 +178,7 @@ def __init__( inputs=self.spec().inputs.serialize(inputs), logger=logger, loop=self._runner.loop, - communicator=self._runner.communicator, + coordinator=self._runner.communicator, ) self._node: Optional[orm.ProcessNode] = None @@ -352,10 +352,12 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur result = asyncio.wrap_future(result) # type: ignore[arg-type] if asyncio.isfuture(result): killing.append(result) - except ConnectionClosed: - self.logger.info('no connection available to kill child<%s>', child.pk) - except UnroutableError: - self.logger.info('kill signal was unable to reach child<%s>', child.pk) + # except ConnectionClosed: + # self.logger.info('no connection available to kill child<%s>', child.pk) + # except UnroutableError: + # self.logger.info('kill signal was unable to reach child<%s>', child.pk) + except Exception: + raise if asyncio.isfuture(result): # We ourselves are waiting to be killed so add it to the list diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 42cb76244c..b1f932becf 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,10 +19,10 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy -from plumpy.communications import wrap_communicator +from plumpy.rmq import wrap_communicator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController from aiida.common import exceptions from aiida.orm import ProcessNode, load_node From a1e8f8747a9fec6c557480a3c3e90cb0e0b22fc6 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Thu, 19 Dec 2024 16:05:20 +0100 Subject: [PATCH 02/25] Debug enhancer --- src/aiida/engine/processes/control.py | 2 +- src/aiida/engine/processes/launcher.py | 6 ++- src/aiida/tools/pytest_fixtures/daemon.py | 56 ++++++++++++++++++++++- tests/cmdline/commands/test_rabbitmq.py | 2 +- tests/engine/processes/test_control.py | 2 +- tests/engine/test_launch.py | 1 + 6 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 7cc214c76c..6be3aa7693 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -8,7 +8,7 @@ import kiwipy from kiwipy import communications -from plumpy.futures import unwrap_kiwi_future +from plumpy.rmq.futures import unwrap_kiwi_future from aiida.brokers import Broker from aiida.common.exceptions import AiidaException diff --git a/src/aiida/engine/processes/launcher.py b/src/aiida/engine/processes/launcher.py index b6c72aa724..e06024feae 100644 --- a/src/aiida/engine/processes/launcher.py +++ b/src/aiida/engine/processes/launcher.py @@ -38,7 +38,8 @@ def handle_continue_exception(node, exception, message): node.set_process_state(ProcessState.EXCEPTED) node.seal() - async def _continue(self, communicator, pid, nowait, tag=None): + + async def _continue(self, pid, nowait, tag=None): """Continue the task. Note that the task may already have been completed, as indicated from the corresponding the node, in which @@ -56,6 +57,9 @@ async def _continue(self, communicator, pid, nowait, tag=None): from aiida.orm import Data, load_node from aiida.orm.utils import serialize + LOGGER.error("SHOWTHIS???") + print("!!!HTHTHTH") + try: node = load_node(pk=pid) except (exceptions.MultipleObjectsError, exceptions.NotExistent): diff --git a/src/aiida/tools/pytest_fixtures/daemon.py b/src/aiida/tools/pytest_fixtures/daemon.py index 89ef02d841..b5f4363ddd 100644 --- a/src/aiida/tools/pytest_fixtures/daemon.py +++ b/src/aiida/tools/pytest_fixtures/daemon.py @@ -2,11 +2,14 @@ from __future__ import annotations +import logging import pathlib import typing as t import pytest +from aiida.engine.daemon.client import DaemonClient + if t.TYPE_CHECKING: from aiida.engine import Process, ProcessBuilder from aiida.orm import ProcessNode @@ -47,7 +50,7 @@ def test(daemon_client): @pytest.fixture -def started_daemon_client(daemon_client): +def started_daemon_client(daemon_client: DaemonClient): """Ensure that the daemon is running for the test profile and return the associated client. Usage:: @@ -60,8 +63,59 @@ def test(started_daemon_client): daemon_client.start_daemon() assert daemon_client.is_daemon_running + import time + import threading + # XXX: watchdog and pytest-timeout as extra deps of tests + from watchdog.observers import Observer + from watchdog.events import FileSystemEventHandler + + logger = logging.getLogger("tests.daemon:started_daemon_client") + + logger.debug(f'Daemon log file is located at: {daemon_client.daemon_log_file}') + + # This flag will be used to stop the thread when the fixture is torn down + stop_thread = False + + class LogFileEventHandler(FileSystemEventHandler): + def __init__(self, filepath): + self.filepath = filepath + # Keep track of how many bytes have been read so we print only new data + self._pos = 0 + + def on_modified(self, event): + if event.src_path == self.filepath: + # The file was modified, read from the last known position + with open(self.filepath, 'r') as f: + f.seek(self._pos) + new_output = f.read() + if new_output: + logger.debug(new_output) + self._pos = f.tell() + + def print_log_content(check_interval=0.1): + event_handler = LogFileEventHandler(daemon_client.daemon_log_file) + observer = Observer() + _ = observer.schedule(event_handler, str(pathlib.Path(daemon_client.daemon_log_file)), recursive=False) + observer.start() + + try: + while not stop_thread: + time.sleep(check_interval) + finally: + observer.stop() + observer.join() + + + # Start a background thread to continuously print new log lines + t = threading.Thread(target=print_log_content, daemon=True) + t.start() + yield daemon_client + # After the test finishes, signal the thread to stop and join it + stop_thread = True + t.join(timeout=5) + @pytest.fixture def stopped_daemon_client(daemon_client): diff --git a/tests/cmdline/commands/test_rabbitmq.py b/tests/cmdline/commands/test_rabbitmq.py index 8bb443cfe2..a60630bbbb 100644 --- a/tests/cmdline/commands/test_rabbitmq.py +++ b/tests/cmdline/commands/test_rabbitmq.py @@ -9,7 +9,7 @@ """Tests for ``verdi devel rabbitmq``.""" import pytest -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController from aiida.cmdline.commands import cmd_rabbitmq from aiida.engine import ProcessState, submit diff --git a/tests/engine/processes/test_control.py b/tests/engine/processes/test_control.py index 5bb9b8b7a6..f66c301c6e 100644 --- a/tests/engine/processes/test_control.py +++ b/tests/engine/processes/test_control.py @@ -1,7 +1,7 @@ """Tests for the :mod:`aiida.engine.processes.control` module.""" import pytest -from plumpy.process_comms import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController from aiida.engine import ProcessState from aiida.engine.launch import submit diff --git a/tests/engine/test_launch.py b/tests/engine/test_launch.py index 6aba82cdcc..fa960998ae 100644 --- a/tests/engine/test_launch.py +++ b/tests/engine/test_launch.py @@ -16,6 +16,7 @@ from aiida import orm from aiida.common import exceptions from aiida.engine import CalcJob, Process, WorkChain, calcfunction, launch +from aiida.engine.daemon.client import get_daemon_client from aiida.plugins import CalculationFactory ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add') From b837d72e1400b88a569d6d39cdfa713c844b20d9 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Thu, 19 Dec 2024 16:36:01 +0100 Subject: [PATCH 03/25] fix --- src/aiida/engine/processes/launcher.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/aiida/engine/processes/launcher.py b/src/aiida/engine/processes/launcher.py index e06024feae..c667032c39 100644 --- a/src/aiida/engine/processes/launcher.py +++ b/src/aiida/engine/processes/launcher.py @@ -57,9 +57,6 @@ async def _continue(self, pid, nowait, tag=None): from aiida.orm import Data, load_node from aiida.orm.utils import serialize - LOGGER.error("SHOWTHIS???") - print("!!!HTHTHTH") - try: node = load_node(pk=pid) except (exceptions.MultipleObjectsError, exceptions.NotExistent): @@ -88,7 +85,7 @@ async def _continue(self, pid, nowait, tag=None): return future.result() try: - result = await super()._continue(communicator, pid, nowait, tag) + result = await super()._continue(pid, nowait, tag) except ImportError as exception: message = 'the class of the process could not be imported.' self.handle_continue_exception(node, exception, message) From 92f16830ce5c382d6a49b1f9538715a1d57093de Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 20 Dec 2024 09:36:28 +0100 Subject: [PATCH 04/25] plumpy.futures.CancelledError is an alias of concurrent.futures.CancelledError not needed --- src/aiida/engine/processes/calcjobs/tasks.py | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 1059d277ba..2f677c766f 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -11,6 +11,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import functools import logging import tempfile @@ -101,13 +102,13 @@ async def do_upload(): try: logger.info(f'scheduled request to upload CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption) skip_submit = await exponential_backoff_retry( do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) except PreSubmitException: raise - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'uploading CalcJob<{node.pk}> failed') @@ -149,11 +150,11 @@ async def do_submit(): try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'submitting CalcJob<{node.pk}> failed') @@ -207,11 +208,11 @@ async def do_update(): try: logger.info(f'scheduled request to update CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) job_done = await exponential_backoff_retry( do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'updating CalcJob<{node.pk}> failed') @@ -257,11 +258,11 @@ async def do_monitor(): try: logger.info(f'scheduled request to monitor CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) monitor_result = await exponential_backoff_retry( do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'monitoring CalcJob<{node.pk}> failed') @@ -333,11 +334,11 @@ async def do_retrieve(): try: logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>') - ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption) + ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption) result = await exponential_backoff_retry( do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions ) - except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): + except (concurrent.futures.CancelledError, plumpy.process_states.Interruption): raise except Exception as exception: logger.warning(f'retrieving CalcJob<{node.pk}> failed') @@ -569,7 +570,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override await self._kill_job(node, transport_queue) node.set_process_status(str(exception)) return self.retrieve(monitor_result=self._monitor_result) - except (plumpy.futures.CancelledError, asyncio.CancelledError): + except (concurrent.futures.CancelledError, asyncio.CancelledError): node.set_process_status(f'Transport task {self._command} was cancelled') raise except plumpy.process_states.Interruption: From 6085bca4271b717becc09177943458cce2919eed Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 20 Dec 2024 15:50:31 +0100 Subject: [PATCH 05/25] Adopt with new message type and solve import issues --- src/aiida/cmdline/commands/cmd_process.py | 8 ++++---- src/aiida/engine/processes/control.py | 19 +++++++++++-------- src/aiida/engine/processes/functions.py | 2 +- src/aiida/engine/processes/process.py | 15 +++++++++------ src/aiida/engine/runners.py | 2 +- src/aiida/manage/manager.py | 3 ++- tests/engine/processes/test_control.py | 1 + tests/engine/test_rmq.py | 8 ++++---- tests/engine/test_runners.py | 2 +- tests/engine/test_work_chain.py | 4 ++-- 10 files changed, 36 insertions(+), 28 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index e203bdddfc..9f8ae1646b 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -340,8 +340,8 @@ def process_kill(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Killed through `verdi process kill`' - control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + msg_text = 'Killed through `verdi process kill`' + control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, msg_text=msg_text) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -371,8 +371,8 @@ def process_pause(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Paused through `verdi process pause`' - control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + msg_text = 'Paused through `verdi process pause`' + control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, msg_text=msg_text) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 6be3aa7693..f9c3434c16 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -4,6 +4,7 @@ import collections import concurrent +import functools import typing as t import kiwipy @@ -18,7 +19,7 @@ from aiida.orm import ProcessNode, QueryBuilder from aiida.tools.query.calculation import CalculationQueryBuilder -LOGGER = AIIDA_LOGGER.getChild('process_control') +LOGGER = AIIDA_LOGGER.getChild('engine.processes') class ProcessTimeoutException(AiidaException): @@ -135,7 +136,7 @@ def play_processes( def pause_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', + msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -164,13 +165,14 @@ def pause_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.pause_process, 'pause', 'pausing', timeout, wait, msg=message) + action = functools.partial(controller.pause_process, msg_text=msg_text) + _perform_actions(processes, action, 'pause', 'pausing', timeout, wait) def kill_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', + msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -199,7 +201,8 @@ def kill_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) + action = functools.partial(controller.kill_process, msg_text=msg_text) + _perform_actions(processes, action, 'kill', 'killing', timeout, wait) def _perform_actions( @@ -281,9 +284,9 @@ def handle_result(result): unwrapped = unwrap_kiwi_future(future) result = unwrapped.result() except communications.TimeoutError: - LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out') + LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out', exc_info=True) except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') + LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}', exc_info=True) else: if isinstance(result, kiwipy.Future): LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>') @@ -302,7 +305,7 @@ def handle_result(result): try: result = future.result() except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') + LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}', exc_info=True) else: handle_result(result) diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 8bca68f55c..cca28abedd 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -235,7 +235,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode def kill_process(_num, _frame): """Send the kill signal to the process in the current scope.""" LOGGER.critical('runner received interrupt, killing process %s', process.pid) - result = process.kill(msg='Process was killed because the runner received an interrupt') + result = process.kill(msg_text='Process was killed because the runner received an interrupt') return result # Store the current handler on the signal such that it can be restored after process has terminated diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index c684a4801b..cb085901d3 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -39,8 +39,10 @@ import plumpy.futures import plumpy.persistence import plumpy.processes + # from kiwipy.communications import UnroutableError from plumpy.process_states import Finished, ProcessState + # from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess from plumpy.utils import AttributesFrozendict @@ -318,7 +320,7 @@ def load_instance_state( else: self._runner = manager.get_manager().get_runner() - load_context = load_context.copyextend(loop=self._runner.loop, communicator=self._runner.communicator) + load_context = load_context.copyextend(loop=self._runner.loop, coordinator=self._runner.communicator) super().load_instance_state(saved_state, load_context) if self.SaveKeys.CALC_ID.value in saved_state: @@ -329,7 +331,7 @@ def load_instance_state( self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state') - def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: + def kill(self, msg_text: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: """Kill the process and all the children calculations it called :param msg: message @@ -338,7 +340,7 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur had_been_terminated = self.has_terminated() - result = super().kill(msg) + result = super().kill(msg_text) # Only kill children if we could be killed ourselves if result is not False and not had_been_terminated: @@ -348,10 +350,11 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur self.logger.info('no controller available to kill child<%s>', child.pk) continue try: - result = self.runner.controller.kill_process(child.pk, f'Killed by parent<{self.node.pk}>') + result = self.runner.controller.kill_process(child.pk, msg_text=f'Killed by parent<{self.node.pk}>') result = asyncio.wrap_future(result) # type: ignore[arg-type] if asyncio.isfuture(result): killing.append(result) + # FIXME: use generic exception to catch the coordinator side exception # except ConnectionClosed: # self.logger.info('no connection available to kill child<%s>', child.pk) # except UnroutableError: @@ -365,10 +368,10 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur if killing: # We are waiting for things to be killed, so return the 'gathered' future - kill_future = plumpy.futures.gather(*killing) + kill_future = asyncio.gather(*killing) result = self.loop.create_future() - def done(done_future: plumpy.futures.Future): + def done(done_future: asyncio.Future): is_all_killed = all(done_future.result()) result.set_result(is_all_killed) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index b1f932becf..ad3e4c60a1 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -250,7 +250,7 @@ def kill_process(_num, _frame): LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) return LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) - process_inited.kill(msg='Process was killed because the runner received an interrupt') + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') original_handler_int = signal.getsignal(signal.SIGINT) original_handler_term = signal.getsignal(signal.SIGTERM) diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 651190454e..462d97c024 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -369,9 +369,10 @@ def get_process_controller(self) -> 'RemoteProcessThreadController': :return: the process controller instance """ - from plumpy.process_comms import RemoteProcessThreadController + from plumpy.rmq import RemoteProcessThreadController if self._process_controller is None: + # FIXME: use coordinator wrapper self._process_controller = RemoteProcessThreadController(self.get_communicator()) return self._process_controller diff --git a/tests/engine/processes/test_control.py b/tests/engine/processes/test_control.py index f66c301c6e..637dbed897 100644 --- a/tests/engine/processes/test_control.py +++ b/tests/engine/processes/test_control.py @@ -82,6 +82,7 @@ def test_kill_processes(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) control.kill_processes([node], wait=True) + # __import__('ipdb').set_trace() assert node.is_terminated assert node.is_killed assert node.process_status == 'Killed through `aiida.engine.processes.control.kill_processes`' diff --git a/tests/engine/test_rmq.py b/tests/engine/test_rmq.py index a2edc2fa41..3284ad4f7c 100644 --- a/tests/engine/test_rmq.py +++ b/tests/engine/test_rmq.py @@ -94,7 +94,7 @@ async def do_pause(): assert calc_node.paused kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -112,7 +112,7 @@ async def do_pause_play(): await asyncio.sleep(0.1) pause_message = 'Take a seat' - pause_future = controller.pause_process(calc_node.pk, msg=pause_message) + pause_future = controller.pause_process(calc_node.pk, msg_text=pause_message) future = await with_timeout(asyncio.wrap_future(pause_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert calc_node.paused @@ -127,7 +127,7 @@ async def do_pause_play(): assert calc_node.process_status is None kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -145,7 +145,7 @@ async def do_kill(): await asyncio.sleep(0.1) kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result diff --git a/tests/engine/test_runners.py b/tests/engine/test_runners.py index db6e0e7493..c9529a9bfe 100644 --- a/tests/engine/test_runners.py +++ b/tests/engine/test_runners.py @@ -51,7 +51,7 @@ def test_call_on_process_finish(runner): """Test call on calculation finish.""" loop = runner.loop proc = Proc(runner=runner, inputs={'a': Str('input')}) - future = plumpy.Future() + future = asyncio.Future() event = threading.Event() def calc_done(): diff --git a/tests/engine/test_work_chain.py b/tests/engine/test_work_chain.py index 4fb8c70667..6480a45f81 100644 --- a/tests/engine/test_work_chain.py +++ b/tests/engine/test_work_chain.py @@ -28,7 +28,7 @@ def run_until_paused(proc): """Set up a future that will be resolved when process is paused""" listener = plumpy.ProcessListener() - paused = plumpy.Future() + paused = asyncio.Future() if proc.paused: paused.set_result(True) @@ -49,7 +49,7 @@ def run_until_waiting(proc): from aiida.engine import ProcessState listener = plumpy.ProcessListener() - in_waiting = plumpy.Future() + in_waiting = asyncio.Future() if proc.state == ProcessState.WAITING: in_waiting.set_result(True) From 61934f3070dc89c69f701e8876d8334daee474b6 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 20 Dec 2024 16:24:15 +0100 Subject: [PATCH 06/25] apply timeout for pytest to 30s --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 9461b7a46a..833935793b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -387,6 +387,7 @@ minversion = '7.0' testpaths = [ 'tests' ] +timeout = 30 xfail_strict = true [tool.ruff] From d5733b225c4e1e916c218537b3a0447d6f9f504d Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 20 Dec 2024 16:33:37 +0100 Subject: [PATCH 07/25] use aiida_profile_clean for test_input_code test --- tests/tools/archive/orm/test_codes.py | 4 ++-- tests/tools/groups/test_paths.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/tools/archive/orm/test_codes.py b/tests/tools/archive/orm/test_codes.py index 27ae5f7882..5edc7e2fc7 100644 --- a/tests/tools/archive/orm/test_codes.py +++ b/tests/tools/archive/orm/test_codes.py @@ -35,7 +35,7 @@ def test_that_solo_code_is_exported_correctly(aiida_profile, tmp_path, aiida_loc assert orm.load_node(code_uuid).label == code_label -def test_input_code(aiida_profile, tmp_path, aiida_localhost): +def test_input_code(aiida_profile_clean, tmp_path, aiida_localhost): """This test checks that when a calculation is exported then the corresponding code is also exported. It also checks that the links are also in place after the import. @@ -59,7 +59,7 @@ def test_input_code(aiida_profile, tmp_path, aiida_localhost): export_file = tmp_path / 'export.aiida' create_archive([calc], filename=export_file) - aiida_profile.reset_storage() + aiida_profile_clean.reset_storage() import_archive(export_file) diff --git a/tests/tools/groups/test_paths.py b/tests/tools/groups/test_paths.py index 6ff2459650..3a8d4dbb69 100644 --- a/tests/tools/groups/test_paths.py +++ b/tests/tools/groups/test_paths.py @@ -116,6 +116,7 @@ def test_walk(setup_groups): @pytest.mark.filterwarnings('ignore::UserWarning') +@pytest.mark.usefixtures('aiida_profile_clean') def test_walk_with_invalid_path(): """Test the ``GroupPath.walk`` method with invalid paths.""" for label in ['a', 'a/b', 'a/c/d', 'a/c/e/g', 'a/f', 'bad//group', 'bad/other']: From b1f446a6b01ea4d381e65be33df29136ddef2941 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 20 Dec 2024 16:45:57 +0100 Subject: [PATCH 08/25] bind and test against the corresponded rmq-out branch of unkcpz/plumpy changes --- pyproject.toml | 6 ++- src/aiida/engine/processes/launcher.py | 1 - src/aiida/engine/runners.py | 3 +- src/aiida/tools/pytest_fixtures/daemon.py | 9 +++-- tests/engine/test_launch.py | 1 - tests/engine/test_runners.py | 1 - uv.lock | 49 ++++++++++++++++++++--- 7 files changed, 54 insertions(+), 16 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 833935793b..c058321827 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ 'importlib-metadata~=6.0', 'numpy~=1.21', 'paramiko~=3.0', - 'plumpy~=0.22.3', + 'plumpy', 'pgsu~=0.3.0', 'psutil~=5.6', 'psycopg[binary]~=3.0', @@ -246,6 +246,7 @@ tests = [ 'pympler~=1.0', 'coverage~=7.0', 'sphinx~=7.2.0', + 'watchdog~=6.0', 'docutils~=0.20' ] tui = [ @@ -510,3 +511,6 @@ passenv = AIIDA_TEST_WORKERS commands = molecule {posargs:test} """ + +[tool.uv.sources] +plumpy = {git = "https://github.com/unkcpz/plumpy", branch = "rmq-out"} diff --git a/src/aiida/engine/processes/launcher.py b/src/aiida/engine/processes/launcher.py index c667032c39..73cb652ee1 100644 --- a/src/aiida/engine/processes/launcher.py +++ b/src/aiida/engine/processes/launcher.py @@ -38,7 +38,6 @@ def handle_continue_exception(node, exception, message): node.set_process_state(ProcessState.EXCEPTED) node.seal() - async def _continue(self, pid, nowait, tag=None): """Continue the task. diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index ad3e4c60a1..bb6843bfbd 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,10 +19,9 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy -from plumpy.rmq import wrap_communicator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister -from plumpy.rmq import RemoteProcessThreadController +from plumpy.rmq import RemoteProcessThreadController, wrap_communicator from aiida.common import exceptions from aiida.orm import ProcessNode, load_node diff --git a/src/aiida/tools/pytest_fixtures/daemon.py b/src/aiida/tools/pytest_fixtures/daemon.py index b5f4363ddd..54959e0a3f 100644 --- a/src/aiida/tools/pytest_fixtures/daemon.py +++ b/src/aiida/tools/pytest_fixtures/daemon.py @@ -63,13 +63,15 @@ def test(started_daemon_client): daemon_client.start_daemon() assert daemon_client.is_daemon_running - import time import threading + import time + + from watchdog.events import FileSystemEventHandler + # XXX: watchdog and pytest-timeout as extra deps of tests from watchdog.observers import Observer - from watchdog.events import FileSystemEventHandler - logger = logging.getLogger("tests.daemon:started_daemon_client") + logger = logging.getLogger('tests.daemon:started_daemon_client') logger.debug(f'Daemon log file is located at: {daemon_client.daemon_log_file}') @@ -105,7 +107,6 @@ def print_log_content(check_interval=0.1): observer.stop() observer.join() - # Start a background thread to continuously print new log lines t = threading.Thread(target=print_log_content, daemon=True) t.start() diff --git a/tests/engine/test_launch.py b/tests/engine/test_launch.py index fa960998ae..6aba82cdcc 100644 --- a/tests/engine/test_launch.py +++ b/tests/engine/test_launch.py @@ -16,7 +16,6 @@ from aiida import orm from aiida.common import exceptions from aiida.engine import CalcJob, Process, WorkChain, calcfunction, launch -from aiida.engine.daemon.client import get_daemon_client from aiida.plugins import CalculationFactory ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add') diff --git a/tests/engine/test_runners.py b/tests/engine/test_runners.py index c9529a9bfe..3281cfc535 100644 --- a/tests/engine/test_runners.py +++ b/tests/engine/test_runners.py @@ -11,7 +11,6 @@ import asyncio import threading -import plumpy import pytest from aiida.calculations.arithmetic.add import ArithmeticAddCalculation diff --git a/uv.lock b/uv.lock index f569268776..4d4c33fcc5 100644 --- a/uv.lock +++ b/uv.lock @@ -119,6 +119,7 @@ pre-commit = [ { name = "tomli" }, { name = "trogon" }, { name = "types-pyyaml" }, + { name = "watchdog" }, ] rest = [ { name = "flask" }, @@ -148,6 +149,7 @@ tests = [ { name = "pytest-timeout" }, { name = "pytest-xdist" }, { name = "sphinx" }, + { name = "watchdog" }, ] tui = [ { name = "trogon" }, @@ -191,7 +193,7 @@ requires-dist = [ { name = "pg8000", marker = "extra == 'tests'", specifier = "~=1.13" }, { name = "pgsu", specifier = "~=0.3.0" }, { name = "pgtest", marker = "extra == 'tests'", specifier = "~=1.3,>=1.3.1" }, - { name = "plumpy", specifier = "~=0.22.3" }, + { name = "plumpy", git = "https://github.com/unkcpz/plumpy?branch=rmq-out" }, { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=3.5" }, { name = "psutil", specifier = "~=5.6" }, { name = "psycopg", extras = ["binary"], specifier = "~=3.0" }, @@ -235,6 +237,7 @@ requires-dist = [ { name = "types-pyyaml", marker = "extra == 'pre-commit'" }, { name = "typing-extensions", marker = "python_full_version < '3.10'", specifier = "~=4.0" }, { name = "upf-to-json", specifier = "~=0.9.2" }, + { name = "watchdog", marker = "extra == 'tests'", specifier = "~=6.0" }, { name = "wrapt", specifier = "~=1.11" }, ] @@ -2920,15 +2923,12 @@ wheels = [ [[package]] name = "plumpy" version = "0.22.3" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/unkcpz/plumpy?branch=rmq-out#d953f06954d39e0c4de630062a6c3326f2de645d" } dependencies = [ { name = "kiwipy", extra = ["rmq"] }, { name = "nest-asyncio" }, { name = "pyyaml" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/ab/99/6c931d3f4697acd34cf18eb3fbfe96ed55cd0408d9be7c0f316349117a8e/plumpy-0.22.3.tar.gz", hash = "sha256:e58f45e6360f173babf04e2a4abacae9867622768ce2a126c8260db3b46372c4", size = 73582 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/95/d9/12fd8281f494ca79d6a7a9d40099616d16415be5807959e5b024dffe8aed/plumpy-0.22.3-py3-none-any.whl", hash = "sha256:63ae6c90713f52483836a3b2b3e1941eab7ada920c303092facc27e78229bdc3", size = 74244 }, + { name = "typing-extensions" }, ] [[package]] @@ -4649,6 +4649,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ae/92/78324ff89391e00c8f4cf6b8526c41c6ef36b4ea2d2c132250b1a6fc2b8d/virtualenv-20.27.1-py3-none-any.whl", hash = "sha256:f11f1b8a29525562925f745563bfd48b189450f61fb34c4f9cc79dd5aa32a1f4", size = 3117838 }, ] +[[package]] +name = "watchdog" +version = "6.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/db/7d/7f3d619e951c88ed75c6037b246ddcf2d322812ee8ea189be89511721d54/watchdog-6.0.0.tar.gz", hash = "sha256:9ddf7c82fda3ae8e24decda1338ede66e1c99883db93711d8fb941eaa2d8c282", size = 131220 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0c/56/90994d789c61df619bfc5ce2ecdabd5eeff564e1eb47512bd01b5e019569/watchdog-6.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d1cdb490583ebd691c012b3d6dae011000fe42edb7a82ece80965b42abd61f26", size = 96390 }, + { url = "https://files.pythonhosted.org/packages/55/46/9a67ee697342ddf3c6daa97e3a587a56d6c4052f881ed926a849fcf7371c/watchdog-6.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:bc64ab3bdb6a04d69d4023b29422170b74681784ffb9463ed4870cf2f3e66112", size = 88389 }, + { url = "https://files.pythonhosted.org/packages/44/65/91b0985747c52064d8701e1075eb96f8c40a79df889e59a399453adfb882/watchdog-6.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c897ac1b55c5a1461e16dae288d22bb2e412ba9807df8397a635d88f671d36c3", size = 89020 }, + { url = "https://files.pythonhosted.org/packages/e0/24/d9be5cd6642a6aa68352ded4b4b10fb0d7889cb7f45814fb92cecd35f101/watchdog-6.0.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:6eb11feb5a0d452ee41f824e271ca311a09e250441c262ca2fd7ebcf2461a06c", size = 96393 }, + { url = "https://files.pythonhosted.org/packages/63/7a/6013b0d8dbc56adca7fdd4f0beed381c59f6752341b12fa0886fa7afc78b/watchdog-6.0.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ef810fbf7b781a5a593894e4f439773830bdecb885e6880d957d5b9382a960d2", size = 88392 }, + { url = "https://files.pythonhosted.org/packages/d1/40/b75381494851556de56281e053700e46bff5b37bf4c7267e858640af5a7f/watchdog-6.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:afd0fe1b2270917c5e23c2a65ce50c2a4abb63daafb0d419fde368e272a76b7c", size = 89019 }, + { url = "https://files.pythonhosted.org/packages/39/ea/3930d07dafc9e286ed356a679aa02d777c06e9bfd1164fa7c19c288a5483/watchdog-6.0.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:bdd4e6f14b8b18c334febb9c4425a878a2ac20efd1e0b231978e7b150f92a948", size = 96471 }, + { url = "https://files.pythonhosted.org/packages/12/87/48361531f70b1f87928b045df868a9fd4e253d9ae087fa4cf3f7113be363/watchdog-6.0.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:c7c15dda13c4eb00d6fb6fc508b3c0ed88b9d5d374056b239c4ad1611125c860", size = 88449 }, + { url = "https://files.pythonhosted.org/packages/5b/7e/8f322f5e600812e6f9a31b75d242631068ca8f4ef0582dd3ae6e72daecc8/watchdog-6.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6f10cb2d5902447c7d0da897e2c6768bca89174d0c6e1e30abec5421af97a5b0", size = 89054 }, + { url = "https://files.pythonhosted.org/packages/68/98/b0345cabdce2041a01293ba483333582891a3bd5769b08eceb0d406056ef/watchdog-6.0.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:490ab2ef84f11129844c23fb14ecf30ef3d8a6abafd3754a6f75ca1e6654136c", size = 96480 }, + { url = "https://files.pythonhosted.org/packages/85/83/cdf13902c626b28eedef7ec4f10745c52aad8a8fe7eb04ed7b1f111ca20e/watchdog-6.0.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:76aae96b00ae814b181bb25b1b98076d5fc84e8a53cd8885a318b42b6d3a5134", size = 88451 }, + { url = "https://files.pythonhosted.org/packages/fe/c4/225c87bae08c8b9ec99030cd48ae9c4eca050a59bf5c2255853e18c87b50/watchdog-6.0.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a175f755fc2279e0b7312c0035d52e27211a5bc39719dd529625b1930917345b", size = 89057 }, + { url = "https://files.pythonhosted.org/packages/05/52/7223011bb760fce8ddc53416beb65b83a3ea6d7d13738dde75eeb2c89679/watchdog-6.0.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:e6f0e77c9417e7cd62af82529b10563db3423625c5fce018430b249bf977f9e8", size = 96390 }, + { url = "https://files.pythonhosted.org/packages/9c/62/d2b21bc4e706d3a9d467561f487c2938cbd881c69f3808c43ac1ec242391/watchdog-6.0.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:90c8e78f3b94014f7aaae121e6b909674df5b46ec24d6bebc45c44c56729af2a", size = 88386 }, + { url = "https://files.pythonhosted.org/packages/ea/22/1c90b20eda9f4132e4603a26296108728a8bfe9584b006bd05dd94548853/watchdog-6.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e7631a77ffb1f7d2eefa4445ebbee491c720a5661ddf6df3498ebecae5ed375c", size = 89017 }, + { url = "https://files.pythonhosted.org/packages/30/ad/d17b5d42e28a8b91f8ed01cb949da092827afb9995d4559fd448d0472763/watchdog-6.0.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:c7ac31a19f4545dd92fc25d200694098f42c9a8e391bc00bdd362c5736dbf881", size = 87902 }, + { url = "https://files.pythonhosted.org/packages/5c/ca/c3649991d140ff6ab67bfc85ab42b165ead119c9e12211e08089d763ece5/watchdog-6.0.0-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:9513f27a1a582d9808cf21a07dae516f0fab1cf2d7683a742c498b93eedabb11", size = 88380 }, + { url = "https://files.pythonhosted.org/packages/5b/79/69f2b0e8d3f2afd462029031baafb1b75d11bb62703f0e1022b2e54d49ee/watchdog-6.0.0-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:7a0e56874cfbc4b9b05c60c8a1926fedf56324bb08cfbc188969777940aef3aa", size = 87903 }, + { url = "https://files.pythonhosted.org/packages/e2/2b/dc048dd71c2e5f0f7ebc04dd7912981ec45793a03c0dc462438e0591ba5d/watchdog-6.0.0-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:e6439e374fc012255b4ec786ae3c4bc838cd7309a540e5fe0952d03687d8804e", size = 88381 }, + { url = "https://files.pythonhosted.org/packages/a9/c7/ca4bf3e518cb57a686b2feb4f55a1892fd9a3dd13f470fca14e00f80ea36/watchdog-6.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:7607498efa04a3542ae3e05e64da8202e58159aa1fa4acddf7678d34a35d4f13", size = 79079 }, + { url = "https://files.pythonhosted.org/packages/5c/51/d46dc9332f9a647593c947b4b88e2381c8dfc0942d15b8edc0310fa4abb1/watchdog-6.0.0-py3-none-manylinux2014_armv7l.whl", hash = "sha256:9041567ee8953024c83343288ccc458fd0a2d811d6a0fd68c4c22609e3490379", size = 79078 }, + { url = "https://files.pythonhosted.org/packages/d4/57/04edbf5e169cd318d5f07b4766fee38e825d64b6913ca157ca32d1a42267/watchdog-6.0.0-py3-none-manylinux2014_i686.whl", hash = "sha256:82dc3e3143c7e38ec49d61af98d6558288c415eac98486a5c581726e0737c00e", size = 79076 }, + { url = "https://files.pythonhosted.org/packages/ab/cc/da8422b300e13cb187d2203f20b9253e91058aaf7db65b74142013478e66/watchdog-6.0.0-py3-none-manylinux2014_ppc64.whl", hash = "sha256:212ac9b8bf1161dc91bd09c048048a95ca3a4c4f5e5d4a7d1b1a7d5752a7f96f", size = 79077 }, + { url = "https://files.pythonhosted.org/packages/2c/3b/b8964e04ae1a025c44ba8e4291f86e97fac443bca31de8bd98d3263d2fcf/watchdog-6.0.0-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:e3df4cbb9a450c6d49318f6d14f4bbc80d763fa587ba46ec86f99f9e6876bb26", size = 79078 }, + { url = "https://files.pythonhosted.org/packages/62/ae/a696eb424bedff7407801c257d4b1afda455fe40821a2be430e173660e81/watchdog-6.0.0-py3-none-manylinux2014_s390x.whl", hash = "sha256:2cce7cfc2008eb51feb6aab51251fd79b85d9894e98ba847408f662b3395ca3c", size = 79077 }, + { url = "https://files.pythonhosted.org/packages/b5/e8/dbf020b4d98251a9860752a094d09a65e1b436ad181faf929983f697048f/watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:20ffe5b202af80ab4266dcd3e91aae72bf2da48c0d33bdb15c66658e685e94e2", size = 79078 }, + { url = "https://files.pythonhosted.org/packages/07/f6/d0e5b343768e8bcb4cda79f0f2f55051bf26177ecd5651f84c07567461cf/watchdog-6.0.0-py3-none-win32.whl", hash = "sha256:07df1fdd701c5d4c8e55ef6cf55b8f0120fe1aef7ef39a1c6fc6bc2e606d517a", size = 79065 }, + { url = "https://files.pythonhosted.org/packages/db/d9/c495884c6e548fce18a8f40568ff120bc3a4b7b99813081c8ac0c936fa64/watchdog-6.0.0-py3-none-win_amd64.whl", hash = "sha256:cbafb470cf848d93b5d013e2ecb245d4aa1c8fd0504e863ccefa32445359d680", size = 79070 }, + { url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067 }, +] + [[package]] name = "wcwidth" version = "0.2.13" From 36f1a86efa5c63753306e067b02e95a411a979ef Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 21 Dec 2024 02:31:09 +0100 Subject: [PATCH 09/25] Simplipy create_runner function signature --- src/aiida/engine/runners.py | 2 +- src/aiida/manage/manager.py | 48 +++++++++++++++++++----------------- tests/engine/test_futures.py | 2 +- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index bb6843bfbd..5845077c2f 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -81,7 +81,7 @@ def __init__( ), 'Must supply a persister if you want to submit using communicator' set_event_loop_policy() - self._loop = loop if loop is not None else asyncio.get_event_loop() + self._loop = loop or asyncio.get_event_loop() self._poll_interval = poll_interval self._broker_submit = broker_submit self._transport = transports.TransportQueue(self._loop) diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 462d97c024..f6eebfcdca 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -12,9 +12,10 @@ from typing import TYPE_CHECKING, Any, Optional, Union -if TYPE_CHECKING: - import asyncio +import asyncio +import kiwipy +if TYPE_CHECKING: from kiwipy.rmq import RmqThreadCommunicator from plumpy.process_comms import RemoteProcessThreadController @@ -381,7 +382,6 @@ def get_runner(self, **kwargs) -> 'Runner': """Return a runner that is based on the current profile settings and can be used globally by the code. :return: the global runner - """ if self._runner is None: self._runner = self.create_runner(**kwargs) @@ -392,20 +392,25 @@ def set_runner(self, new_runner: 'Runner') -> None: """Set the currently used runner :param new_runner: the new runner to use - """ if self._runner is not None: self._runner.close() self._runner = new_runner - def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner': - """Create and return a new runner + def create_runner( + self, + poll_interval: Union[int, float] | None = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + communicator: Optional[kiwipy.Communicator] = None, + broker_submit: bool = False, + persister: Optional[AiiDAPersister] = None, + ) -> 'Runner': + """Create and return a new runner, with default settings from profile. :param with_persistence: create a runner with persistence enabled :return: a new runner instance - """ from aiida.common import ConfigurationError from aiida.engine import runners @@ -415,23 +420,20 @@ def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner raise ConfigurationError( 'Could not determine the current profile. Consider loading a profile using `aiida.load_profile()`.' ) - poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') - - settings = {'broker_submit': False, 'poll_interval': poll_interval} - settings.update(kwargs) - if 'communicator' not in settings: - # Only call get_communicator if we have to as it will lazily create - try: - settings['communicator'] = self.get_communicator() - except ConfigurationError: - # The currently loaded profile does not define a broker and so there is no communicator - pass - - if with_persistence and 'persister' not in settings: - settings['persister'] = self.get_persister() - - return runners.Runner(**settings) # type: ignore[arg-type] + _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') + _default_broker_submit = False + _default_communicator = self.get_communicator() + _default_persister = self.get_persister() + + runner = runners.Runner( + poll_interval=poll_interval or _default_poll_interval, + loop=loop or asyncio.get_event_loop(), + communicator=communicator or _default_communicator, + broker_submit=broker_submit or _default_broker_submit, + persister=persister or _default_persister, + ) + return runner def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner': """Create and return a new daemon runner. diff --git a/tests/engine/test_futures.py b/tests/engine/test_futures.py index b8ba78aa8f..6bc9527ce9 100644 --- a/tests/engine/test_futures.py +++ b/tests/engine/test_futures.py @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self): # No polling future = processes.futures.ProcessFuture( - pk=process.pid, loop=runner.loop, communicator=manager.get_communicator() + pk=process.pid, loop=runner.loop, communicator=manager.get_coordinator() ) run(process) From 28cdb1ce2dd5b749192916881dd6a9311594f85c Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 21 Dec 2024 03:08:55 +0100 Subject: [PATCH 10/25] Runner use coordinator interface --- src/aiida/brokers/broker.py | 4 +++ src/aiida/brokers/rabbitmq/broker.py | 8 +++++- src/aiida/engine/processes/process.py | 7 +++--- src/aiida/engine/runners.py | 34 ++++++++++++++------------ src/aiida/manage/manager.py | 35 ++++++++++++++++++++------- tests/engine/test_futures.py | 2 +- 6 files changed, 59 insertions(+), 31 deletions(-) diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index cfb8b3d50e..df8e628a21 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -23,6 +23,10 @@ def __init__(self, profile: 'Profile') -> None: def get_communicator(self): """Return an instance of :class:`kiwipy.Communicator`.""" + @abc.abstractmethod + def get_coordinator(self): + """Return an instance of coordinator.""" + @abc.abstractmethod def iterate_tasks(self): """Return an iterator over the tasks in the launch queue.""" diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index c4ecfa2400..8096747be1 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -5,6 +5,8 @@ import functools import typing as t +from plumpy.rmq import RmqCoordinator + from aiida.brokers.broker import Broker from aiida.common.log import AIIDA_LOGGER from aiida.manage.configuration import get_config_option @@ -13,7 +15,6 @@ if t.TYPE_CHECKING: from kiwipy.rmq import RmqThreadCommunicator - from aiida.manage.configuration.profile import Profile LOGGER = AIIDA_LOGGER.getChild('broker.rabbitmq') @@ -58,6 +59,11 @@ def get_communicator(self) -> 'RmqThreadCommunicator': return self._communicator + def get_coordinator(self): + coordinator = RmqCoordinator(self.get_communicator()) + + return coordinator + def _create_communicator(self) -> 'RmqThreadCommunicator': """Return an instance of :class:`kiwipy.Communicator`.""" from kiwipy.rmq import RmqThreadCommunicator diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index cb085901d3..a678b115c7 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -41,9 +41,9 @@ import plumpy.processes # from kiwipy.communications import UnroutableError +# from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.process_states import Finished, ProcessState -# from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess from plumpy.utils import AttributesFrozendict @@ -174,13 +174,12 @@ def __init__( from aiida.manage import manager self._runner = runner if runner is not None else manager.get_manager().get_runner() - # assert self._runner.communicator is not None, 'communicator not set for runner' super().__init__( inputs=self.spec().inputs.serialize(inputs), logger=logger, loop=self._runner.loop, - coordinator=self._runner.communicator, + coordinator=self._runner.coordinator, ) self._node: Optional[orm.ProcessNode] = None @@ -320,7 +319,7 @@ def load_instance_state( else: self._runner = manager.get_manager().get_runner() - load_context = load_context.copyextend(loop=self._runner.loop, coordinator=self._runner.communicator) + load_context = load_context.copyextend(loop=self._runner.loop, coordinator=self._runner.coordinator) super().load_instance_state(saved_state, load_context) if self.SaveKeys.CALC_ID.value in saved_state: diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 5845077c2f..e1dd3c38f5 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,6 +19,7 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy +from plumpy.coordinator import Coordinator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister from plumpy.rmq import RemoteProcessThreadController, wrap_communicator @@ -55,7 +56,7 @@ class Runner: """Class that can launch processes by running in the current interpreter or by submitting them to the daemon.""" _persister: Optional[Persister] = None - _communicator: Optional[kiwipy.Communicator] = None + _coordinator: Optional[Coordinator] = None _controller: Optional[RemoteProcessThreadController] = None _closed: bool = False @@ -63,7 +64,7 @@ def __init__( self, poll_interval: Union[int, float] = 0, loop: Optional[asyncio.AbstractEventLoop] = None, - communicator: Optional[kiwipy.Communicator] = None, + coordinator: Optional[Coordinator] = None, broker_submit: bool = False, persister: Optional[Persister] = None, ): @@ -71,14 +72,14 @@ def __init__( :param poll_interval: interval in seconds between polling for status of active sub processes :param loop: an asyncio event loop, if none is suppled a new one will be created - :param communicator: the communicator to use + :param coordinator: the coordinator to use :param broker_submit: if True, processes will be submitted to the broker, otherwise they will be scheduled here :param persister: the persister to use to persist processes """ assert not ( broker_submit and persister is None - ), 'Must supply a persister if you want to submit using communicator' + ), 'Must supply a persister if you want to submit using coordinator' set_event_loop_policy() self._loop = loop or asyncio.get_event_loop() @@ -89,11 +90,12 @@ def __init__( self._persister = persister self._plugin_version_provider = PluginVersionProvider() - if communicator is not None: - self._communicator = wrap_communicator(communicator, self._loop) - self._controller = RemoteProcessThreadController(communicator) + if coordinator is not None: + # FIXME: the wrap is not needed, when passed in, the coordinator should already wrapped + self._coordinator = wrap_communicator(coordinator.communicator, self._loop) + self._controller = RemoteProcessThreadController(coordinator) elif self._broker_submit: - LOGGER.warning('Disabling broker submission, no communicator provided') + LOGGER.warning('Disabling broker submission, no coordinator provided') self._broker_submit = False def __enter__(self) -> 'Runner': @@ -117,9 +119,9 @@ def persister(self) -> Optional[Persister]: return self._persister @property - def communicator(self) -> Optional[kiwipy.Communicator]: - """Get the communicator used by this runner.""" - return self._communicator + def coordinator(self) -> Optional[Coordinator]: + """Get the coordinator used by this runner.""" + return self._coordinator @property def plugin_version_provider(self) -> PluginVersionProvider: @@ -329,16 +331,16 @@ def inline_callback(event, *args, **kwargs): callback() finally: event.set() - if self.communicator: - self.communicator.remove_broadcast_subscriber(subscriber_identifier) + if self.coordinator: + self.coordinator.remove_broadcast_subscriber(subscriber_identifier) broadcast_filter = kiwipy.BroadcastFilter(functools.partial(inline_callback, event), sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - if self.communicator: + if self.coordinator: LOGGER.info('adding subscriber for broadcasts of %d', pk) - self.communicator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) + self.coordinator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) self._poll_process(node, functools.partial(inline_callback, event)) def get_process_future(self, pk: int) -> futures.ProcessFuture: @@ -348,7 +350,7 @@ def get_process_future(self, pk: int) -> futures.ProcessFuture: :return: A future representing the completion of the process node """ - return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._communicator) + return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._coordinator) def _poll_process(self, node, callback): """Check whether the process state of the node is terminated and call the callback or reschedule it. diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index f6eebfcdca..6cfc7153af 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -14,6 +14,7 @@ import asyncio import kiwipy +from plumpy.coordinator import Coordinator if TYPE_CHECKING: from kiwipy.rmq import RmqThreadCommunicator @@ -60,8 +61,8 @@ class Manager: 3. A single storage backend object for the profile, to connect to data storage resources 5. A single daemon client object for the profile, to connect to the AiiDA daemon - 4. A single communicator object for the profile, to connect to the process control resources - 6. A single process controller object for the profile, which uses the communicator to control process tasks + 4. A single coordinator object for the profile, to connect to the process control resources + 6. A single process controller object for the profile, which uses the coordinator to control process tasks 7. A single runner object for the profile, which uses the process controller to start and stop processes 8. A single persister object for the profile, which can persist running processes to the profile storage @@ -343,6 +344,23 @@ def get_communicator(self) -> 'RmqThreadCommunicator': return broker.get_communicator() + def get_coordinator(self) -> 'Coordinator': + """Return the coordinator + + :return: a global coordinator instance + """ + from aiida.common import ConfigurationError + + broker = self.get_broker() + + if broker is None: + assert self._profile is not None + raise ConfigurationError( + f'profile `{self._profile.name}` does not provide a coordinator because it does not define a broker' + ) + + return broker.get_coordinator() + def get_daemon_client(self) -> 'DaemonClient': """Return the daemon client for the current profile. @@ -373,8 +391,7 @@ def get_process_controller(self) -> 'RemoteProcessThreadController': from plumpy.rmq import RemoteProcessThreadController if self._process_controller is None: - # FIXME: use coordinator wrapper - self._process_controller = RemoteProcessThreadController(self.get_communicator()) + self._process_controller = RemoteProcessThreadController(self.get_coordinator()) return self._process_controller @@ -402,7 +419,7 @@ def create_runner( self, poll_interval: Union[int, float] | None = None, loop: Optional[asyncio.AbstractEventLoop] = None, - communicator: Optional[kiwipy.Communicator] = None, + coordinator: Optional[Coordinator] = None, broker_submit: bool = False, persister: Optional[AiiDAPersister] = None, ) -> 'Runner': @@ -423,13 +440,13 @@ def create_runner( _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') _default_broker_submit = False - _default_communicator = self.get_communicator() + _default_coordinator = self.get_coordinator() _default_persister = self.get_persister() runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval, loop=loop or asyncio.get_event_loop(), - communicator=communicator or _default_communicator, + coordinator=coordinator or _default_coordinator, broker_submit=broker_submit or _default_broker_submit, persister=persister or _default_persister, ) @@ -461,8 +478,8 @@ def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = Non loader=persistence.get_object_loader(), ) - assert runner.communicator is not None, 'communicator not set for runner' - runner.communicator.add_task_subscriber(task_receiver) + assert runner.coordinator is not None, 'coordinator not set for runner' + runner.coordinator.add_task_subscriber(task_receiver) return runner diff --git a/tests/engine/test_futures.py b/tests/engine/test_futures.py index 6bc9527ce9..b8ba78aa8f 100644 --- a/tests/engine/test_futures.py +++ b/tests/engine/test_futures.py @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self): # No polling future = processes.futures.ProcessFuture( - pk=process.pid, loop=runner.loop, communicator=manager.get_coordinator() + pk=process.pid, loop=runner.loop, communicator=manager.get_communicator() ) run(process) From 5d59e6a036e882a0281c780dc32fe8df74c91570 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 21 Dec 2024 03:37:21 +0100 Subject: [PATCH 11/25] Remove get_communicator calls --- src/aiida/brokers/broker.py | 8 +++----- src/aiida/brokers/rabbitmq/broker.py | 17 +++++++--------- src/aiida/cmdline/commands/cmd_process.py | 11 ++++++----- src/aiida/cmdline/commands/cmd_rabbitmq.py | 6 ++++-- src/aiida/cmdline/commands/cmd_status.py | 2 +- src/aiida/engine/processes/futures.py | 23 +++++++++++----------- src/aiida/manage/manager.py | 18 ----------------- tests/brokers/test_rabbitmq.py | 10 +++++----- tests/conftest.py | 7 ++++--- tests/engine/test_futures.py | 2 +- tests/manage/test_manager.py | 6 +++++- 11 files changed, 48 insertions(+), 62 deletions(-) diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index df8e628a21..941c69833d 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -3,8 +3,10 @@ import abc import typing as t + if t.TYPE_CHECKING: from aiida.manage.configuration.profile import Profile + from plumpy.coordinator import Coordinator __all__ = ('Broker',) @@ -20,11 +22,7 @@ def __init__(self, profile: 'Profile') -> None: self._profile = profile @abc.abstractmethod - def get_communicator(self): - """Return an instance of :class:`kiwipy.Communicator`.""" - - @abc.abstractmethod - def get_coordinator(self): + def get_coordinator(self) -> 'Coordinator': """Return an instance of coordinator.""" @abc.abstractmethod diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 8096747be1..370afc6acf 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -31,7 +31,7 @@ def __init__(self, profile: Profile) -> None: :param profile: The profile. """ self._profile = profile - self._communicator: 'RmqThreadCommunicator' | None = None + self._communicator: 'RmqThreadCommunicator | None' = None self._prefix = f'aiida-{self._profile.uuid}' def __str__(self): @@ -48,19 +48,16 @@ def close(self): def iterate_tasks(self): """Return an iterator over the tasks in the launch queue.""" - for task in self.get_communicator().task_queue(get_launch_queue_name(self._prefix)): + for task in self.get_coordinator().communicator.task_queue(get_launch_queue_name(self._prefix)): yield task - def get_communicator(self) -> 'RmqThreadCommunicator': + def get_coordinator(self): if self._communicator is None: self._communicator = self._create_communicator() # Check whether a compatible version of RabbitMQ is being used. self.check_rabbitmq_version() - return self._communicator - - def get_coordinator(self): - coordinator = RmqCoordinator(self.get_communicator()) + coordinator = RmqCoordinator(self._communicator) return coordinator @@ -70,7 +67,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': from aiida.orm.utils import serialize - self._communicator = RmqThreadCommunicator.connect( + _communicator = RmqThreadCommunicator.connect( connection_params={'url': self.get_url()}, message_exchange=get_message_exchange_name(self._prefix), encoder=functools.partial(serialize.serialize, encoding='utf-8'), @@ -84,7 +81,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': testing_mode=self._profile.is_test_profile, ) - return self._communicator + return _communicator def check_rabbitmq_version(self): """Check the version of RabbitMQ that is being connected to and emit warning if it is not compatible.""" @@ -128,4 +125,4 @@ def get_rabbitmq_version(self): """ from packaging.version import parse - return parse(self.get_communicator().server_properties['version']) + return parse(self.get_coordinator().communicator.server_properties['version']) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index 9f8ae1646b..a4e665a544 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -10,6 +10,7 @@ import click +from aiida.brokers.broker import Broker from aiida.cmdline.commands.cmd_verdi import verdi from aiida.cmdline.params import arguments, options, types from aiida.cmdline.utils import decorators, echo @@ -416,7 +417,7 @@ def process_play(processes, all_entries, timeout, wait): @decorators.with_dbenv() @decorators.with_broker @decorators.only_if_daemon_running(echo.echo_warning, 'daemon is not running, so process may not be reachable') -def process_watch(broker, processes, most_recent_node): +def process_watch(broker: Broker, processes, most_recent_node): """Watch the state transitions of processes. Watch the state transitions for one or multiple running processes.""" @@ -436,7 +437,7 @@ def process_watch(broker, processes, most_recent_node): from kiwipy import BroadcastFilter - def _print(communicator, body, sender, subject, correlation_id): + def _print(coordinator, body, sender, subject, correlation_id): """Format the incoming broadcast data into a message and echo it to stdout.""" if body is None: body = 'No message specified' @@ -446,7 +447,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo(f'Process<{sender}> [{subject}|{correlation_id}]: {body}') - communicator = broker.get_communicator() + coordinator = broker.get_coordinator() echo.echo_report('watching for broadcasted messages, press CTRL+C to stop...') if most_recent_node: @@ -457,7 +458,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo_error(f'Process<{process.pk}> is already terminated') continue - communicator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) + coordinator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) try: # Block this thread indefinitely until interrupt @@ -467,7 +468,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo('') # add a new line after the interrupt character echo.echo_report('received interrupt, exiting...') try: - communicator.close() + coordinator.close() except RuntimeError: pass diff --git a/src/aiida/cmdline/commands/cmd_rabbitmq.py b/src/aiida/cmdline/commands/cmd_rabbitmq.py index c6a66d6da2..99346896f2 100644 --- a/src/aiida/cmdline/commands/cmd_rabbitmq.py +++ b/src/aiida/cmdline/commands/cmd_rabbitmq.py @@ -20,6 +20,7 @@ from aiida.cmdline.commands.cmd_devel import verdi_devel from aiida.cmdline.params import arguments, options from aiida.cmdline.utils import decorators, echo, echo_tabulate +from aiida.manage.manager import Manager if t.TYPE_CHECKING: import requests @@ -131,12 +132,13 @@ def with_client(ctx, wrapped, _, args, kwargs): @cmd_rabbitmq.command('server-properties') @decorators.with_manager -def cmd_server_properties(manager): +def cmd_server_properties(manager: Manager): """List the server properties.""" import yaml data = {} - for key, value in manager.get_communicator().server_properties.items(): + # FIXME: server_properties as an common API for coordinator? + for key, value in manager.get_coordinator().communicator.server_properties.items(): data[key] = value.decode('utf-8') if isinstance(value, bytes) else value click.echo(yaml.dump(data, indent=4)) diff --git a/src/aiida/cmdline/commands/cmd_status.py b/src/aiida/cmdline/commands/cmd_status.py index 85ef292fa7..6ee1952fb1 100644 --- a/src/aiida/cmdline/commands/cmd_status.py +++ b/src/aiida/cmdline/commands/cmd_status.py @@ -132,7 +132,7 @@ def verdi_status(print_traceback, no_rmq): if broker: try: - broker.get_communicator() + broker.get_coordinator() except Exception as exc: message = f'Unable to connect to broker: {broker}' print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback) diff --git a/src/aiida/engine/processes/futures.py b/src/aiida/engine/processes/futures.py index 096c11b277..79016dacc3 100644 --- a/src/aiida/engine/processes/futures.py +++ b/src/aiida/engine/processes/futures.py @@ -12,6 +12,7 @@ from typing import Optional, Union import kiwipy +from plumpy.coordinator import Coordinator from aiida.orm import Node, load_node @@ -28,17 +29,17 @@ def __init__( pk: int, loop: Optional[asyncio.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, - communicator: Optional[kiwipy.Communicator] = None, + coordinator: Optional[Coordinator] = None, ): """Construct a future for a process node being finished. If a None poll_interval is supplied polling will not be used. - If a communicator is supplied it will be used to listen for broadcast messages. + If a coordinator is supplied it will be used to listen for broadcast messages. :param pk: process pk :param loop: An event loop :param poll_interval: optional polling interval, if None, polling is not activated. - :param communicator: optional communicator, if None, will not subscribe to broadcasts. + :param coordinator: optional coordinator, if None, will not subscribe to broadcasts. """ from .process import ProcessState @@ -46,18 +47,18 @@ def __init__( loop = loop if loop is not None else asyncio.get_event_loop() super().__init__(loop=loop) - assert not (poll_interval is None and communicator is None), 'Must poll or have a communicator to use' + assert not (poll_interval is None and coordinator is None), 'Must poll or have a coordinator to use' node = load_node(pk=pk) if node.is_terminated: self.set_result(node) else: - self._communicator = communicator + self._coordinator = coordinator self.add_done_callback(lambda _: self.cleanup()) # Try setting up a filtered broadcast subscriber - if self._communicator is not None: + if self._coordinator is not None: def _subscriber(*args, **kwargs): if not self.done(): @@ -66,17 +67,17 @@ def _subscriber(*args, **kwargs): broadcast_filter = kiwipy.BroadcastFilter(_subscriber, sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - self._broadcast_identifier = self._communicator.add_broadcast_subscriber(broadcast_filter) + self._broadcast_identifier = self._coordinator.add_broadcast_subscriber(broadcast_filter) # Start polling if poll_interval is not None: loop.create_task(self._poll_process(node, poll_interval)) def cleanup(self) -> None: - """Clean up the future by removing broadcast subscribers from the communicator if it still exists.""" - if self._communicator is not None: - self._communicator.remove_broadcast_subscriber(self._broadcast_identifier) - self._communicator = None + """Clean up the future by removing broadcast subscribers from the coordinator if it still exists.""" + if self._coordinator is not None: + self._coordinator.remove_broadcast_subscriber(self._broadcast_identifier) + self._coordinator = None self._broadcast_identifier = None async def _poll_process(self, node: Node, poll_interval: Union[int, float]) -> None: diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 6cfc7153af..916589ccfb 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -326,24 +326,6 @@ def get_persister(self) -> 'AiiDAPersister': return self._persister - def get_communicator(self) -> 'RmqThreadCommunicator': - """Return the communicator - - :return: a global communicator instance - - """ - from aiida.common import ConfigurationError - - broker = self.get_broker() - - if broker is None: - assert self._profile is not None - raise ConfigurationError( - f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker' - ) - - return broker.get_communicator() - def get_coordinator(self) -> 'Coordinator': """Return the coordinator diff --git a/tests/brokers/test_rabbitmq.py b/tests/brokers/test_rabbitmq.py index 2417d27748..58399a7e36 100644 --- a/tests/brokers/test_rabbitmq.py +++ b/tests/brokers/test_rabbitmq.py @@ -32,7 +32,7 @@ def raise_connection_error(): broker = manager.get_broker() assert 'RabbitMQ v' in str(broker) - monkeypatch.setattr(broker, 'get_communicator', raise_connection_error) + monkeypatch.setattr(broker, 'get_coordinator', raise_connection_error) assert 'RabbitMQ @' in str(broker) @@ -92,14 +92,14 @@ def test_communicator(url): RmqThreadCommunicator.connect(connection_params={'url': url}) -def test_add_rpc_subscriber(communicator): +def test_add_rpc_subscriber(coordinator): """Test ``add_rpc_subscriber``.""" - communicator.add_rpc_subscriber(None) + coordinator.add_rpc_subscriber(None) -def test_add_broadcast_subscriber(communicator): +def test_add_broadcast_subscriber(coordinator): """Test ``add_broadcast_subscriber``.""" - communicator.add_broadcast_subscriber(None) + coordinator.add_broadcast_subscriber(None) @pytest.mark.usefixtures('aiida_profile_clean') diff --git a/tests/conftest.py b/tests/conftest.py index 89b0a1bad7..acebc1cb7b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,7 @@ from aiida.common.folders import Folder from aiida.common.links import LinkType from aiida.manage.configuration import Profile, get_config, load_profile +from aiida.manage.manager import Manager if t.TYPE_CHECKING: from aiida.manage.configuration.config import Config @@ -540,9 +541,9 @@ def backend(manager): @pytest.fixture -def communicator(manager): - """Get the ``Communicator`` instance of the currently loaded profile to communicate with RabbitMQ.""" - return manager.get_communicator() +def coordinator(manager: Manager): + """Get the ``Coordinator`` instance of the currently loaded profile to communicate with RabbitMQ.""" + return manager.get_coordinator() @pytest.fixture diff --git a/tests/engine/test_futures.py b/tests/engine/test_futures.py index b8ba78aa8f..194bcf60c5 100644 --- a/tests/engine/test_futures.py +++ b/tests/engine/test_futures.py @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self): # No polling future = processes.futures.ProcessFuture( - pk=process.pid, loop=runner.loop, communicator=manager.get_communicator() + pk=process.pid, loop=runner.loop, coordinator=manager.get_coordinator() ) run(process) diff --git a/tests/manage/test_manager.py b/tests/manage/test_manager.py index 3a8f4949cf..9085c5d86b 100644 --- a/tests/manage/test_manager.py +++ b/tests/manage/test_manager.py @@ -21,11 +21,15 @@ def test_disconnect(): demonstrate the problematic behavior. Getting the communicator and then disconnecting it (through calling :meth:`aiida.manage.manager.Manager.reset_profile`) works fine. However, if a process is a run before closing it, for example running a calcfunction, the closing of the communicator will raise a ``TimeoutError``. + + The problem was solved by: + - https://github.com/aiidateam/aiida-core/pull/6672 + - https://github.com/mosquito/aiormq/pull/208 """ from aiida.manage import get_manager manager = get_manager() - manager.get_communicator() + _ = manager.get_coordinator() manager.reset_profile() # This returns just fine result, node = add_calcfunction.run_get_node(1) From c769906e81246d6398161bdc931f9d90185a2ec1 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 21 Dec 2024 14:30:30 +0100 Subject: [PATCH 12/25] Controller snuck into broker --- environment.yml | 2 +- src/aiida/brokers/broker.py | 9 ++++++++- src/aiida/brokers/rabbitmq/broker.py | 9 ++++++++- src/aiida/engine/processes/process.py | 1 - src/aiida/engine/runners.py | 20 ++++++++++++-------- src/aiida/manage/manager.py | 16 +++++++--------- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/environment.yml b/environment.yml index ad80dd3416..d38ea2a437 100644 --- a/environment.yml +++ b/environment.yml @@ -22,7 +22,7 @@ dependencies: - importlib-metadata~=6.0 - numpy~=1.21 - paramiko~=3.0 -- plumpy~=0.22.3 +- plumpy - pgsu~=0.3.0 - psutil~=5.6 - psycopg[binary]~=3.0 diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index 941c69833d..1259a9c43c 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -3,11 +3,13 @@ import abc import typing as t +from plumpy.controller import ProcessController if t.TYPE_CHECKING: - from aiida.manage.configuration.profile import Profile from plumpy.coordinator import Coordinator + from aiida.manage.configuration.profile import Profile + __all__ = ('Broker',) @@ -25,6 +27,11 @@ def __init__(self, profile: 'Profile') -> None: def get_coordinator(self) -> 'Coordinator': """Return an instance of coordinator.""" + @abc.abstractmethod + def get_controller(self) -> ProcessController: + """Return the process controller""" + ... + @abc.abstractmethod def iterate_tasks(self): """Return an iterator over the tasks in the launch queue.""" diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 370afc6acf..0ed8bcd0d7 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -5,7 +5,9 @@ import functools import typing as t -from plumpy.rmq import RmqCoordinator +from plumpy.rmq import RemoteProcessThreadController, RmqCoordinator +from plumpy import ProcessController +from plumpy.rmq.process_control import RemoteProcessController from aiida.brokers.broker import Broker from aiida.common.log import AIIDA_LOGGER @@ -15,6 +17,7 @@ if t.TYPE_CHECKING: from kiwipy.rmq import RmqThreadCommunicator + from aiida.manage.configuration.profile import Profile LOGGER = AIIDA_LOGGER.getChild('broker.rabbitmq') @@ -61,6 +64,10 @@ def get_coordinator(self): return coordinator + def get_controller(self) -> ProcessController: + coordinator = self.get_coordinator() + return RemoteProcessThreadController(coordinator) + def _create_communicator(self) -> 'RmqThreadCommunicator': """Return an instance of :class:`kiwipy.Communicator`.""" from kiwipy.rmq import RmqThreadCommunicator diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index a678b115c7..1746cee932 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -43,7 +43,6 @@ # from kiwipy.communications import UnroutableError # from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.process_states import Finished, ProcessState - from plumpy.processes import Process as PlumpyProcess from plumpy.utils import AttributesFrozendict diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index e1dd3c38f5..92a62c0712 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -27,6 +27,7 @@ from aiida.common import exceptions from aiida.orm import ProcessNode, load_node from aiida.plugins.utils import PluginVersionProvider +from aiida.brokers import Broker from . import transports, utils from .processes import Process, ProcessBuilder, ProcessState, futures @@ -64,7 +65,7 @@ def __init__( self, poll_interval: Union[int, float] = 0, loop: Optional[asyncio.AbstractEventLoop] = None, - coordinator: Optional[Coordinator] = None, + broker: Broker | None = None, broker_submit: bool = False, persister: Optional[Persister] = None, ): @@ -72,14 +73,14 @@ def __init__( :param poll_interval: interval in seconds between polling for status of active sub processes :param loop: an asyncio event loop, if none is suppled a new one will be created - :param coordinator: the coordinator to use + :param broker: the broker to use :param broker_submit: if True, processes will be submitted to the broker, otherwise they will be scheduled here :param persister: the persister to use to persist processes """ assert not ( broker_submit and persister is None - ), 'Must supply a persister if you want to submit using coordinator' + ), 'Must supply a persister if you want to submit using coordinator/broker' set_event_loop_policy() self._loop = loop or asyncio.get_event_loop() @@ -90,11 +91,14 @@ def __init__( self._persister = persister self._plugin_version_provider = PluginVersionProvider() - if coordinator is not None: - # FIXME: the wrap is not needed, when passed in, the coordinator should already wrapped - self._coordinator = wrap_communicator(coordinator.communicator, self._loop) - self._controller = RemoteProcessThreadController(coordinator) + # FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction + if broker is not None: + _coordinator = broker.get_coordinator() + # FIXME: the wrap should not be needed + self._coordinator = wrap_communicator(_coordinator.communicator, self._loop) + self._controller = broker.get_controller() elif self._broker_submit: + # FIXME: if broker then broker_submit else False LOGGER.warning('Disabling broker submission, no coordinator provided') self._broker_submit = False @@ -350,7 +354,7 @@ def get_process_future(self, pk: int) -> futures.ProcessFuture: :return: A future representing the completion of the process node """ - return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._coordinator) + return futures.ProcessFuture(pk, self._loop, self._poll_interval, self.coordinator) def _poll_process(self, node, callback): """Check whether the process state of the node is terminated and call the callback or reschedule it. diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 916589ccfb..690051c73b 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -10,14 +10,12 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, Any, Optional, Union -import asyncio -import kiwipy from plumpy.coordinator import Coordinator if TYPE_CHECKING: - from kiwipy.rmq import RmqThreadCommunicator from plumpy.process_comms import RemoteProcessThreadController from aiida.brokers.broker import Broker @@ -169,7 +167,7 @@ def reset_profile_storage(self) -> None: self._profile_storage = None def reset_broker(self) -> None: - """Reset the communicator.""" + """Reset the broker.""" from concurrent import futures if self._broker is not None: @@ -401,7 +399,7 @@ def create_runner( self, poll_interval: Union[int, float] | None = None, loop: Optional[asyncio.AbstractEventLoop] = None, - coordinator: Optional[Coordinator] = None, + broker: Broker | None = None, broker_submit: bool = False, persister: Optional[AiiDAPersister] = None, ) -> 'Runner': @@ -422,19 +420,19 @@ def create_runner( _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') _default_broker_submit = False - _default_coordinator = self.get_coordinator() _default_persister = self.get_persister() + _default_broker = self.get_broker() runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval, loop=loop or asyncio.get_event_loop(), - coordinator=coordinator or _default_coordinator, + broker=broker or _default_broker, broker_submit=broker_submit or _default_broker_submit, persister=persister or _default_persister, ) return runner - def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner': + def create_daemon_runner(self) -> 'Runner': """Create and return a new daemon runner. This is used by workers when the daemon is running and in testing. @@ -449,7 +447,7 @@ def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = Non from aiida.engine import persistence from aiida.engine.processes.launcher import ProcessLauncher - runner = self.create_runner(broker_submit=True, loop=loop) + runner = self.create_runner(broker_submit=True, loop=None) runner_loop = runner.loop # Listen for incoming launch requests From 03f7a5bd243ac645ba23efec83999e1d0165b56a Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 27 Dec 2024 01:37:05 +0100 Subject: [PATCH 13/25] Construct and use RmqLooCoordinator directly --- src/aiida/brokers/rabbitmq/broker.py | 6 +- src/aiida/brokers/rabbitmq/coordinator.py | 86 +++++++++++++++++++++++ src/aiida/engine/runners.py | 4 +- tests/brokers/test_rabbitmq.py | 4 +- 4 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 src/aiida/brokers/rabbitmq/coordinator.py diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 0ed8bcd0d7..7bfcb2fec6 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import functools import typing as t @@ -10,6 +11,7 @@ from plumpy.rmq.process_control import RemoteProcessController from aiida.brokers.broker import Broker +from aiida.brokers.rabbitmq.coordinator import RmqLoopCoordinator from aiida.common.log import AIIDA_LOGGER from aiida.manage.configuration import get_config_option @@ -36,6 +38,8 @@ def __init__(self, profile: Profile) -> None: self._profile = profile self._communicator: 'RmqThreadCommunicator | None' = None self._prefix = f'aiida-{self._profile.uuid}' + # FIXME: ??? should make the event loop setable?? + self._loop = asyncio.get_event_loop() def __str__(self): try: @@ -60,7 +64,7 @@ def get_coordinator(self): # Check whether a compatible version of RabbitMQ is being used. self.check_rabbitmq_version() - coordinator = RmqCoordinator(self._communicator) + coordinator = RmqLoopCoordinator(self._communicator, self._loop) return coordinator diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py new file mode 100644 index 0000000000..6c6a13c7e2 --- /dev/null +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +from asyncio import AbstractEventLoop +from typing import Generic, TypeVar, final +import kiwipy +import concurrent.futures + +from plumpy.exceptions import CoordinatorConnectionError +from plumpy.rmq.communications import convert_to_comm + +__all__ = ['RmqCoordinator'] + +U = TypeVar('U', bound=kiwipy.Communicator) + + +@final +class RmqLoopCoordinator(Generic[U]): + def __init__(self, comm: U, loop: AbstractEventLoop): + self._comm = comm + self._loop = loop + + @property + def communicator(self) -> U: + """The inner communicator.""" + return self._comm + + def add_rpc_subscriber(self, subscriber, identifier=None): + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_rpc_subscriber(subscriber, identifier) + + def add_broadcast_subscriber( + self, + subscriber, + subject_filters=None, + sender_filters=None, + identifier=None, + ): + subscriber = kiwipy.BroadcastFilter(subscriber) + + subject_filters = subject_filters or [] + sender_filters = sender_filters or [] + + for filter in subject_filters: + subscriber.add_subject_filter(filter) + for filter in sender_filters: + subscriber.add_sender_filter(filter) + + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_broadcast_subscriber(subscriber, identifier) + + def add_task_subscriber(self, subscriber, identifier=None): + subscriber = convert_to_comm(subscriber, self._loop) + return self._comm.add_task_subscriber(subscriber, identifier) + + def remove_rpc_subscriber(self, identifier): + return self._comm.remove_rpc_subscriber(identifier) + + def remove_broadcast_subscriber(self, identifier): + return self._comm.remove_broadcast_subscriber(identifier) + + def remove_task_subscriber(self, identifier): + return self._comm.remove_task_subscriber(identifier) + + def rpc_send(self, recipient_id, msg): + return self._comm.rpc_send(recipient_id, msg) + + def broadcast_send( + self, + body, + sender=None, + subject=None, + correlation_id=None, + ): + from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError + + try: + rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) + except (ChannelInvalidStateError, AMQPConnectionError, concurrent.futures.TimeoutError) as exc: + raise CoordinatorConnectionError from exc + else: + return rsp + + def task_send(self, task, no_reply=False): + return self._comm.task_send(task, no_reply) + + def close(self): + self._comm.close() diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 92a62c0712..cb14be2b85 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -93,9 +93,7 @@ def __init__( # FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction if broker is not None: - _coordinator = broker.get_coordinator() - # FIXME: the wrap should not be needed - self._coordinator = wrap_communicator(_coordinator.communicator, self._loop) + self._coordinator = broker.get_coordinator() self._controller = broker.get_controller() elif self._broker_submit: # FIXME: if broker then broker_submit else False diff --git a/tests/brokers/test_rabbitmq.py b/tests/brokers/test_rabbitmq.py index 58399a7e36..5fb5c2bc5c 100644 --- a/tests/brokers/test_rabbitmq.py +++ b/tests/brokers/test_rabbitmq.py @@ -94,12 +94,12 @@ def test_communicator(url): def test_add_rpc_subscriber(coordinator): """Test ``add_rpc_subscriber``.""" - coordinator.add_rpc_subscriber(None) + coordinator.add_rpc_subscriber(lambda: None) def test_add_broadcast_subscriber(coordinator): """Test ``add_broadcast_subscriber``.""" - coordinator.add_broadcast_subscriber(None) + coordinator.add_broadcast_subscriber(lambda: None) @pytest.mark.usefixtures('aiida_profile_clean') From 02a939e6f1b119def22e0e1474622e3d5eefe70a Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 27 Dec 2024 02:39:49 +0100 Subject: [PATCH 14/25] Separate create_broker and get_broker where get won't change state --- src/aiida/brokers/broker.py | 14 ++++++++------ src/aiida/brokers/rabbitmq/broker.py | 12 +++++++++--- src/aiida/cmdline/utils/decorators.py | 4 +++- src/aiida/manage/manager.py | 14 ++++++++++---- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index 1259a9c43c..388cd7e510 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -13,17 +13,19 @@ __all__ = ('Broker',) +# FIXME: make me a protocol class Broker: """Interface for a message broker that facilitates communication with and between process runners.""" - def __init__(self, profile: 'Profile') -> None: - """Construct a new instance. - - :param profile: The profile. - """ - self._profile = profile + # def __init__(self, profile: 'Profile') -> None: + # """Construct a new instance. + # + # :param profile: The profile. + # """ + # self._profile = profile @abc.abstractmethod + # FIXME: make me a property def get_coordinator(self) -> 'Coordinator': """Return an instance of coordinator.""" diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 7bfcb2fec6..103720c83f 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -30,7 +30,7 @@ class RabbitmqBroker(Broker): """Implementation of the message broker interface using RabbitMQ through ``kiwipy``.""" - def __init__(self, profile: Profile) -> None: + def __init__(self, profile: Profile, loop=None) -> None: """Construct a new instance. :param profile: The profile. @@ -38,8 +38,8 @@ def __init__(self, profile: Profile) -> None: self._profile = profile self._communicator: 'RmqThreadCommunicator | None' = None self._prefix = f'aiida-{self._profile.uuid}' - # FIXME: ??? should make the event loop setable?? - self._loop = asyncio.get_event_loop() + self._coordinator = None + self._loop = loop or asyncio.get_event_loop() def __str__(self): try: @@ -59,6 +59,12 @@ def iterate_tasks(self): yield task def get_coordinator(self): + if self._coordinator is not None: + return self._coordinator + + return self.create_coordinator() + + def create_coordinator(self): if self._communicator is None: self._communicator = self._create_communicator() # Check whether a compatible version of RabbitMQ is being used. diff --git a/src/aiida/cmdline/utils/decorators.py b/src/aiida/cmdline/utils/decorators.py index c5fdf826ca..595ce8373d 100644 --- a/src/aiida/cmdline/utils/decorators.py +++ b/src/aiida/cmdline/utils/decorators.py @@ -18,6 +18,7 @@ """ +import asyncio from contextlib import contextmanager from click_spinner import spinner @@ -325,7 +326,8 @@ def start_daemon(): assert profile is not None - if manager.get_broker() is None: + loop = asyncio.get_event_loop() + if manager.create_broker(loop) is None: echo.echo_critical( f'profile `{profile.name}` does not define a broker and so cannot use this functionality.' f'See {URL_NO_BROKER} for more details.' diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 690051c73b..e9bf5c17f0 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -285,7 +285,10 @@ def get_profile_storage(self) -> 'StorageBackend': return self._profile_storage - def get_broker(self) -> 'Broker' | None: + def get_broker(self) -> 'Broker | None': + return self._broker + + def create_broker(self, loop) -> 'Broker | None': """Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker. :returns: The broker of the profile, or ``None`` if the profile doesn't define one. @@ -307,7 +310,7 @@ def get_broker(self) -> 'Broker' | None: entry_point = 'core.rabbitmq' broker_cls = BrokerFactory(entry_point) - self._broker = broker_cls(self._profile) + self._broker = broker_cls(self._profile, loop) return self._broker @@ -421,11 +424,14 @@ def create_runner( _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval') _default_broker_submit = False _default_persister = self.get_persister() - _default_broker = self.get_broker() + _default_loop = asyncio.get_event_loop() + + loop = loop or _default_loop + _default_broker = self.create_broker(loop) runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval, - loop=loop or asyncio.get_event_loop(), + loop=loop, broker=broker or _default_broker, broker_submit=broker_submit or _default_broker_submit, persister=persister or _default_persister, From 329c51cd1b2e282c43735152234102ad4e4a7f15 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 27 Dec 2024 03:36:59 +0100 Subject: [PATCH 15/25] Keep on mess up with coordinator loop When calling add_rpc_subscriber and add_task_subscriber, the event loop of caller may from random event loop. But the target event loop is the runner one. Therefore it requires to pass the loop to the broker when creating the runner and runner's broker. --- src/aiida/brokers/rabbitmq/coordinator.py | 5 +++++ src/aiida/cmdline/utils/decorators.py | 3 +-- src/aiida/engine/runners.py | 4 ++++ src/aiida/manage/manager.py | 10 +++++++--- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index 6c6a13c7e2..58ce21ecee 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from asyncio import AbstractEventLoop +import asyncio from typing import Generic, TypeVar, final import kiwipy import concurrent.futures @@ -84,3 +85,7 @@ def task_send(self, task, no_reply=False): def close(self): self._comm.close() + + def is_closed(self) -> bool: + """Return `True` if the communicator was closed""" + return self._comm.is_closed() diff --git a/src/aiida/cmdline/utils/decorators.py b/src/aiida/cmdline/utils/decorators.py index 595ce8373d..1cd251493e 100644 --- a/src/aiida/cmdline/utils/decorators.py +++ b/src/aiida/cmdline/utils/decorators.py @@ -326,8 +326,7 @@ def start_daemon(): assert profile is not None - loop = asyncio.get_event_loop() - if manager.create_broker(loop) is None: + if manager.get_broker() is None: echo.echo_critical( f'profile `{profile.name}` does not define a broker and so cannot use this functionality.' f'See {URL_NO_BROKER} for more details.' diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index cb14be2b85..007ae3770e 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -92,9 +92,13 @@ def __init__( self._plugin_version_provider = PluginVersionProvider() # FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction + # Broker should always create inside runner? since they should share the loop. if broker is not None: self._coordinator = broker.get_coordinator() self._controller = broker.get_controller() + + # FIXME: why with wrapper, the pending task not exist?? + # self._coordinator = wrap_communicator(broker.get_coordinator().communicator, self._loop) elif self._broker_submit: # FIXME: if broker then broker_submit else False LOGGER.warning('Disabling broker submission, no coordinator provided') diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index e9bf5c17f0..ca46d0894f 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -286,9 +286,13 @@ def get_profile_storage(self) -> 'StorageBackend': return self._profile_storage def get_broker(self) -> 'Broker | None': - return self._broker + if self._broker is not None: + return self._broker + + _default_loop = asyncio.get_event_loop() + return self._create_broker(_default_loop) - def create_broker(self, loop) -> 'Broker | None': + def _create_broker(self, loop) -> 'Broker | None': """Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker. :returns: The broker of the profile, or ``None`` if the profile doesn't define one. @@ -427,7 +431,7 @@ def create_runner( _default_loop = asyncio.get_event_loop() loop = loop or _default_loop - _default_broker = self.create_broker(loop) + _default_broker = self._create_broker(loop) runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval, From 82f14dd8f0a0a398073a16602f68222a0965af62 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 13:29:57 +0000 Subject: [PATCH 16/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/aiida/brokers/broker.py | 1 - src/aiida/brokers/rabbitmq/broker.py | 3 +-- src/aiida/brokers/rabbitmq/coordinator.py | 8 +++----- src/aiida/cmdline/utils/decorators.py | 1 - src/aiida/engine/runners.py | 4 ++-- 5 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index 388cd7e510..7f719f0760 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -8,7 +8,6 @@ if t.TYPE_CHECKING: from plumpy.coordinator import Coordinator - from aiida.manage.configuration.profile import Profile __all__ = ('Broker',) diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 103720c83f..dc9af4acd5 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -6,9 +6,8 @@ import functools import typing as t -from plumpy.rmq import RemoteProcessThreadController, RmqCoordinator from plumpy import ProcessController -from plumpy.rmq.process_control import RemoteProcessController +from plumpy.rmq import RemoteProcessThreadController from aiida.brokers.broker import Broker from aiida.brokers.rabbitmq.coordinator import RmqLoopCoordinator diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index 58ce21ecee..a0f2fdf02f 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -1,10 +1,8 @@ -# -*- coding: utf-8 -*- +import concurrent.futures from asyncio import AbstractEventLoop -import asyncio from typing import Generic, TypeVar, final -import kiwipy -import concurrent.futures +import kiwipy from plumpy.exceptions import CoordinatorConnectionError from plumpy.rmq.communications import convert_to_comm @@ -71,7 +69,7 @@ def broadcast_send( subject=None, correlation_id=None, ): - from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError + from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError try: rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) diff --git a/src/aiida/cmdline/utils/decorators.py b/src/aiida/cmdline/utils/decorators.py index 1cd251493e..c5fdf826ca 100644 --- a/src/aiida/cmdline/utils/decorators.py +++ b/src/aiida/cmdline/utils/decorators.py @@ -18,7 +18,6 @@ """ -import asyncio from contextlib import contextmanager from click_spinner import spinner diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 007ae3770e..768d9fd569 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -22,12 +22,12 @@ from plumpy.coordinator import Coordinator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister -from plumpy.rmq import RemoteProcessThreadController, wrap_communicator +from plumpy.rmq import RemoteProcessThreadController +from aiida.brokers import Broker from aiida.common import exceptions from aiida.orm import ProcessNode, load_node from aiida.plugins.utils import PluginVersionProvider -from aiida.brokers import Broker from . import transports, utils from .processes import Process, ProcessBuilder, ProcessState, futures From 2c00fb76beed318ebbe5b78241cc1497108163c2 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 28 Dec 2024 01:09:21 +0100 Subject: [PATCH 17/25] coordinator decouple usage in process.py --- src/aiida/engine/processes/process.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index 1746cee932..72480c01e7 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -173,12 +173,13 @@ def __init__( from aiida.manage import manager self._runner = runner if runner is not None else manager.get_manager().get_runner() + _coordinator = manager.get_manager().get_coordinator() super().__init__( inputs=self.spec().inputs.serialize(inputs), logger=logger, loop=self._runner.loop, - coordinator=self._runner.coordinator, + coordinator=_coordinator, ) self._node: Optional[orm.ProcessNode] = None @@ -318,7 +319,9 @@ def load_instance_state( else: self._runner = manager.get_manager().get_runner() - load_context = load_context.copyextend(loop=self._runner.loop, coordinator=self._runner.coordinator) + _coordinator = manager.get_manager().get_coordinator() + + load_context = load_context.copyextend(loop=self._runner.loop, coordinator=_coordinator) super().load_instance_state(saved_state, load_context) if self.SaveKeys.CALC_ID.value in saved_state: From 185765680e805e22dc1c13de0d3227c8474bfbe6 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 28 Dec 2024 01:10:07 +0100 Subject: [PATCH 18/25] Move create_daemon_runner to daemon worker.py Instead of as a method of runner which is not needed and confuse. --- src/aiida/engine/daemon/worker.py | 33 ++++++++++++++++++++++++++++++- src/aiida/manage/manager.py | 31 ----------------------------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index 913e44d9b7..bd0d5aaa0b 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -17,6 +17,7 @@ from aiida.engine.daemon.client import get_daemon_client from aiida.engine.runners import Runner from aiida.manage import get_config_option, get_manager +from aiida.manage.manager import Manager LOGGER = logging.getLogger(__name__) @@ -36,6 +37,36 @@ async def shutdown_worker(runner: Runner) -> None: LOGGER.info('Daemon worker stopped') +def create_daemon_runner(manager: Manager) -> 'Runner': + """Create and return a new daemon runner. + + This is used by workers when the daemon is running and in testing. + + :param loop: the (optional) asyncio event loop to use + + :return: a runner configured to work in the daemon configuration + + """ + from plumpy.persistence import LoadSaveContext + + from aiida.engine import persistence + from aiida.engine.processes.launcher import ProcessLauncher + + runner = manager.create_runner(broker_submit=True, loop=None) + runner_loop = runner.loop + + # Listen for incoming launch requests + task_receiver = ProcessLauncher( + loop=runner_loop, + persister=manager.get_persister(), + load_context=LoadSaveContext(runner=runner), + loader=persistence.get_object_loader(), + ) + + assert runner.coordinator is not None, 'coordinator not set for runner' + runner.coordinator.add_task_subscriber(task_receiver) + + return runner def start_daemon_worker(foreground: bool = False) -> None: """Start a daemon worker for the currently configured profile. @@ -51,7 +82,7 @@ def start_daemon_worker(foreground: bool = False) -> None: try: manager = get_manager() - runner = manager.create_daemon_runner() + runner = create_daemon_runner(manager) manager.set_runner(runner) except Exception: LOGGER.exception('daemon worker failed to start') diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index ca46d0894f..fe67e7102a 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -442,37 +442,6 @@ def create_runner( ) return runner - def create_daemon_runner(self) -> 'Runner': - """Create and return a new daemon runner. - - This is used by workers when the daemon is running and in testing. - - :param loop: the (optional) asyncio event loop to use - - :return: a runner configured to work in the daemon configuration - - """ - from plumpy.persistence import LoadSaveContext - - from aiida.engine import persistence - from aiida.engine.processes.launcher import ProcessLauncher - - runner = self.create_runner(broker_submit=True, loop=None) - runner_loop = runner.loop - - # Listen for incoming launch requests - task_receiver = ProcessLauncher( - loop=runner_loop, - persister=self.get_persister(), - load_context=LoadSaveContext(runner=runner), - loader=persistence.get_object_loader(), - ) - - assert runner.coordinator is not None, 'coordinator not set for runner' - runner.coordinator.add_task_subscriber(task_receiver) - - return runner - def check_version(self): """Check the currently installed version of ``aiida-core`` and warn if it is a post release development version. From 01f92f7c04324a400b208e264f4b6fed106de437 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 28 Dec 2024 01:14:13 +0100 Subject: [PATCH 19/25] Remove outer usage of runner.coordinator --- src/aiida/engine/daemon/worker.py | 5 +++-- src/aiida/engine/runners.py | 15 +++++---------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index bd0d5aaa0b..f461a4ba39 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -63,8 +63,9 @@ def create_daemon_runner(manager: Manager) -> 'Runner': loader=persistence.get_object_loader(), ) - assert runner.coordinator is not None, 'coordinator not set for runner' - runner.coordinator.add_task_subscriber(task_receiver) + coordinator = manager.get_coordinator() + assert coordinator is not None, 'coordinator not set for runner' + coordinator.add_task_subscriber(task_receiver) return runner diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 768d9fd569..e0dfad2a86 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -124,11 +124,6 @@ def persister(self) -> Optional[Persister]: """Get the persister used by this runner.""" return self._persister - @property - def coordinator(self) -> Optional[Coordinator]: - """Get the coordinator used by this runner.""" - return self._coordinator - @property def plugin_version_provider(self) -> PluginVersionProvider: return self._plugin_version_provider @@ -337,16 +332,16 @@ def inline_callback(event, *args, **kwargs): callback() finally: event.set() - if self.coordinator: - self.coordinator.remove_broadcast_subscriber(subscriber_identifier) + if self._coordinator: + self._coordinator.remove_broadcast_subscriber(subscriber_identifier) broadcast_filter = kiwipy.BroadcastFilter(functools.partial(inline_callback, event), sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - if self.coordinator: + if self._coordinator: LOGGER.info('adding subscriber for broadcasts of %d', pk) - self.coordinator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) + self._coordinator.add_broadcast_subscriber(broadcast_filter, subscriber_identifier) self._poll_process(node, functools.partial(inline_callback, event)) def get_process_future(self, pk: int) -> futures.ProcessFuture: @@ -356,7 +351,7 @@ def get_process_future(self, pk: int) -> futures.ProcessFuture: :return: A future representing the completion of the process node """ - return futures.ProcessFuture(pk, self._loop, self._poll_interval, self.coordinator) + return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._coordinator) def _poll_process(self, node, callback): """Check whether the process state of the node is terminated and call the callback or reschedule it. From 69db549c1d257363e0c625a6370cbd591c2358a7 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 28 Dec 2024 01:35:59 +0100 Subject: [PATCH 20/25] Start runner in a dedicated thread --- src/aiida/engine/daemon/worker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index f461a4ba39..c9694d6990 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -12,6 +12,7 @@ import logging import signal import sys +import threading from aiida.common.log import configure_logging from aiida.engine.daemon.client import get_daemon_client @@ -98,9 +99,13 @@ def start_daemon_worker(foreground: bool = False) -> None: # https://github.com/python/mypy/issues/12557 runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc] + # XXX: check the threading use is elegantly implemented: e.g. log handle, error handle, shutdown handle. + LOGGER.info('Starting a daemon worker') + runner_thread = threading.Thread(target=runner.start, daemon=True) + runner_thread.start() + try: - LOGGER.info('Starting a daemon worker') - runner.start() + runner_thread.join() except SystemError as exception: LOGGER.info('Received a SystemError: %s', exception) runner.close() From a078400a74b47eb998bc49f67e9498ca6cb0f12c Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sun, 29 Dec 2024 01:05:40 +0100 Subject: [PATCH 21/25] Exclude .python-version of pyenv from gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index a4fdd01ebc..ca6cb23866 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,6 @@ pplot_out/ # docker docker-bake.override.json + +# pyenv +.python-version From 5746ae80d87053a5f75b84690fad9cc4dbcf1337 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sun, 29 Dec 2024 01:45:09 +0100 Subject: [PATCH 22/25] find run_task not await coro crux --- src/aiida/brokers/rabbitmq/coordinator.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index a0f2fdf02f..2cea693d0e 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -33,15 +33,19 @@ def add_broadcast_subscriber( sender_filters=None, identifier=None, ): - subscriber = kiwipy.BroadcastFilter(subscriber) - - subject_filters = subject_filters or [] - sender_filters = sender_filters or [] - - for filter in subject_filters: - subscriber.add_subject_filter(filter) - for filter in sender_filters: - subscriber.add_sender_filter(filter) + # XXX: this change behavior of create_task when decide whether the broadcast is_filtered. + # Need to understand the BroadcastFilter and make the improvement. + # To manifest the issue of run_task not await, run twice 'test_launch.py::test_submit_wait'. + + # subscriber = kiwipy.BroadcastFilter(subscriber) + # + # subject_filters = subject_filters or [] + # sender_filters = sender_filters or [] + # + # for filter in subject_filters: + # subscriber.add_subject_filter(filter) + # for filter in sender_filters: + # subscriber.add_sender_filter(filter) subscriber = convert_to_comm(subscriber, self._loop) return self._comm.add_broadcast_subscriber(subscriber, identifier) From d62816e58bb16bbe1540b1b6d09e5a834b036ddc Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sun, 29 Dec 2024 23:59:48 +0100 Subject: [PATCH 23/25] Revert start runner in thread since the close operation not well handled --- src/aiida/engine/daemon/worker.py | 11 ++++++++--- tests/cmdline/commands/test_rabbitmq.py | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index c9694d6990..ec759fce40 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -100,12 +100,17 @@ def start_daemon_worker(foreground: bool = False) -> None: runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc] # XXX: check the threading use is elegantly implemented: e.g. log handle, error handle, shutdown handle. + # it should work and it is better to have runner has its own event loop to handle the aiida processes only. + # however, it randomly fail some test because of resources not elegantly handled. + # The problem is the runner running in thread is not closed when thread join, the join should be the shutdown operation. + LOGGER.info('Starting a daemon worker') - runner_thread = threading.Thread(target=runner.start, daemon=True) - runner_thread.start() + # runner_thread = threading.Thread(target=runner.start, daemon=False) + # runner_thread.start() try: - runner_thread.join() + runner.start() + # runner_thread.join() except SystemError as exception: LOGGER.info('Received a SystemError: %s', exception) runner.close() diff --git a/tests/cmdline/commands/test_rabbitmq.py b/tests/cmdline/commands/test_rabbitmq.py index a60630bbbb..b0f304459d 100644 --- a/tests/cmdline/commands/test_rabbitmq.py +++ b/tests/cmdline/commands/test_rabbitmq.py @@ -69,7 +69,7 @@ def test_tasks_revive_without_daemon(run_cli_command): assert run_cli_command(cmd_rabbitmq.cmd_tasks_revive, raises=True) -@pytest.mark.usefixtures('started_daemon_client') +@pytest.mark.usefixtures('aiida_profile_clean') def test_revive(run_cli_command, monkeypatch, aiida_code_installed, submit_and_await): """Test ``tasks revive``.""" code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') From 597cef128315519363febcdbff2a5d884fa1f694 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 30 Dec 2024 00:16:16 +0000 Subject: [PATCH 24/25] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/aiida/engine/daemon/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/aiida/engine/daemon/worker.py b/src/aiida/engine/daemon/worker.py index ec759fce40..7455c4dcc9 100644 --- a/src/aiida/engine/daemon/worker.py +++ b/src/aiida/engine/daemon/worker.py @@ -12,7 +12,6 @@ import logging import signal import sys -import threading from aiida.common.log import configure_logging from aiida.engine.daemon.client import get_daemon_client @@ -38,6 +37,7 @@ async def shutdown_worker(runner: Runner) -> None: LOGGER.info('Daemon worker stopped') + def create_daemon_runner(manager: Manager) -> 'Runner': """Create and return a new daemon runner. @@ -70,6 +70,7 @@ def create_daemon_runner(manager: Manager) -> 'Runner': return runner + def start_daemon_worker(foreground: bool = False) -> None: """Start a daemon worker for the currently configured profile. From 0cee2f23a8952ee0d337b3e2f0cfc7f109c4b005 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 30 Dec 2024 02:29:23 +0100 Subject: [PATCH 25/25] note for profile close issue that may not root from aiormq --- tests/manage/test_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/manage/test_manager.py b/tests/manage/test_manager.py index 9085c5d86b..d0b82eb7a9 100644 --- a/tests/manage/test_manager.py +++ b/tests/manage/test_manager.py @@ -25,6 +25,7 @@ def test_disconnect(): The problem was solved by: - https://github.com/aiidateam/aiida-core/pull/6672 - https://github.com/mosquito/aiormq/pull/208 + # XXX: this may wrong, because in the new combination of rmq-out, problem solved without the change in aiormq """ from aiida.manage import get_manager