Merge pull request #41 from neutrons/register_processors
Make 'processors' an optional configuration parameter
backmari authored Jan 23, 2024
2 parents 139c3e9 + cd1a2b7 commit cc4674e
Showing 13 changed files with 56 additions and 125 deletions.
51 changes: 17 additions & 34 deletions README.md
@@ -10,18 +10,15 @@ For the old version of the post-processing agent, see https://github.com/mantidp

Configuration
-------------
A configuration must be placed in `/etc/post_process_consumer.conf`.
A configuration must be placed in `/etc/autoreduce/post_processing.conf`.

The `configuration/post_process_consumer.conf.developement` file will make a good starting
The `configuration/post_process_consumer.conf.development` file will make a good starting
point for configuration. Here are the entries to pay attention to:

{
"uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [("localhost", 61613)],
"amq_user": "",
"amq_pwd": "",
"amq_queues": ["/queue/FERMI_REDUCTION.DATA_READY", "/queue/CATALOG.DATA_READY", "/queue/REDUCTION_CATALOG.DATA_READY"],
"reduction_data_ready": "FERMI_REDUCTION.DATA_READY",

"sw_dir": "/opt/postprocessing",
"python_dir": "/opt/postprocessing/postprocessing",
"start_script": "python",
@@ -36,17 +33,12 @@ point for configuration. Here are the entries to pay attention to:

#### ActiveMQ settings

- The ActiveMQ server settings must be set by replacing localhost above
by the proper address and the "amq_user" and "amq_pwd" must be filled out.
- List the input queues in "amq_queues".
- Change the input queue names as needed. For example, if the standard
"REDUCTION.DATA_READY" queue is replaced by special-purpose queue like
"FERMI_REDUCTION.DATA_READY", you should change the name of that queue
on the configuration file.
- The ActiveMQ server settings must be set by replacing `localhost` above
by the proper address and the `"amq_user"` and `"amq_pwd"` must be filled out.

- If "jobs_per_instrument" is set to an integer greater than zero, no more than
- If `"jobs_per_instrument"` is set to an integer greater than zero, no more than
that number of jobs will run on a given node for a given instrument.
Set "jobs_per_instrument" to zero to turn this feature off.
Set `"jobs_per_instrument"` to zero to turn this feature off.

If this feature is used, you must add the following to activemq.xml:

@@ -85,32 +77,23 @@ calling scripts hosted on the analysis cluster.

Installation
------------
The typical installation is designed to be similar to earlier versions of this service.
You can modify where the software is installed by modifying the prefix at the top of the Makefile.

- Create the configuration files:
Create the configuration files and edit according to your installation.

cd configuration
cp post_process_consumer.conf.developement post_process_consumer.conf

Edit the file according to your installation.

- From the top source directory, run

sudo make install
        cp post_process_consumer.conf.development /etc/autoreduce/post_processing.conf

- Alternatively, you can package your configured installation as an RPM:
To run, simply call

make rpm

- To install on a compute node with limited access, you can also do the following:

sudo make install/isolated
python [installation path]/queueProcessor.py

- To run, simply call
Development environment
-----------------------

python [installation path]/queueProcessor.py
The conda environment for running `queueProcessor.py` and tests locally is defined in `environment.yml`. Create and activate the conda environment for development.

conda env create # or: mamba env create
conda activate post_processing_agent

Running the tests
-----------------
@@ -149,7 +132,7 @@ or

$ python scripts/mantidpython.py tests/reduce_CONDA.py [Data file] [Output dir]

as an example for how to activating a specific conda environment for reduction.
as an example for how to activate a specific conda environment for reduction.


Running with docker
101 changes: 24 additions & 77 deletions README_developer.md
@@ -2,39 +2,24 @@

## Tasks and queues

There are four tasks handled by the post processing agent:
There are four tasks handled by the Post Processing Agent:

1. Catalog raw data
2. Reduce data
3. Catalog reduced data
4. Create reduction script

The four tasks each correspond to one message queue in the ActiveMQ message broker. For historical reasons,
there are two ways the queue - task connection is made, which will be described in the following two sections.
The four tasks each correspond to one ActiveMQ message broker queue that the Post Processing Agent subscribes to.

### Fixed tasks with configurable queue names
### Task configuration

The message queues for tasks 2. and 4. are configured in the postprocessor configuration file.
The user can configure the active post processors using the parameter `"processors"`. The Post
Processing Agent subscribes to the queues for the registered post processors. The queue names are hard coded
in their respective post processor classes.

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"amq_queues"` | List of queues the agent will subscribe to |
| `"reduction_data_ready"` | Messages from this queue will be handled by the `reduce` method | `REDUCTION.DATA_READY` |
| `"create_reduction_script"` | Messages from this queue will be handled by the `create_reduction_script` method | `REDUCTION.CREATE_SCRIPT` |

The Post Processing Agent subscribes to the queues in `"amq_queues"`. When a message is received,
the handler of the message is determined by checking if the queue matches one of the queues
`"reduction_data_ready"` or `"create_reduction_script"`.

### Optional tasks with fixed queue names

The message queues for tasks 1. and 3. are hard coded in their respective post processor classes.
Instead, the user configures the active post processors using the parameter `"processors"` and the Post
Processing Agent subscribes to the queues for the registered post processors.

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"processors"` | List of post-processors to register | `[]`, but should probably be `["oncat_processor.ONCatProcessor", "oncat_reduced_processor.ONCatProcessor"]` |
| `"processors"` | List of post-processors to register | `["oncat_processor.ONCatProcessor", "oncat_reduced_processor.ONCatProcessor", "create_reduction_script_processor.CreateReductionScriptProcessor", "reduction_processor.ReductionProcessor"]` |

## Starting the Post Processing Agent

@@ -47,15 +32,14 @@ subscribes to the required queues, as described in [Tasks and queues](#tasks-and

#### Related configuration parameters

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"failover_uri"` | URI of the message broker (the first argument to `StompConfig`) | `tcp://amqbroker.sns.gov:61613` |
| `"amq_queues"` | List of queues the agent will subscribe to |
| `"amq_user"` | Message broker username |
| `"amq_pwd"` | Message broker password |
| `"log_file"` | Path to the log file | `post_processing.log` |
| `"heart_beat"` | Topic the agent will send heartbeats to | `/topic/SNS.COMMON.STATUS.AUTOREDUCE.0` |
| `"heartbeat_ping"` | Topic the agent will subscribe to for ping requests | `/topic/SNS.COMMON.STATUS.PING` |
| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"brokers"` | List of tuples containing host name and port of ActiveMQ broker(s), for example: `[("localhost", 61613)]` | |
| `"amq_user"` | Message broker username | |
| `"amq_pwd"` | Message broker password | |
| `"log_file"` | Path to the log file | `post_processing.log` |
| `"heart_beat"` | Topic the agent will send heartbeats to | `/topic/SNS.COMMON.STATUS.AUTOREDUCE.0` |
| `"heartbeat_ping"` | Topic the agent will subscribe to for ping requests | `/topic/SNS.COMMON.STATUS.PING` |

## Consuming a message

@@ -84,12 +68,10 @@ The class `PostProcessorAdmin` routes the consumed message based on the queue na

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"failover_uri"` | URI of the message broker | `tcp://amqbroker.sns.gov:61613` |
| `"brokers"` | List of tuples containing host name and port of ActiveMQ broker(s), for example: `[("localhost", 61613)]` | |
| `"amq_user"` | Message broker username |
| `"amq_pwd"` | Message broker password |
| `"exceptions"` | List of exceptions (exception messages) that are not treated as errors | |
| `"reduction_data_ready"` | Messages from this queue will be handled by the `reduce` method | `REDUCTION.DATA_READY` |
| `"create_reduction_script"` | Messages from this queue will be handled by the `create_reduction_script` method | `REDUCTION.CREATE_SCRIPT` |

## Tasks handled by Post Processing Agent

@@ -102,16 +84,15 @@ create reduction script, catalog raw data and catalog reduced data.

| Configuration parameter | Description | Default value |
| ----------------------- | ------ | ------------- |
| `"reduction_data_ready"` | Messages from this queue will be handled by the `reduce` method | `REDUCTION.DATA_READY` |
| `"reduction_started"` | Topic for status message | |
| `"reduction_complete"` | Topic for status message | |
| `"reduction_error"` | Topic for status message | |
| `"reduction_disabled"` | Topic for status message | |
| `"python_exec"` | Python executable used to run the reduction script | `python` |

#### Test message
#### Example message

Default ActiveMQ queue: `REDUCTION.DATA_READY`.
Queue name: `REDUCTION.DATA_READY`.

{
"information": "mac83808.sns.gov",
@@ -128,21 +109,20 @@

| Configuration parameter | Description | Default value |
| ----------------------- | ------ | ------------- |
| `"create_reduction_script"` | Messages from this queue will be handled by the `create_reduction_script` method | `REDUCTION.CREATE_SCRIPT` |
| `"service_status"` | Topic for status messages related to creating a reduction script | `/topic/SNS.${instrument}.STATUS.POSTPROCESS` |

#### Test message
#### Example message

Default ActiveMQ queue: `REDUCTION.CREATE_SCRIPT`.
Queue name: `REDUCTION.CREATE_SCRIPT`.

Test message using default parameters in the template:
Example message using default parameters in the template:

{
'instrument': 'EQSANS',
'use_default': True
}

Test message with custom parameter values:
Example message with custom parameter values:

{
'instrument': 'SEQ',
@@ -152,53 +132,20 @@

### Catalog raw data

Listens to queue: `CATALOG.ONCAT.DATA_READY`. The queue name is hard coded and
automatically subscribed to if the configuration parameter `"processors"`
contains `"oncat_processor.ONCatProcessor"`.
Queue name: `CATALOG.ONCAT.DATA_READY`.

#### Related configuration parameters

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"processors"` | List of post-processors to register | `[]`, but should probably be `["oncat_processor.ONCatProcessor", "oncat_reduced_processor.ONCatProcessor"]` |
| `"python_exec"` | Python executable used to run the ONCat ingest script | `python` |

### Catalog reduced data

Listens to queue: `REDUCTION_CATALOG.DATA_READY`. The queue name is hard coded and
automatically subscribed to if the configuration parameter `"processors"`
contains `"oncat_reduced_processor.ONCatProcessor"`.
Queue name: `REDUCTION_CATALOG.DATA_READY`.

#### Related configuration parameters

| Configuration parameter | Description | Default value |
| ----------------------- | --- | ------------- |
| `"processors"` | List of post-processors to register | `[]`, but should probably be `["oncat_processor.ONCatProcessor", "oncat_reduced_processor.ONCatProcessor"]` |
| `"python_exec"` | Python executable used to run the ONCat reduced ingest script | `python` |

## Testing Post Processing Agent locally

### To run a local ActiveMQ message broker

1. Download ActiveMQ from the website: https://activemq.apache.org/.
2. Install Java using `sudo apt install default-jdk`
3. Start the message broker using `apache-activemq-*.*.*/bin/activemq start`
4. Access the Apache ActiveMQ Console at ``http://localhost:8161/`` to add queues etc. (Default user/password is admin/admin.)
5. Purge pending messages using `activemq purge`
6. Stop the message broker using `activemq stop`

### To simulate incoming messages

With a local ActiveMQ message broker running, we can simulate messages from the workflow manager by publishing messages to the queues, either using the ActiveMQ Console or the `stomp.py` command-line client.

#### Use ActiveMQ Console

Open the Apache ActiveMQ Console at ``http://localhost:8161/`` (default user/password is admin/admin). Locate the queue and
the "Send To" operation, paste the JSON in the Message body and press "Send".

#### Use `stomp.py` command-line client

Install `stomp.py` and use its [command-line client](https://jasonrbriggs.github.io/stomp.py/commandline.html).

$ python -m stomp -H localhost -P 61613
> send /queue/REDUCTION.DATA_READY {"information":"mac83808.sns.gov", "run_number":"30892", "instrument":"EQSANS", "ipts":"IPTS-10674", "facility":"SNS", "data_file":"/Volumes/RAID/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs"}
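The same message can also be published from Python with `stomp.py` (a sketch assuming a local broker on port 61613 without credentials, matching the local development configuration):

    import json
    import stomp  # stomp.py, pinned in environment.yml

    message = {
        "information": "mac83808.sns.gov",
        "run_number": "30892",
        "instrument": "EQSANS",
        "ipts": "IPTS-10674",
        "facility": "SNS",
        "data_file": "/Volumes/RAID/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs",
    }

    conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
    conn.connect(wait=True)  # pass a username/passcode here if the broker requires them
    conn.send(destination="/queue/REDUCTION.DATA_READY", body=json.dumps(message))
    conn.disconnect()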
2 changes: 1 addition & 1 deletion SPECS/postprocessing.spec
@@ -3,7 +3,7 @@
%define release 1

Name: %{srcname}
Version: 2.8.0
Version: 3.0.0
Release: %{release}%{?dist}
Summary: %{summary}

1 change: 0 additions & 1 deletion configuration/post_process_consumer.conf.development
@@ -1,7 +1,6 @@
{
"failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [["localhost", 61613]],
"amq_queues": ["/queue/REDUCTION.DATA_READY"],
"amq_user": "icat",
"amq_pwd": "icat",
"sw_dir": "/opt/postprocessing",
1 change: 0 additions & 1 deletion configuration/post_process_consumer.conf.local
@@ -1,7 +1,6 @@
{
"failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [("localhost", 61613)],
"amq_queues": ["/queue/REDUCTION.CREATE_SCRIPT", "/queue/REDUCTION.DATA_READY", "/queue/CATALOG.DATA_READY", "/queue/REDUCTION_CATALOG.DATA_READY"],
"amq_user": "",
"amq_pwd": "",
"sw_dir": "/opt/postprocessing",
5 changes: 2 additions & 3 deletions environment.yml
@@ -1,4 +1,4 @@
name: post_processing_agent_py3
name: post_processing_agent
channels:
- conda-forge
- defaults
@@ -12,5 +12,4 @@ dependencies:
- requests=2.25
- stomp.py=7
- h5py
# needed for wheel - add when switching to python3
# - build
- build
12 changes: 9 additions & 3 deletions postprocessing/Configuration.py
@@ -35,7 +35,6 @@ def __init__(self, config_file):
# ActiveMQ broker information
self.failover_uri = config["failover_uri"]
self.brokers = [(host, port) for host, port in config["brokers"]]
self.queues = config["amq_queues"]
self.sw_dir = config["sw_dir"] if "sw_dir" in config else "/opt/postprocessing"
self.postprocess_error = config["postprocess_error"]
# Reduction AMQ queues
@@ -133,7 +132,14 @@ def __init__(self, config_file):

sys.path.insert(0, self.sw_dir)
# Configure processor plugins
self.processors = config["processors"] if "processors" in config else []
default_processors = [
"oncat_processor.ONCatProcessor",
"oncat_reduced_processor.ONCatProcessor",
"create_reduction_script_processor.CreateReductionScriptProcessor",
"reduction_processor.ReductionProcessor",
]
self.processors = config.get("processors", default_processors)
self.queues = []
if isinstance(self.processors, list):
for p in self.processors:
toks = p.split(".")
@@ -143,7 +149,7 @@
f"postprocessing.processors.{toks[0]}"
)
try:
processor_class = eval(f"processor_module.{toks[1]}")
processor_class = getattr(processor_module, toks[1])
self.queues.append(processor_class.get_input_queue_name())
except: # noqa: E722
logging.error(
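As a side note on the `eval` → `getattr` change above, here is a small self-contained illustration of the difference (the `json` module stands in for a processor module; nothing here is the agent's own code):

    import importlib

    module = importlib.import_module("json")

    # getattr only looks up an existing attribute on the imported module,
    # whereas eval("module.JSONDecoder") would evaluate an arbitrary expression
    # taken from the configuration file.
    decoder_class = getattr(module, "JSONDecoder")

    # A misspelled class name now fails with a clear AttributeError:
    try:
        getattr(module, "NoSuchClass")
    except AttributeError as error:
        print(error)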
2 changes: 1 addition & 1 deletion postprocessing/PostProcessAdmin.py
@@ -117,7 +117,7 @@ def send(self, destination, data):
f"postprocessing.processors.{toks[0]}"
)
try:
processor_class = eval(f"processor_module.{toks[1]}")
processor_class = getattr(processor_module, toks[1])
if (
namespace.queue
== processor_class.get_input_queue_name()
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[project]
name = "postprocessing"
description = "Post-processing agent to automatically catalog and reduce neutron data"
version = "2.8.0"
version = "3.0.0"
requires-python = ">=3.9"
dependencies = [
"requests",
1 change: 0 additions & 1 deletion tests/data/post_processing.conf
@@ -1,7 +1,6 @@
{
"failover_uri": "failover:(tcp://amqbroker.sns.gov:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [["amqbroker.sns.gov", 61613]],
"amq_queues": ["/queue/REDUCTION.CREATE_SCRIPT", "/queue/REDUCTION.DATA_READY", "/queue/CATALOG.DATA_READY"],
"amq_user": "wfclient",
"amq_pwd": "w0rkfl0w",
"sw_dir": "/opt/postprocessing",
1 change: 0 additions & 1 deletion tests/data/post_processing.conf.original
@@ -1,6 +1,5 @@
{
"failover_uri": "failover:(tcp://amqbroker.sns.gov:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"amq_queues": ["/queue/REDUCTION.CREATE_SCRIPT", "/queue/REDUCTION.DATA_READY", "/queue/CATALOG.DATA_READY"],
"amq_user": "wfclient",
"amq_pwd": "w0rkfl0w",
"sw_dir": "/opt/postprocessing",
1 change: 0 additions & 1 deletion tests/integration/post_processing.conf
@@ -1,7 +1,6 @@
{
"failover_uri": "failover:(tcp://activemq:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [["activemq", 61613]],
"amq_queues": ["/queue/CATALOG.DATA_READY"],
"amq_user": "",
"amq_pwd": "",
"sw_dir": "/opt/postprocessing",
