Skip to content

Commit

Permalink
refactor: refactor variable names in DAQJobRemote
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 7, 2024
1 parent 5ed71c8 commit 79f4bf6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
25 changes: 13 additions & 12 deletions src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pickle
import threading
import time
from typing import Optional

import msgspec
import zmq
Expand Down Expand Up @@ -31,8 +32,8 @@ class DAQJobRemote(DAQJob):
allowed_message_in_types = [DAQJobMessage] # accept all message types
config_type = DAQJobRemoteConfig
config: DAQJobRemoteConfig
_zmq_local: zmq.Socket
_zmq_remotes: dict[str, zmq.Socket]
_zmq_pub: zmq.Socket
_zmq_sub: Optional[zmq.Socket]
_message_class_cache: dict[str, type[DAQJobMessage]]
_remote_message_ids: set[str]
_receive_thread: threading.Thread
Expand All @@ -41,9 +42,9 @@ def __init__(self, config: DAQJobRemoteConfig):
super().__init__(config)
self._zmq_context = zmq.Context()
self._logger.debug(f"Listening on {config.zmq_local_url}")
self._zmq_local = self._zmq_context.socket(zmq.PUB)
self._zmq_remotes = {}
self._zmq_local.bind(config.zmq_local_url)
self._zmq_pub = self._zmq_context.socket(zmq.PUB)
self._zmq_pub.bind(config.zmq_local_url)
self._zmq_sub = None

self._receive_thread = threading.Thread(
target=self._start_receive_thread,
Expand All @@ -66,7 +67,7 @@ def handle_message(self, message: DAQJobMessage) -> bool:
):
return True # Silently ignore

self._zmq_local.send(self._pack_message(message))
self._zmq_pub.send(self._pack_message(message))
return True

def _create_zmq_sub(self, remote_urls: list[str]):
Expand All @@ -79,11 +80,11 @@ def _create_zmq_sub(self, remote_urls: list[str]):
return zmq_sub

def _start_receive_thread(self, remote_urls: list[str]):
zmq_sub = self._create_zmq_sub(remote_urls)
self._zmq_sub = self._create_zmq_sub(remote_urls)

while True:
message = zmq_sub.recv()
self._logger.debug(f"Received {len(message)} bytes from")
message = self._zmq_sub.recv()
self._logger.debug(f"Received {len(message)} bytes")
recv_message = self._unpack_message(message)
recv_message.is_remote = True
# remote message_in -> message_out
Expand Down Expand Up @@ -131,8 +132,8 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage:
return res

def __del__(self):
for remote_url in self._zmq_remotes.keys():
self._zmq_remotes[remote_url].close()
self._zmq_local.close()
if self._zmq_sub is not None:
self._zmq_sub.close()
self._zmq_pub.close()

return super().__del__()
2 changes: 1 addition & 1 deletion src/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def setUp(self, MockZmqContext):
zmq_remote_urls=["tcp://localhost:5556"],
)
self.daq_job_remote = DAQJobRemote(self.config)
self.daq_job_remote._zmq_local = self.mock_sender
self.daq_job_remote._zmq_pub = self.mock_sender

def test_handle_message(self):
message = DAQJobMessage(
Expand Down

0 comments on commit 79f4bf6

Please sign in to comment.