diff --git a/src/daq/store/csv.py b/src/daq/store/csv.py index 1615e28..5351068 100644 --- a/src/daq/store/csv.py +++ b/src/daq/store/csv.py @@ -12,7 +12,7 @@ from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig from utils.file import modify_file_path -DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS = 5 * 60 +DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS = 15 DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE = 1000 @@ -102,15 +102,13 @@ def store_loop(self): rows_to_write = [] # Write rows in batches - while True: - rows_to_write.clear() - for _ in range(DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE): - try: - rows_to_write.append(file.write_queue.get_nowait()) - except Empty: - break - if len(rows_to_write) == 0: + rows_to_write.clear() + for _ in range(DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE): + try: + rows_to_write.append(file.write_queue.get_nowait()) + except Empty: break + if len(rows_to_write) > 0: total_rows_to_write += len(rows_to_write) writer.writerows(rows_to_write) diff --git a/src/tests/test_csv.py b/src/tests/test_csv.py index 3016c38..27f830f 100644 --- a/src/tests/test_csv.py +++ b/src/tests/test_csv.py @@ -93,6 +93,27 @@ def test_store_loop(self, mock_csv_writer): mock_writer_instance.writerows.assert_called() self.assertTrue(file.file.flush.called) + @patch("csv.writer") + def test_store_loop_writerows(self, mock_csv_writer): + file = CSVFile( + file=MagicMock(closed=False), + last_flush_date=datetime.now() + - timedelta(seconds=DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS + 1), + write_queue=Queue(), + ) + file.write_queue.put(["row1_col1", "row1_col2"]) + file.write_queue.put(["row2_col1", "row2_col2"]) + self.store._open_csv_files["test.csv"] = file + + mock_writer_instance = mock_csv_writer.return_value + + self.store.store_loop() + + mock_writer_instance.writerows.assert_called_with( + [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]] + ) + self.assertTrue(file.file.flush.called) + def test_del(self): file = CSVFile( file=MagicMock(closed=False),