diff --git a/src/daq/jobs/remote.py b/src/daq/jobs/remote.py index 20f581d..e3e9e09 100644 --- a/src/daq/jobs/remote.py +++ b/src/daq/jobs/remote.py @@ -70,6 +70,7 @@ def handle_message(self, message: DAQJobMessage) -> bool: isinstance(message, DAQJobMessageStats) or message.id in self._remote_message_ids or not super().handle_message(message) + or message.is_remote ): return True # Silently ignore @@ -82,8 +83,10 @@ def _start_receive_thread(self, remote_url: str, zmq_remote: zmq.Socket): self._logger.debug( f"Received {len(message)} bytes from remote ({remote_url})" ) + recv_message = self._unpack_message(message) + recv_message.is_remote = True # remote message_in -> message_out - self.message_out.put(self._unpack_message(message)) + self.message_out.put(recv_message) def start(self): for remote_url in self._zmq_remotes.keys(): diff --git a/src/daq/models.py b/src/daq/models.py index 02e7922..04d5f17 100644 --- a/src/daq/models.py +++ b/src/daq/models.py @@ -27,6 +27,8 @@ class DAQJobConfig(DataClassJsonMixin): @dataclass(kw_only=True) class DAQJobMessage(DataClassJsonMixin): id: Optional[str] = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: Optional[datetime] = field(default_factory=datetime.now) + is_remote: bool = False @dataclass diff --git a/src/tests/test_remote.py b/src/tests/test_remote.py index 09a71d7..9728625 100644 --- a/src/tests/test_remote.py +++ b/src/tests/test_remote.py @@ -76,7 +76,9 @@ def side_effect(): self.daq_job_remote._start_receive_thread( "tcp://localhost:5556", self.mock_receiver ) - self.daq_job_remote.message_out.put.assert_called_once_with(message) + assert_msg = message + assert_msg.is_remote = True + self.daq_job_remote.message_out.put.assert_called_once_with(assert_msg) self.assertEqual(self.daq_job_remote.message_out.put.call_count, 1) self.assertEqual(call_count, 2)