-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: added experimental support for storing root files, stil needs s…
…ome work
- Loading branch information
1 parent
d8297d9
commit 34157c8
Showing
4 changed files
with
98 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import os | ||
from dataclasses import dataclass | ||
from typing import Any, cast | ||
|
||
import uproot | ||
|
||
from daq.models import DAQJobConfig | ||
from daq.store.base import DAQJobStore | ||
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig | ||
from utils.file import add_date_to_file_name | ||
|
||
|
||
@dataclass | ||
class DAQJobStoreConfigROOT(DAQJobStoreConfig): | ||
file_path: str | ||
add_date: bool | ||
|
||
|
||
@dataclass | ||
class DAQJobStoreROOTConfig(DAQJobConfig): | ||
pass | ||
|
||
|
||
class DAQJobStoreROOT(DAQJobStore): | ||
config_type = DAQJobStoreROOTConfig | ||
allowed_store_config_types = [DAQJobStoreConfigROOT] | ||
_open_files: dict[str, Any] | ||
|
||
def __init__(self, config: Any): | ||
super().__init__(config) | ||
self._open_files = {} | ||
|
||
def handle_message(self, message: DAQJobMessageStore) -> bool: | ||
super().handle_message(message) | ||
store_config = cast(DAQJobStoreConfigROOT, message.store_config) | ||
file_path = add_date_to_file_name(store_config.file_path, store_config.add_date) | ||
|
||
if file_path not in self._open_files: | ||
file_exists = os.path.exists(file_path) | ||
# Create the file if it doesn't exist | ||
if not file_exists: | ||
# If file was newly created, do not commit it, close it and | ||
# switch to update mode on the next iteration | ||
root_file = uproot.recreate(file_path) | ||
else: | ||
root_file = uproot.update(file_path) | ||
self._open_files[file_path] = root_file | ||
else: | ||
file_exists = True | ||
root_file = self._open_files[file_path] | ||
|
||
data_to_write = {} | ||
for idx, key in enumerate(message.keys): | ||
for data in message.data: | ||
if key not in data_to_write: | ||
data_to_write[key] = [] | ||
data_to_write[key].append(data[idx]) | ||
|
||
# TODO: This creates a new tree every time we commit. We should probably create tree | ||
# once and only once, preferably when everything we needed to save is available | ||
# This kind of depends on the task so it will have to wait | ||
root_file["tree"] = {key: data_to_write[key] for key in message.keys} | ||
root_file.file.sink.flush() | ||
|
||
# Close the file if it was newly created | ||
if not file_exists: | ||
root_file.file.sink.close() | ||
|
||
return True | ||
|
||
def __del__(self): | ||
# Close all open files | ||
for root_file in self._open_files.values(): | ||
if root_file.closed: | ||
continue | ||
root_file.file.close() | ||
|
||
return super().__del__() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,7 @@ | ||
from daq.store.csv import DAQJobStoreConfigCSV | ||
from daq.store.root import DAQJobStoreConfigROOT | ||
|
||
DAQ_STORE_CONFIG_TYPE_TO_CLASS = {"csv": DAQJobStoreConfigCSV} | ||
DAQ_STORE_CONFIG_TYPE_TO_CLASS = { | ||
"csv": DAQJobStoreConfigCSV, | ||
"root": DAQJobStoreConfigROOT, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,12 @@ | ||
from daq.base import DAQJob | ||
from daq.caen.n1081b import DAQJobN1081B | ||
from daq.store.csv import DAQJobStoreCSV | ||
from daq.store.root import DAQJobStoreROOT | ||
from daq.test_job import DAQJobTest | ||
|
||
DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = { | ||
"n1081b": DAQJobN1081B, | ||
"test": DAQJobTest, | ||
"store_csv": DAQJobStoreCSV, | ||
"store_root": DAQJobStoreROOT, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import os | ||
from datetime import datetime | ||
|
||
|
||
def add_date_to_file_name(file_path: str, add_date: bool) -> str: | ||
splitted_file_path = os.path.splitext(file_path) | ||
date_text = datetime.now().strftime("%Y-%m-%d") | ||
if len(splitted_file_path) > 1: | ||
file_path = f"{splitted_file_path[0]}_{date_text}{splitted_file_path[1]}" | ||
else: | ||
file_path = f"{splitted_file_path[0]}_{date_text}" | ||
|
||
return file_path |