Skip to content

Commit

Permalink
use async queues properly
Browse files Browse the repository at this point in the history
  • Loading branch information
emmazhou committed Oct 3, 2024
1 parent fc12d75 commit 8ac6770
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 37 deletions.
12 changes: 5 additions & 7 deletions synapse/server/nodes/spectral_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def on_data_received(self, data: SynapseData):
self.sample_rate = data.sample_rate
self.channel_states.clear()

await self.data_queue.put(data)
await super().on_data_received(data)

def apply_filter(self, sample_data):
channel_ids, samples = zip(*sample_data)
Expand All @@ -88,13 +88,11 @@ def apply_filter(self, sample_data):

async def run(self):
while self.running:
try:
data = await self.data_queue.get()
except queue.Empty:
continue

data = await self.data_queue.get()
filtered_samples = self.apply_filter(data.samples)

await self.emit_data(
ElectricalBroadbandData(data.t0, filtered_samples, data.sample_rate)
ElectricalBroadbandData(
data.t0, data.bit_width, filtered_samples, data.sample_rate
)
)
5 changes: 1 addition & 4 deletions synapse/server/nodes/spike_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ def configure(self, config=SpikeDetectConfig()):

async def run(self):
while self.running:
try:
data = await self.data_queue.get()
except queue.Empty:
continue
data = await self.data_queue.get()

if data.data_type != DataType.kBroadband:
self.logger.warning("Received non-broadband data")
Expand Down
34 changes: 12 additions & 22 deletions synapse/server/nodes/stream_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,25 @@ async def run(self):
if not self.socket:
self.logger.error("socket not configured")
return
try:
data = await self.data_queue.get()
except queue.Empty:
continue

data = await self.data_queue.get()
packets = self._pack(data)

for packet in packets:
try:
await loop.run_in_executor(
None,
self.__socket.sendto,
packet,
(self.socket[0], self.socket[1]),
)
except Exception as e:
self.logger.error(f"Error sending data: {e}")
await loop.run_in_executor(
None,
self.__socket.sendto,
packet,
(self.socket[0], self.socket[1]),
)

def _pack(self, data: SynapseData) -> List[bytes]:
packets = []

if hasattr(data, "pack"):
try:
packets = data.pack(self.__sequence_number)
self.__sequence_number += len(packets)

except Exception as e:
raise ValueError(f"Error packing data: {e}")
else:
raise ValueError(f"Invalid payload: {type(data)}, {data}")
try:
packets = data.pack(self.__sequence_number)
self.__sequence_number += len(packets)
except Exception as e:
raise ValueError(f"Error packing data: {e}")

return packets
6 changes: 2 additions & 4 deletions synapse/simulator/nodes/optical_stimulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ def configure(self, config=OpticalStimulationConfig()) -> Status:
async def run(self):
self.logger.debug("Starting to receive data...")
while self.running:
try:
data = await self.data_queue.get()
except queue.Empty:
continue
data = await self.data_queue.get()

# write to the device somehow, but here, just log it
value = int.from_bytes(data, byteorder="big")
self.logger.debug("received data: %i" % (self.id, value))
Expand Down

0 comments on commit 8ac6770

Please sign in to comment.