Skip to content

Commit

Permalink
feat: change Queue to deque for DAQJobStoreCSV
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 12, 2024
1 parent 0a069a7 commit f6c69e6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
14 changes: 7 additions & 7 deletions src/daq/store/csv.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import csv
import os
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from queue import Empty, Queue
from typing import Any, cast

from daq.models import DAQJobConfig
Expand All @@ -31,7 +31,7 @@ class DAQJobStoreCSVConfig(DAQJobConfig):
class CSVFile:
file: TextIOWrapper
last_flush_date: datetime
write_queue: Queue[list[Any]]
write_queue: deque[list[Any]]


class DAQJobStoreCSV(DAQJobStore):
Expand All @@ -55,11 +55,11 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:

# Write headers if the file is new
if new_file:
file.write_queue.put(message.keys)
file.write_queue.append(message.keys)

# Append rows to write_queue
for row in message.data:
file.write_queue.put(row)
file.write_queue.append(row)

return True

Expand All @@ -74,7 +74,7 @@ def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]:
Path(file_path).touch()

# Open file
file = CSVFile(open(file_path, "a"), datetime.now(), Queue())
file = CSVFile(open(file_path, "a"), datetime.now(), deque())
self._open_csv_files[file_path] = file
else:
file_exists = True
Expand Down Expand Up @@ -104,8 +104,8 @@ def store_loop(self):
rows_to_write.clear()
for _ in range(DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE):
try:
rows_to_write.append(file.write_queue.get_nowait())
except Empty:
rows_to_write.append(file.write_queue.pop())
except IndexError:
break
if len(rows_to_write) > 0:
writer.writerows(rows_to_write)
Expand Down
24 changes: 12 additions & 12 deletions src/tests/test_csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest
from collections import deque
from datetime import datetime, timedelta
from queue import Queue
from unittest.mock import MagicMock, mock_open, patch

from daq.store.csv import (
Expand Down Expand Up @@ -38,7 +38,7 @@ def test_handle_message_new_file(
mock_open.assert_called_once_with("test.csv", "a")
self.assertIn("test.csv", self.store._open_csv_files)
file = self.store._open_csv_files["test.csv"]
self.assertEqual(file.write_queue.qsize(), 3) # 1 header + 2 rows
self.assertEqual(len(file.write_queue), 3) # 1 header + 2 rows

@patch("daq.store.csv.modify_file_path", return_value="test.csv")
@patch("builtins.open", new_callable=mock_open)
Expand All @@ -58,14 +58,14 @@ def test_handle_message_existing_file(self, mock_exists, mock_open, mock_add_dat
mock_open.assert_called_once_with("test.csv", "a")
self.assertIn("test.csv", self.store._open_csv_files)
file = self.store._open_csv_files["test.csv"]
self.assertEqual(file.write_queue.qsize(), 2) # 2 rows only, no header
self.assertEqual(len(file.write_queue), 2) # 2 rows only, no header

def test_flush(self):
file = CSVFile(
file=MagicMock(),
last_flush_date=datetime.now()
- timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1),
write_queue=Queue(),
write_queue=deque(),
)
result = self.store._flush(file)
self.assertTrue(result)
Expand All @@ -80,10 +80,10 @@ def test_store_loop(self, mock_csv_writer):
file=MagicMock(closed=False),
last_flush_date=datetime.now()
- timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1),
write_queue=Queue(),
write_queue=deque(),
)
file.write_queue.put(["row1_col1", "row1_col2"])
file.write_queue.put(["row2_col1", "row2_col2"])
file.write_queue.append(["row1_col1", "row1_col2"])
file.write_queue.append(["row2_col1", "row2_col2"])
self.store._open_csv_files["test.csv"] = file

mock_writer_instance = mock_csv_writer.return_value
Expand All @@ -99,26 +99,26 @@ def test_store_loop_writerows(self, mock_csv_writer):
file=MagicMock(closed=False),
last_flush_date=datetime.now()
- timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1),
write_queue=Queue(),
write_queue=deque(),
)
file.write_queue.put(["row1_col1", "row1_col2"])
file.write_queue.put(["row2_col1", "row2_col2"])
file.write_queue.append(["row1_col1", "row1_col2"])
file.write_queue.append(["row2_col1", "row2_col2"])
self.store._open_csv_files["test.csv"] = file

mock_writer_instance = mock_csv_writer.return_value

self.store.store_loop()

mock_writer_instance.writerows.assert_called_with(
[["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]
[["row2_col1", "row2_col2"], ["row1_col1", "row1_col2"]]
)
self.assertTrue(file.file.flush.called)

def test_del(self):
file = CSVFile(
file=MagicMock(closed=False),
last_flush_date=datetime.now(),
write_queue=Queue(),
write_queue=deque(),
)
self.store._open_csv_files["test.csv"] = file

Expand Down

0 comments on commit f6c69e6

Please sign in to comment.