diff --git a/postprocessing/processors/reduction_processor.py b/postprocessing/processors/reduction_processor.py index 8ade85b..a625ce8 100644 --- a/postprocessing/processors/reduction_processor.py +++ b/postprocessing/processors/reduction_processor.py @@ -117,3 +117,7 @@ def __call__(self): logging.error(f"reduce: {sys.exc_info()[1]}") self.data["error"] = f"Reduction: {sys.exc_info()[1]} " self.send(ReductionProcessor.ERROR_QUEUE, json.dumps(self.data)) + + +class ReductionProcessorHighMemory(ReductionProcessor): + _message_queue = "/queue/REDUCTION.HIMEM.DATA_READY" diff --git a/tests/integration/Dockerfile b/tests/integration/Dockerfile index 326d983..ffd01db 100644 --- a/tests/integration/Dockerfile +++ b/tests/integration/Dockerfile @@ -21,6 +21,7 @@ RUN mkdir -p /opt/postprocessing/log && \ mkdir -p /SNS/EQSANS/IPTS-10674/0/30892/NeXus && \ mkdir -p /SNS/EQSANS/IPTS-10674/shared/autoreduce && \ touch /SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs && \ + touch /SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs && \ mkdir -p /SNS/EQSANS/shared/autoreduce && \ echo "import sys;print(sys.argv[1:])" > /SNS/EQSANS/shared/autoreduce/reduce_EQSANS.py && \ \ diff --git a/tests/integration/docker-compose-rpm.yml b/tests/integration/docker-compose-rpm.yml index 403783b..7eff009 100644 --- a/tests/integration/docker-compose-rpm.yml +++ b/tests/integration/docker-compose-rpm.yml @@ -8,7 +8,7 @@ services: dockerfile: ./Dockerfile activemq: - image: rmohr/activemq + image: apache/activemq-classic hostname: activemq ports: - 8161:8161 diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml index e4d78fa..fd46047 100644 --- a/tests/integration/docker-compose.yml +++ b/tests/integration/docker-compose.yml @@ -8,7 +8,7 @@ services: dockerfile: ./tests/integration/Dockerfile activemq: - image: rmohr/activemq + image: apache/activemq-classic hostname: activemq ports: - 8161:8161 diff --git a/tests/integration/post_processing.conf b/tests/integration/post_processing.conf index 54f8fc1..ffb2d8b 100644 --- a/tests/integration/post_processing.conf +++ b/tests/integration/post_processing.conf @@ -28,7 +28,8 @@ "calvera_processor.CalveraProcessor", "calvera_processor.CalveraReducedProcessor", "create_reduction_script_processor.CreateReductionScriptProcessor", - "reduction_processor.ReductionProcessor" + "reduction_processor.ReductionProcessor", + "reduction_processor.ReductionProcessorHighMemory" ], "calvera_ingest_url": "http://not-valid.localhost:12345" } diff --git a/tests/integration/test_data_ready.py b/tests/integration/test_data_ready.py index 018536e..e06587a 100644 --- a/tests/integration/test_data_ready.py +++ b/tests/integration/test_data_ready.py @@ -169,3 +169,52 @@ def test_reduction_error(): assert msg["facility"] == message["facility"] assert msg["data_file"] == message["data_file"] assert msg["error"] == "REDUCTION: This is an ERROR!" + + +def test_reduction_high_memory(): + message = { + "run_number": "30893", + "instrument": "EQSANS", + "ipts": "IPTS-10674", + "facility": "SNS", + "data_file": "/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs", + } + + conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) + + listener = stomp.listener.TestListener() + conn.set_listener("", listener) + + try: + conn.connect() + except stomp.exception.ConnectFailedException: + pytest.skip("Requires activemq running") + + # expect a reduction complete + conn.subscribe("/queue/REDUCTION.COMPLETE", id="123", ack="auto") + + # send data ready + conn.send("/queue/REDUCTION.HIMEM.DATA_READY", json.dumps(message).encode()) + + listener.wait_for_message() + + conn.disconnect() + + header, body = listener.get_latest_message() + + msg = json.loads(body) + assert msg["run_number"] == message["run_number"] + assert msg["instrument"] == message["instrument"] + assert msg["ipts"] == message["ipts"] + assert msg["facility"] == message["facility"] + assert msg["data_file"] == message["data_file"] + + # we can also check that the reduction did run by checking the reduction_log + reduction_log = docker_exec_and_cat( + "/SNS/EQSANS/IPTS-10674/shared/autoreduce/reduction_log/EQSANS_30893_event.nxs.log" + ) + + assert ( + reduction_log + == "['/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs', '/SNS/EQSANS/IPTS-10674/shared/autoreduce/']\n" + )