Skip to content

Commit

Permalink
feat: add is_remote flag and timestamp to DAQJobMessage, update…
Browse files Browse the repository at this point in the history
… message handling in `DAQJobRemote`
  • Loading branch information
furkan-bilgin committed Nov 2, 2024
1 parent b61c039 commit 679a756
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def handle_message(self, message: DAQJobMessage) -> bool:
isinstance(message, DAQJobMessageStats)
or message.id in self._remote_message_ids
or not super().handle_message(message)
or message.is_remote
):
return True # Silently ignore

Expand All @@ -82,8 +83,10 @@ def _start_receive_thread(self, remote_url: str, zmq_remote: zmq.Socket):
self._logger.debug(
f"Received {len(message)} bytes from remote ({remote_url})"
)
recv_message = self._unpack_message(message)
recv_message.is_remote = True
# remote message_in -> message_out
self.message_out.put(self._unpack_message(message))
self.message_out.put(recv_message)

def start(self):
for remote_url in self._zmq_remotes.keys():
Expand Down
2 changes: 2 additions & 0 deletions src/daq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class DAQJobConfig(DataClassJsonMixin):
@dataclass(kw_only=True)
class DAQJobMessage(DataClassJsonMixin):
id: Optional[str] = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: Optional[datetime] = field(default_factory=datetime.now)
is_remote: bool = False


@dataclass
Expand Down
4 changes: 3 additions & 1 deletion src/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ def side_effect():
self.daq_job_remote._start_receive_thread(
"tcp://localhost:5556", self.mock_receiver
)
self.daq_job_remote.message_out.put.assert_called_once_with(message)
assert_msg = message
assert_msg.is_remote = True
self.daq_job_remote.message_out.put.assert_called_once_with(assert_msg)
self.assertEqual(self.daq_job_remote.message_out.put.call_count, 1)
self.assertEqual(call_count, 2)

Expand Down

0 comments on commit 679a756

Please sign in to comment.