Skip to content

Commit

Permalink
refactor: rename config for DAQJobRemote and add it to types
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 19, 2024
1 parent 8d82dc9 commit 8d3eb7c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
20 changes: 11 additions & 9 deletions src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

@dataclass
class DAQJobRemoteConfig(DAQJobConfig):
zmq_sender_url: str
zmq_receiver_url: str
zmq_local_url: str
zmq_remote_url: str


class DAQJobRemote(DAQJob):
Expand All @@ -25,27 +25,29 @@ class DAQJobRemote(DAQJob):
"""

allowed_message_in_types = [DAQJobMessage] # accept all message types
config = DAQJobRemoteConfig
config_type = DAQJobRemoteConfig
config: DAQJobRemoteConfig

def __init__(self, config: DAQJobRemoteConfig):
super().__init__(config)
self._zmq_context = zmq.Context()
self._zmq_sender = self._zmq_context.socket(zmq.PUSH)
self._zmq_receiver = self._zmq_context.socket(zmq.PULL)
self._zmq_sender.connect(config.zmq_sender_url)
self._zmq_receiver.connect(config.zmq_receiver_url)
self._zmq_local = self._zmq_context.socket(zmq.PUSH)
self._zmq_remote = self._zmq_context.socket(zmq.PULL)
self._zmq_local.connect(config.zmq_local_url)
self._zmq_remote.connect(config.zmq_remote_url)

self._receive_thread = threading.Thread(
target=self._start_receive_thread, daemon=True
)

def handle_message(self, message: DAQJobMessage) -> bool:
self._zmq_sender.send(pickle.dumps(message))
print(type(message))
self._zmq_local.send(pickle.dumps(message))
return True

def _start_receive_thread(self):
while True:
message = self._zmq_receiver.recv()
message = self._zmq_remote.recv()
# remote message_in -> message_out
self.message_out.put(pickle.loads(message))

Expand Down
2 changes: 2 additions & 0 deletions src/daq/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from daq.jobs.handle_alerts import DAQJobHandleAlerts
from daq.jobs.handle_stats import DAQJobHandleStats
from daq.jobs.healthcheck import DAQJobHealthcheck
from daq.jobs.remote import DAQJobRemote
from daq.jobs.serve_http import DAQJobServeHTTP
from daq.jobs.store.csv import DAQJobStoreCSV
from daq.jobs.store.root import DAQJobStoreROOT
Expand All @@ -19,4 +20,5 @@
"handle_alerts": DAQJobHandleAlerts,
"alert_slack": DAQJobAlertSlack,
"healthcheck": DAQJobHealthcheck,
"remote": DAQJobRemote,
}
8 changes: 4 additions & 4 deletions src/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def setUp(self, MockZmqContext):
self.mock_receiver = self.mock_context.socket.return_value
self.config = DAQJobRemoteConfig(
daq_job_type="remote",
zmq_sender_url="tcp://localhost:5555",
zmq_receiver_url="tcp://localhost:5556",
zmq_local_url="tcp://localhost:5555",
zmq_remote_url="tcp://localhost:5556",
)
self.daq_job_remote = DAQJobRemote(self.config)
self.daq_job_remote._zmq_sender = self.mock_sender
self.daq_job_remote._zmq_receiver = self.mock_receiver
self.daq_job_remote._zmq_local = self.mock_sender
self.daq_job_remote._zmq_remote = self.mock_receiver

def test_handle_message(self):
message = DAQJobMessageStore(
Expand Down

0 comments on commit 8d3eb7c

Please sign in to comment.