From d105ee95e8d903b925fcf6658d17358a69c7a280 Mon Sep 17 00:00:00 2001
From: Furkan
Date: Fri, 11 Oct 2024 10:18:10 +0300
Subject: [PATCH] feat: finished csv store, added test DAQJob

---
 configs/store_csv.example |  1 +
 configs/test.toml.example |  8 ++++++++
 src/daq/caen/n1081b.py    | 22 +++++++++++++++++-----
 src/daq/daq_job.py        | 21 +++++++++++++++------
 src/daq/store/base.py     | 19 ++++++++++++-------
 src/daq/store/csv.py      | 19 ++++++++++---------
 src/daq/store/models.py   |  8 +++++---
 src/daq/store/types.py    |  3 +++
 src/daq/test_job.py       | 40 ++++++++++++++++++++++++++++++++++++++++
 src/daq/types.py          | 10 ++++++++++
 src/test_csv.py           | 19 +++++++++++--------
 src/test_entrypoint.py    | 24 +++++++++++++++++++++++-
 12 files changed, 155 insertions(+), 39 deletions(-)
 create mode 100644 configs/store_csv.example
 create mode 100644 configs/test.toml.example
 create mode 100644 src/daq/store/types.py
 create mode 100644 src/daq/test_job.py
 create mode 100644 src/daq/types.py

diff --git a/configs/store_csv.example b/configs/store_csv.example
new file mode 100644
index 0000000..4e0e10b
--- /dev/null
+++ b/configs/store_csv.example
@@ -0,0 +1 @@
+daq_job_type="store_csv"
\ No newline at end of file
diff --git a/configs/test.toml.example b/configs/test.toml.example
new file mode 100644
index 0000000..f3887c5
--- /dev/null
+++ b/configs/test.toml.example
@@ -0,0 +1,8 @@
+daq_job_type="test"
+rand_min=1
+rand_max=100
+
+[store_config]
+daq_job_store_type="csv"
+file_path="test.csv"
+add_date=true
\ No newline at end of file
diff --git a/src/daq/caen/n1081b.py b/src/daq/caen/n1081b.py
index 35d56b2..433ff87 100644
--- a/src/daq/caen/n1081b.py
+++ b/src/daq/caen/n1081b.py
@@ -6,13 +6,13 @@
 
 from daq.base import DAQJob
 from daq.models import DAQJobMessage
-from daq.store.models import StorableDAQJobConfig
+from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig
 
 N1081B_QUERY_INTERVAL_SECONDS = 1
 
 
 @dataclass
-class DAQN1081BConfig(StorableDAQJobConfig):
+class DAQJobN1081BConfig(StorableDAQJobConfig):
     host: str
     port: str
     password: str
@@ -20,11 +20,11 @@ class DAQJobN1081B(DAQJob):
-    config_type = DAQN1081BConfig
+    config_type = DAQJobN1081BConfig
     device: N1081B
-    config: DAQN1081BConfig
+    config: DAQJobN1081BConfig
 
-    def __init__(self, config: DAQN1081BConfig):
+    def __init__(self, config: DAQJobN1081BConfig):
         super().__init__(config)
         self.device = N1081B(f"{config.host}:{config.port}?")
@@ -80,4 +80,16 @@ def _poll_sections(self):
             for counter in data["counters"]:
                 self._logger.info(f"Lemo {counter['lemo']}: {counter['value']}")
 
+            self._send_store_message(data)
+
             self._logger.info("===")
+
+    def _send_store_message(self, data: dict):
+        self.message_out.put(
+            DAQJobMessageStore(
+                store_config=self.config.store_config,
+                daq_job=self,
+                keys=[x["lemo"] for x in data["counters"]],
+                data=[[x["value"] for x in data["counters"]]],
+            )
+        )
diff --git a/src/daq/daq_job.py b/src/daq/daq_job.py
index 9906d48..224b691 100644
--- a/src/daq/daq_job.py
+++ b/src/daq/daq_job.py
@@ -6,12 +6,9 @@
 import tomllib
 
 from daq.base import DAQJob, DAQJobThread
-from daq.caen.n1081b import DAQJobN1081B
 from daq.models import DAQJobConfig
-
-DAQ_JOB_TYPE_TO_CLASS: dict[str, type["DAQJob"]] = {
-    "n1081b": DAQJobN1081B,
-}
+from daq.store.models import DAQJobStoreConfig
+from daq.types import DAQ_JOB_TYPE_TO_CLASS
 
 
 def build_daq_job(toml_config: dict) -> DAQJob:
@@ -22,7 +19,7 @@ def build_daq_job(toml_config: dict) -> DAQJob:
 
     # Get DAQ and DAQ config clasess based on daq_job_type
     daq_job_class = DAQ_JOB_TYPE_TO_CLASS[generic_daq_job_config.daq_job_type]
-    daq_job_config_class = daq_job_class.config_type
+    daq_job_config_class: DAQJobConfig = daq_job_class.config_type
 
     # Load the config in
     config = daq_job_config_class.schema().load(toml_config)
@@ -56,3 +53,15 @@ def start_daq_jobs(daq_jobs: list[DAQJob]) -> list[DAQJobThread]:
         threads.append(start_daq_job(daq_job))
 
     return threads
+
+
+def parse_store_config(config: dict) -> DAQJobStoreConfig:
+    from daq.store.types import DAQ_STORE_CONFIG_TYPE_TO_CLASS
+
+    if "daq_job_store_type" not in config:
+        raise Exception("No daq_job_store_type specified in config")
+
+    daq_job_store_type = config["daq_job_store_type"]
+    store_config_class = DAQ_STORE_CONFIG_TYPE_TO_CLASS[daq_job_store_type]
+
+    return store_config_class.schema().load(config)
diff --git a/src/daq/store/base.py b/src/daq/store/base.py
index d8ff1d2..f2389b0 100644
--- a/src/daq/store/base.py
+++ b/src/daq/store/base.py
@@ -2,11 +2,11 @@
 
 from daq.base import DAQJob
 from daq.models import DAQJobMessage
-from daq.store.models import DAQJobMessageStore
+from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig
 
 
 class DAQJobStore(DAQJob):
-    allowed_message_types: list[type["DAQJobMessageStore"]]
+    allowed_store_config_types: list[type[DAQJobStoreConfig]]
 
     def start(self):
         while True:
@@ -14,10 +14,15 @@ def start(self):
             time.sleep(0.5)
 
     def handle_message(self, message: DAQJobMessage) -> bool:
-        is_message_allowed = False
-        for allowed_message_type in self.allowed_message_types:
-            if isinstance(message, allowed_message_type):
-                is_message_allowed = True
-        if not is_message_allowed:
+        if not self.can_store(message):
             raise Exception(f"Invalid message type: {type(message)}")
         return super().handle_message(message)
+
+    def can_store(self, message: DAQJobMessage) -> bool:
+        if not isinstance(message, DAQJobMessageStore):
+            return False
+        is_message_allowed = False
+        for allowed_config_type in self.allowed_store_config_types:
+            if isinstance(message.store_config, allowed_config_type):
+                is_message_allowed = True
+        return is_message_allowed
diff --git a/src/daq/store/csv.py b/src/daq/store/csv.py
index d9c7971..5d86f4a 100644
--- a/src/daq/store/csv.py
+++ b/src/daq/store/csv.py
@@ -6,31 +6,32 @@
 from pathlib import Path
 from typing import Any, cast
 
+from daq.models import DAQJobConfig
 from daq.store.base import DAQJobStore
 from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig
 
 
-@dataclass
-class DAQJobMessageStoreCSV(DAQJobMessageStore):
-    header: list[str]
-    data: list[list[str]]
-
-
 @dataclass
 class DAQJobStoreConfigCSV(DAQJobStoreConfig):
     file_path: str
     add_date: bool
 
 
+@dataclass
+class DAQJobStoreCSVConfig(DAQJobConfig):
+    pass
+
+
 class DAQJobStoreCSV(DAQJobStore):
-    allowed_message_types = [DAQJobMessageStoreCSV]
+    config_type = DAQJobStoreCSVConfig
+    allowed_store_config_types = [DAQJobStoreConfigCSV]
     _open_files: dict[str, TextIOWrapper]
 
     def __init__(self, config: Any):
         super().__init__(config)
         self._open_files = {}
 
-    def handle_message(self, message: DAQJobMessageStoreCSV) -> bool:
+    def handle_message(self, message: DAQJobMessageStore) -> bool:
         super().handle_message(message)
         store_config = cast(DAQJobStoreConfigCSV, message.store_config)
         file_path = store_config.file_path
@@ -58,7 +59,7 @@ def handle_message(self, message: DAQJobMessageStoreCSV) -> bool:
             file = open(file_path, "a")
             self._open_files[file_path] = file
             writer = csv.writer(file)
-            writer.writerow(message.header)
+            writer.writerow(message.keys)
         else:
             file = self._open_files[file_path]
             writer = csv.writer(file)
diff --git a/src/daq/store/models.py b/src/daq/store/models.py
index 8033561..49b3155 100644
--- a/src/daq/store/models.py
+++ b/src/daq/store/models.py
@@ -1,5 +1,5 @@
-import time
 from dataclasses import dataclass
+from typing import Any
 
 from dataclasses_json import DataClassJsonMixin
 
@@ -18,10 +18,12 @@ class DAQJobStoreConfig(DataClassJsonMixin):
 
 @dataclass
 class DAQJobMessageStore(DAQJobMessage):
-    store_config: DAQJobStoreConfig
+    store_config: dict | DAQJobStoreConfig
     daq_job: DAQJob
+    keys: list[str]
+    data: list[list[Any]]
 
 
 @dataclass
 class StorableDAQJobConfig(DAQJobConfig):
-    store_config: type[DAQJobStoreConfig]
+    store_config: dict
diff --git a/src/daq/store/types.py b/src/daq/store/types.py
new file mode 100644
index 0000000..c0b62a7
--- /dev/null
+++ b/src/daq/store/types.py
@@ -0,0 +1,3 @@
+from daq.store.csv import DAQJobStoreConfigCSV
+
+DAQ_STORE_CONFIG_TYPE_TO_CLASS = {"csv": DAQJobStoreConfigCSV}
diff --git a/src/daq/test_job.py b/src/daq/test_job.py
new file mode 100644
index 0000000..6af8ff7
--- /dev/null
+++ b/src/daq/test_job.py
@@ -0,0 +1,40 @@
+import time
+from dataclasses import dataclass
+from random import randint
+
+from N1081B import N1081B
+
+from daq.base import DAQJob
+from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig
+
+
+@dataclass
+class DAQJobTestConfig(StorableDAQJobConfig):
+    rand_min: int
+    rand_max: int
+
+
+class DAQJobTest(DAQJob):
+    config_type = DAQJobTestConfig
+    device: N1081B
+    config: DAQJobTestConfig
+
+    def start(self):
+        while True:
+            self.consume()
+            self._send_store_message()
+
+            time.sleep(1)
+
+    def _send_store_message(self):
+        def get_int():
+            return randint(self.config.rand_min, self.config.rand_max)
+
+        self.message_out.put(
+            DAQJobMessageStore(
+                store_config=self.config.store_config,
+                daq_job=self,
+                keys=["A", "B", "C"],
+                data=[[get_int(), get_int(), get_int()]],
+            )
+        )
diff --git a/src/daq/types.py b/src/daq/types.py
new file mode 100644
index 0000000..86c1ec6
--- /dev/null
+++ b/src/daq/types.py
@@ -0,0 +1,10 @@
+from daq.base import DAQJob
+from daq.caen.n1081b import DAQJobN1081B
+from daq.store.csv import DAQJobStoreCSV
+from daq.test_job import DAQJobTest
+
+DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = {
+    "n1081b": DAQJobN1081B,
+    "test": DAQJobTest,
+    "store_csv": DAQJobStoreCSV,
+}
diff --git a/src/test_csv.py b/src/test_csv.py
index dddd7b3..bfa3efc 100644
--- a/src/test_csv.py
+++ b/src/test_csv.py
@@ -3,8 +3,9 @@
 
 import coloredlogs
 
-from daq.daq_job import DAQJob, start_daq_job
-from daq.store.csv import DAQJobMessageStoreCSV, DAQJobStoreConfigCSV, DAQJobStoreCSV
+from daq.daq_job import DAQJob, parse_store_config, start_daq_job
+from daq.store.csv import DAQJobStoreCSV
+from daq.store.models import DAQJobMessageStore
 
 coloredlogs.install(
     level=logging.DEBUG,
@@ -15,14 +16,16 @@
 _test_daq_job = DAQJob({})
 while True:
     daq_job_thread.daq_job.message_in.put_nowait(
-        DAQJobMessageStoreCSV(
+        DAQJobMessageStore(
             daq_job=_test_daq_job,
-            header=["a", "b", "c"],
+            keys=["a", "b", "c"],
             data=[["1", "2", "3"], ["4", "5", "6"]],
-            store_config=DAQJobStoreConfigCSV(
-                daq_job_store_type="",
-                file_path="test.csv",
-                add_date=True,
+            store_config=parse_store_config(
+                {
+                    "daq_job_store_type": "csv",
+                    "file_path": "test.csv",
+                    "add_date": True,
+                }
             ),
         )
     )
diff --git a/src/test_entrypoint.py b/src/test_entrypoint.py
index 024bf1f..4861361 100644
--- a/src/test_entrypoint.py
+++ b/src/test_entrypoint.py
@@ -1,16 +1,19 @@
 import logging
 import time
+from queue import Empty
 
 import coloredlogs
 
-from daq.daq_job import load_daq_jobs, start_daq_job, start_daq_jobs
+from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs
 from daq.store.base import DAQJobStore
+from daq.store.models import DAQJobMessageStore
 
 coloredlogs.install(
     level=logging.DEBUG,
     datefmt="%Y-%m-%d %H:%M:%S",
 )
 
+DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1
 daq_jobs = load_daq_jobs("configs/")
 daq_job_threads = start_daq_jobs(daq_jobs)
 
@@ -29,4 +32,23 @@
     for thread in dead_threads:
         daq_job_threads.append(start_daq_job(thread.daq_job))
 
+    daq_messages = []
+    for thread in daq_job_threads:
+        try:
+            daq_messages.append(
+                thread.daq_job.message_out.get(timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)
+            )
+        except Empty:
+            pass
+
+    # Handle store messages
+    for message in daq_messages:
+        if isinstance(message, DAQJobMessageStore):
+            if isinstance(message.store_config, dict):
+                message.store_config = parse_store_config(message.store_config)
+            for store_job in store_jobs:
+                if not store_job.can_store(message):
+                    continue
+                store_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)
+
     time.sleep(1)