Skip to content

Commit

Permalink
feat: finished csv store, added test DAQJob
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 11, 2024
1 parent c9f8a23 commit d105ee9
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 39 deletions.
1 change: 1 addition & 0 deletions configs/store_csv.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
daq_job_type="store_csv"
8 changes: 8 additions & 0 deletions configs/test.toml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
daq_job_type="test"
rand_min=1
rand_max=100

[store_config]
daq_job_store_type="csv"
file_path="test.csv"
add_date=true
22 changes: 17 additions & 5 deletions src/daq/caen/n1081b.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@

from daq.base import DAQJob
from daq.models import DAQJobMessage
from daq.store.models import StorableDAQJobConfig
from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig

N1081B_QUERY_INTERVAL_SECONDS = 1


@dataclass
class DAQN1081BConfig(StorableDAQJobConfig):
class DAQJobN1081BConfig(StorableDAQJobConfig):
host: str
port: str
password: str
sections_to_store: list[str]


class DAQJobN1081B(DAQJob):
config_type = DAQN1081BConfig
config_type = DAQJobN1081BConfig
device: N1081B
config: DAQN1081BConfig
config: DAQJobN1081BConfig

def __init__(self, config: DAQN1081BConfig):
def __init__(self, config: DAQJobN1081BConfig):
super().__init__(config)
self.device = N1081B(f"{config.host}:{config.port}?")

Expand Down Expand Up @@ -80,4 +80,16 @@ def _poll_sections(self):
for counter in data["counters"]:
self._logger.info(f"Lemo {counter['lemo']}: {counter['value']}")

self._send_store_message(data)

self._logger.info("===")

def _send_store_message(self, data: dict):
self.message_out.put(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job=self,
keys=[x["lemo"] for x in data["counters"]],
data=[[x["value"] for x in data["counters"]]],
)
)
21 changes: 15 additions & 6 deletions src/daq/daq_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
import tomllib

from daq.base import DAQJob, DAQJobThread
from daq.caen.n1081b import DAQJobN1081B
from daq.models import DAQJobConfig

DAQ_JOB_TYPE_TO_CLASS: dict[str, type["DAQJob"]] = {
"n1081b": DAQJobN1081B,
}
from daq.store.models import DAQJobStoreConfig
from daq.types import DAQ_JOB_TYPE_TO_CLASS


def build_daq_job(toml_config: dict) -> DAQJob:
Expand All @@ -22,7 +19,7 @@ def build_daq_job(toml_config: dict) -> DAQJob:

# Get DAQ and DAQ config clasess based on daq_job_type
daq_job_class = DAQ_JOB_TYPE_TO_CLASS[generic_daq_job_config.daq_job_type]
daq_job_config_class = daq_job_class.config_type
daq_job_config_class: DAQJobConfig = daq_job_class.config_type

# Load the config in
config = daq_job_config_class.schema().load(toml_config)
Expand Down Expand Up @@ -56,3 +53,15 @@ def start_daq_jobs(daq_jobs: list[DAQJob]) -> list[DAQJobThread]:
threads.append(start_daq_job(daq_job))

return threads


def parse_store_config(config: dict) -> DAQJobStoreConfig:
from daq.store.types import DAQ_STORE_CONFIG_TYPE_TO_CLASS

if "daq_job_store_type" not in config:
raise Exception("No daq_job_store_type specified in config")

daq_job_store_type = config["daq_job_store_type"]
store_config_class = DAQ_STORE_CONFIG_TYPE_TO_CLASS[daq_job_store_type]

return store_config_class.schema().load(config)
19 changes: 12 additions & 7 deletions src/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@

from daq.base import DAQJob
from daq.models import DAQJobMessage
from daq.store.models import DAQJobMessageStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


class DAQJobStore(DAQJob):
allowed_message_types: list[type["DAQJobMessageStore"]]
allowed_store_config_types: list[type[DAQJobStoreConfig]]

def start(self):
while True:
self.consume()
time.sleep(0.5)

def handle_message(self, message: DAQJobMessage) -> bool:
is_message_allowed = False
for allowed_message_type in self.allowed_message_types:
if isinstance(message, allowed_message_type):
is_message_allowed = True
if not is_message_allowed:
if not self.can_store(message):
raise Exception(f"Invalid message type: {type(message)}")
return super().handle_message(message)

def can_store(self, message: DAQJobMessage) -> bool:
if not isinstance(message, DAQJobMessageStore):
return False
is_message_allowed = False
for allowed_config_type in self.allowed_store_config_types:
if isinstance(message.store_config, allowed_config_type):
is_message_allowed = True
return is_message_allowed
19 changes: 10 additions & 9 deletions src/daq/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,32 @@
from pathlib import Path
from typing import Any, cast

from daq.models import DAQJobConfig
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


@dataclass
class DAQJobMessageStoreCSV(DAQJobMessageStore):
header: list[str]
data: list[list[str]]


@dataclass
class DAQJobStoreConfigCSV(DAQJobStoreConfig):
file_path: str
add_date: bool


@dataclass
class DAQJobStoreCSVConfig(DAQJobConfig):
pass


class DAQJobStoreCSV(DAQJobStore):
allowed_message_types = [DAQJobMessageStoreCSV]
config_type = DAQJobStoreCSVConfig
allowed_store_config_types = [DAQJobStoreConfigCSV]
_open_files: dict[str, TextIOWrapper]

def __init__(self, config: Any):
super().__init__(config)
self._open_files = {}

def handle_message(self, message: DAQJobMessageStoreCSV) -> bool:
def handle_message(self, message: DAQJobMessageStore) -> bool:
super().handle_message(message)
store_config = cast(DAQJobStoreConfigCSV, message.store_config)
file_path = store_config.file_path
Expand Down Expand Up @@ -58,7 +59,7 @@ def handle_message(self, message: DAQJobMessageStoreCSV) -> bool:
file = open(file_path, "a")
self._open_files[file_path] = file
writer = csv.writer(file)
writer.writerow(message.header)
writer.writerow(message.keys)
else:
file = self._open_files[file_path]
writer = csv.writer(file)
Expand Down
8 changes: 5 additions & 3 deletions src/daq/store/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from dataclasses import dataclass
from typing import Any

from dataclasses_json import DataClassJsonMixin

Expand All @@ -18,10 +18,12 @@ class DAQJobStoreConfig(DataClassJsonMixin):

@dataclass
class DAQJobMessageStore(DAQJobMessage):
store_config: DAQJobStoreConfig
store_config: dict | DAQJobStoreConfig
daq_job: DAQJob
keys: list[str]
data: list[list[Any]]


@dataclass
class StorableDAQJobConfig(DAQJobConfig):
store_config: type[DAQJobStoreConfig]
store_config: dict
3 changes: 3 additions & 0 deletions src/daq/store/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from daq.store.csv import DAQJobStoreConfigCSV

DAQ_STORE_CONFIG_TYPE_TO_CLASS = {"csv": DAQJobStoreConfigCSV}
40 changes: 40 additions & 0 deletions src/daq/test_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import time
from dataclasses import dataclass
from random import randint

from N1081B import N1081B

from daq.base import DAQJob
from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig


@dataclass
class DAQJobTestConfig(StorableDAQJobConfig):
rand_min: int
rand_max: int


class DAQJobTest(DAQJob):
config_type = DAQJobTestConfig
device: N1081B
config: DAQJobTestConfig

def start(self):
while True:
self.consume()
self._send_store_message()

time.sleep(1)

def _send_store_message(self):
def get_int():
return randint(self.config.rand_min, self.config.rand_max)

self.message_out.put(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job=self,
keys=["A", "B", "C"],
data=[[get_int(), get_int(), get_int()]],
)
)
10 changes: 10 additions & 0 deletions src/daq/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from daq.base import DAQJob
from daq.caen.n1081b import DAQJobN1081B
from daq.store.csv import DAQJobStoreCSV
from daq.test_job import DAQJobTest

DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = {
"n1081b": DAQJobN1081B,
"test": DAQJobTest,
"store_csv": DAQJobStoreCSV,
}
19 changes: 11 additions & 8 deletions src/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import coloredlogs

from daq.daq_job import DAQJob, start_daq_job
from daq.store.csv import DAQJobMessageStoreCSV, DAQJobStoreConfigCSV, DAQJobStoreCSV
from daq.daq_job import DAQJob, parse_store_config, start_daq_job
from daq.store.csv import DAQJobStoreCSV
from daq.store.models import DAQJobMessageStore

coloredlogs.install(
level=logging.DEBUG,
Expand All @@ -15,14 +16,16 @@
_test_daq_job = DAQJob({})
while True:
daq_job_thread.daq_job.message_in.put_nowait(
DAQJobMessageStoreCSV(
DAQJobMessageStore(
daq_job=_test_daq_job,
header=["a", "b", "c"],
keys=["a", "b", "c"],
data=[["1", "2", "3"], ["4", "5", "6"]],
store_config=DAQJobStoreConfigCSV(
daq_job_store_type="",
file_path="test.csv",
add_date=True,
store_config=parse_store_config(
{
"daq_job_store_type": "csv",
"file_path": "test.csv",
"add_date": True,
}
),
)
)
Expand Down
24 changes: 23 additions & 1 deletion src/test_entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import logging
import time
from queue import Empty

import coloredlogs

from daq.daq_job import load_daq_jobs, start_daq_job, start_daq_jobs
from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore

coloredlogs.install(
level=logging.DEBUG,
datefmt="%Y-%m-%d %H:%M:%S",
)

DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1

daq_jobs = load_daq_jobs("configs/")
daq_job_threads = start_daq_jobs(daq_jobs)
Expand All @@ -29,4 +32,23 @@
for thread in dead_threads:
daq_job_threads.append(start_daq_job(thread.daq_job))

daq_messages = []
for thread in daq_job_threads:
try:
daq_messages.append(
thread.daq_job.message_out.get(timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)
)
except Empty:
pass

# Handle store messages
for message in daq_messages:
if isinstance(message, DAQJobMessageStore):
if isinstance(message.store_config, dict):
message.store_config = parse_store_config(message.store_config)
for store_job in store_jobs:
if not store_job.can_store(message):
continue
store_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)

time.sleep(1)

0 comments on commit d105ee9

Please sign in to comment.