From 1f295d2ad53b137b8043d32870e05a47c69d85e8 Mon Sep 17 00:00:00 2001 From: Furkan Date: Thu, 10 Oct 2024 22:34:08 +0300 Subject: [PATCH] feat: further the messaging of daq jobs - add `message_in` and `message_out` - handle generic messages in base model - let it crash! make the daemon restart the DAQJob's, not the DAQJob itself. --- src/daq/caen/n1081b.py | 55 +++++++++++++++++------------------------- src/daq/models.py | 40 +++++++++++++++++++++++++----- src/test_entrypoint.py | 15 ++++++++---- 3 files changed, 66 insertions(+), 44 deletions(-) diff --git a/src/daq/caen/n1081b.py b/src/daq/caen/n1081b.py index d82a79b..c220ffe 100644 --- a/src/daq/caen/n1081b.py +++ b/src/daq/caen/n1081b.py @@ -4,7 +4,7 @@ from N1081B import N1081B from websocket import WebSocket -from daq.models import DAQJob, DAQJobConfig +from daq.models import DAQJob, DAQJobConfig, DAQJobMessage N1081B_QUERY_INTERVAL_SECONDS = 1 @@ -30,49 +30,38 @@ def __init__(self, config: DAQN1081BConfig): if section not in N1081B.Section.__members__: raise Exception(f"Invalid section: {section}") + def handle_message(self, message: DAQJobMessage): + super().handle_message(message) + + # Do not handle the rest of the messages if the connection is not established + if not self._is_connected(): + return False + def start(self): while True: - # Try to connect to the device - if not self._try_connect(): - self._logger.error("Connection failed, retrying") - continue + self.consume() - self._start_loop() + # Log in if not connected + if not self._is_connected(): + self._logger.error("Connecting to the device...") + self._connect_to_device() - def _start_loop(self): - while True: - if self._should_stop: - return True - - # Stop if the connection is dropped - if isinstance(self.device.ws, WebSocket) and not self.device.ws.connected: - self._logger.error("Connection dropped") - break - - try: - self._loop() - except ConnectionResetError: - self._logger.error("Connection reset") - break - except ConnectionAbortedError: - self._logger.error("Connection aborted") - break + # Poll sections + self._poll_sections() time.sleep(N1081B_QUERY_INTERVAL_SECONDS) - def _try_connect(self) -> bool: - try: - if not self.device.connect(): - return False - except ConnectionRefusedError: - return False + def _is_connected(self) -> bool: + return isinstance(self.device.ws, WebSocket) and self.device.ws.connected + + def _connect_to_device(self): + if not self.device.connect(): + raise Exception("Connection failed") if not self.device.login(self.config.password): raise Exception("Login failed") - return True - - def _loop(self): + def _poll_sections(self): for section in self.config.sections_to_store: section = N1081B.Section[section] diff --git a/src/daq/models.py b/src/daq/models.py index d0e4bf7..34c7362 100644 --- a/src/daq/models.py +++ b/src/daq/models.py @@ -1,6 +1,7 @@ import logging import threading from dataclasses import dataclass +from queue import Queue from typing import Any from dataclasses_json import DataClassJsonMixin @@ -14,20 +15,32 @@ class DAQJobConfig(DataClassJsonMixin): class DAQJob: config_type: Any config: Any + message_in: Queue["DAQJobMessage"] + message_out: Queue["DAQJobMessage"] + _logger: logging.Logger - _should_stop: bool def __init__(self, config: Any): self.config = config + self.message_in = Queue() + self.message_out = Queue() self._logger = logging.getLogger(type(self).__name__) self._should_stop = False - def start(self): - pass + def consume(self): + # consume messages from the queue + while not self.message_in.empty(): + message = self.message_in.get() + if not self.handle_message(message): + self.message_in.put_nowait(message) + + def handle_message(self, message: "DAQJobMessage") -> bool: + if isinstance(message, DAQJobMessageStop): + raise DAQJobStopError(message.reason) + return True - def stop(self): - assert not self._should_stop, "DAQ job is already stopped" - self._should_stop = True + def start(self): + raise NotImplementedError def __del__(self): self._logger.info("DAQ job is being deleted") @@ -37,3 +50,18 @@ def __del__(self): class DAQJobThread: daq_job: DAQJob thread: threading.Thread + + +@dataclass +class DAQJobMessage: + pass + + +@dataclass +class DAQJobMessageStop(DAQJobMessage): + reason: str + + +class DAQJobStopError(Exception): + def __init__(self, reason: str): + self.reason = reason diff --git a/src/test_entrypoint.py b/src/test_entrypoint.py index fc4c81a..e2d3aa9 100644 --- a/src/test_entrypoint.py +++ b/src/test_entrypoint.py @@ -1,12 +1,13 @@ import logging import time -from daq.daq_job import load_daq_jobs, start_daq_jobs +from daq.daq_job import load_daq_jobs, start_daq_job, start_daq_jobs +from daq.models import DAQJobMessageStop from daq.store.models import DAQJobStore logging.basicConfig( level=logging.DEBUG, - format="[%(asctime)s] [%(name)s.%(funcName)s:%(lineno)d] %(levelname)s: %(message)s", + format="[%(asctime)s] [%(name)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) @@ -20,8 +21,12 @@ logging.warning("No store job found, data will not be stored") while True: - any_thread_alive = any(t.thread.is_alive() for t in daq_job_threads) - if not any_thread_alive: - break + dead_threads = [t for t in daq_job_threads if not t.thread.is_alive()] + # Clean up dead threads + daq_job_threads = [t for t in daq_job_threads if t not in dead_threads] + + # Restart jobs that have stopped + for thread in dead_threads: + daq_job_threads.append(start_daq_job(thread.daq_job)) time.sleep(1)