diff --git a/synapse/server/nodes/spectral_filter.py b/synapse/server/nodes/spectral_filter.py index 8d06864..8f2decf 100644 --- a/synapse/server/nodes/spectral_filter.py +++ b/synapse/server/nodes/spectral_filter.py @@ -65,7 +65,7 @@ async def on_data_received(self, data: SynapseData): self.sample_rate = data.sample_rate self.channel_states.clear() - await self.data_queue.put(data) + await super().on_data_received(data) def apply_filter(self, sample_data): channel_ids, samples = zip(*sample_data) @@ -88,13 +88,11 @@ def apply_filter(self, sample_data): async def run(self): while self.running: - try: - data = await self.data_queue.get() - except queue.Empty: - continue - + data = await self.data_queue.get() filtered_samples = self.apply_filter(data.samples) await self.emit_data( - ElectricalBroadbandData(data.t0, filtered_samples, data.sample_rate) + ElectricalBroadbandData( + data.t0, data.bit_width, filtered_samples, data.sample_rate + ) ) diff --git a/synapse/server/nodes/spike_detect.py b/synapse/server/nodes/spike_detect.py index 0c07fed..7693de5 100644 --- a/synapse/server/nodes/spike_detect.py +++ b/synapse/server/nodes/spike_detect.py @@ -53,10 +53,7 @@ def configure(self, config=SpikeDetectConfig()): async def run(self): while self.running: - try: - data = await self.data_queue.get() - except queue.Empty: - continue + data = await self.data_queue.get() if data.data_type != DataType.kBroadband: self.logger.warning("Received non-broadband data") diff --git a/synapse/server/nodes/stream_out.py b/synapse/server/nodes/stream_out.py index acfb2f7..fd1efad 100644 --- a/synapse/server/nodes/stream_out.py +++ b/synapse/server/nodes/stream_out.py @@ -76,35 +76,25 @@ async def run(self): if not self.socket: self.logger.error("socket not configured") return - try: - data = await self.data_queue.get() - except queue.Empty: - continue + data = await self.data_queue.get() packets = self._pack(data) for packet in packets: - try: - await loop.run_in_executor( - None, - self.__socket.sendto, - packet, - (self.socket[0], self.socket[1]), - ) - except Exception as e: - self.logger.error(f"Error sending data: {e}") + await loop.run_in_executor( + None, + self.__socket.sendto, + packet, + (self.socket[0], self.socket[1]), + ) def _pack(self, data: SynapseData) -> List[bytes]: packets = [] - if hasattr(data, "pack"): - try: - packets = data.pack(self.__sequence_number) - self.__sequence_number += len(packets) - - except Exception as e: - raise ValueError(f"Error packing data: {e}") - else: - raise ValueError(f"Invalid payload: {type(data)}, {data}") + try: + packets = data.pack(self.__sequence_number) + self.__sequence_number += len(packets) + except Exception as e: + raise ValueError(f"Error packing data: {e}") return packets diff --git a/synapse/simulator/nodes/optical_stimulation.py b/synapse/simulator/nodes/optical_stimulation.py index 899eef5..f0862a6 100644 --- a/synapse/simulator/nodes/optical_stimulation.py +++ b/synapse/simulator/nodes/optical_stimulation.py @@ -23,10 +23,8 @@ def configure(self, config=OpticalStimulationConfig()) -> Status: async def run(self): self.logger.debug("Starting to receive data...") while self.running: - try: - data = await self.data_queue.get() - except queue.Empty: - continue + data = await self.data_queue.get() + # write to the device somehow, but here, just log it value = int.from_bytes(data, byteorder="big") self.logger.debug("received data: %i" % (self.id, value))