diff --git a/docs/developer/index.rst b/docs/developer/index.rst index e376891..9f29a2b 100644 --- a/docs/developer/index.rst +++ b/docs/developer/index.rst @@ -55,6 +55,16 @@ Locally running the units tests: $> conda activate webmonchow (webmonchow)$> pytest -v tests/ +Listing the dependencies +------------------------ +It is critical to list the dependencies both in the conda recipe `meta.yaml` and in the `pyproject.toml` file. +The first ensures a successful build of the conda package, +while the second enables us to install a feature branch of `webmonchow` in +`service webmonchow `_ +of the `data_workflow` package. + + + Building the python wheel ------------------------- Locally building the python wheel. At the root of the repo: diff --git a/docs/media/deployment_worfklow.pptx b/docs/media/deployment_worfklow.pptx new file mode 100644 index 0000000..68fc646 Binary files /dev/null and b/docs/media/deployment_worfklow.pptx differ diff --git a/docs/media/deployment_workflow.png b/docs/media/deployment_workflow.png new file mode 100644 index 0000000..6f0e29a Binary files /dev/null and b/docs/media/deployment_workflow.png differ diff --git a/docs/releases.rst b/docs/releases.rst index c636839..d0ec9a6 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -17,7 +17,16 @@ Notes for the next major or minor release of `webmonchow`. - PR #XYZ one-liner description +1.1.0 +----- +(TBA) + +**Of interest to the Developer:** + +- PR #12 Add retry logic for database and broker connections + 1.0.0 ----- +(27th of August, 2021) Modernization of the repository following the best practices described in the paper `Preferred Practices Through a Project Template `_. diff --git a/docs/user/index.rst b/docs/user/index.rst index 2ac2b0c..f73bb6b 100644 --- a/docs/user/index.rst +++ b/docs/user/index.rst @@ -18,6 +18,17 @@ These services provide the feed to three of the components spawned by the :align: center :alt: layout of webmonchow +Deployments +----------- +Webmonchow is deployed as a service in a Docker container, +either in the local (http://localhost/) +or test (http://webmon-test.ornl.gov) environments of the Web Monitor application. + +.. image:: ../media/deployment_workflow.png + :width: 999px + :align: center + :alt: layout of webmonchow + Entry Points ------------ @@ -31,6 +42,8 @@ After installation, two executable scripts are available from the command line: Broadcast AMQ messages ---------------------- Command `broadcast_amq` will connect to the default AMQ broker at `localhost:61613`. +If the broker is not running, `broadcast_amq` will attempt to reconnect every 5 seconds indefinitely +or until a prescribed number of attempts is reached. These settings can be changed by via the command line options (see `broadcast_amq --help`) Messages are generated from one or more content `json` files. @@ -103,8 +116,10 @@ An example of the input `json` file is: Broadcast PV updates -------------------- -Command `broadcast_pv` will connect to the default postgresql database `localhost:5432` and -send the PV updates defined in file +Command `broadcast_pv` will connect to the default postgresql database `localhost:5432`. +If the database is not running, `broadcast_pv` will attempt to reconnect every 5 seconds indefinitely +or until a prescribed number of attempts is reached. +Once connected, `broadcast_pv` will send the PV updates defined in file `PV dasmon.json `_. These settings can be changed by via the command line options (see `broadcast_pv --help`) diff --git a/src/webmonchow/amq/broadcast.py b/src/webmonchow/amq/broadcast.py index 45e72b9..3753e37 100644 --- a/src/webmonchow/amq/broadcast.py +++ b/src/webmonchow/amq/broadcast.py @@ -73,7 +73,7 @@ def message_generator(data): count += 1 -def connect_to_broker(broker, user, password): +def connect_to_broker(broker, user, password, attempts=None, interval=5.0): """ Creates and returns a connection to an AMQ broker. @@ -85,19 +85,37 @@ def connect_to_broker(broker, user, password): The username for the connection. password : str The password for the connection. + attempts : Optional[int] + The number of attempts to connect to the broker. If None, the connection will be attempted indefinitely. + interval : float + The time interval between connection attempts. Returns ------- stomp.Connection An established connection. + + Raises + ------ + stomp.exception.ConnectFailedException + If the connection fails after the specified number of attempts. """ conn = stomp.Connection( host_and_ports=[ tuple(broker.split(":")), ] ) - conn.connect(user, password, wait=True) - return conn + attempt_number = 0 + while attempts is None or attempt_number < attempts: + try: + conn.connect(user, password, wait=True) + print(f"Connected to {broker}") + return conn + except stomp.exception.ConnectFailedException as e: + attempt_number += 1 + print(f"Failed to connect to broker after {attempt_number} attempts: {e}") + time.sleep(interval) + raise stomp.exception.ConnectFailedException(f"Failed to connect to broker after {attempts} attempts.") def broadcast(connection, message_gen): diff --git a/src/webmonchow/pv/broadcast.py b/src/webmonchow/pv/broadcast.py index a927b05..0bec5dd 100644 --- a/src/webmonchow/pv/broadcast.py +++ b/src/webmonchow/pv/broadcast.py @@ -99,7 +99,7 @@ def broadcast(conn, pv_gen): conn.commit() -def connect_to_database(database, user, password, host, port): +def connect_to_database(database, user, password, host, port, attempts=None, interval=5.0): """ Establishes a connection to a PostgreSQL database. @@ -115,14 +115,32 @@ def connect_to_database(database, user, password, host, port): The host address of the database. port : str The port number on which the database is listening. + attempts : Optional[int] + The number of attempts to connect to the broker. If None, the connection will be attempted indefinitely. + interval : float + The time interval between connection attempts. Returns ------- psycopg2.extensions.connection A connection object to the PostgreSQL database. + + Raises + ------ + psycopg2.OperationalError + If the connection fails after the specified number of attempts. """ - conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port) - return conn + attempt_number = 0 + while attempts is None or attempt_number < attempts: + try: + conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port) + print(f"Connected to {database}") + return conn + except psycopg2.OperationalError as e: + attempt_number += 1 + print(f"Failed to connect to database after {attempt_number} attempts: {e}") + time.sleep(interval) + raise psycopg2.OperationalError(f"Failed to connect to database after {attempts} attempts.") def get_options(argv): diff --git a/tests/unit/test_amq_broadcast.py b/tests/unit/test_amq_broadcast.py index cb466b1..e5ffc7c 100644 --- a/tests/unit/test_amq_broadcast.py +++ b/tests/unit/test_amq_broadcast.py @@ -1,14 +1,17 @@ # standard imports import json import os +from unittest import TestCase from unittest.mock import mock_open, patch # third-party imports import pytest +import stomp # webmonchow imports from webmonchow.amq.broadcast import ( broadcast, + connect_to_broker, get_options, message_generator, read_contents, @@ -52,6 +55,29 @@ def test_message_generator(): assert next(gen) == ("queue2", "msg2") +class TestConnectToBroker(TestCase): + @patch("stomp.Connection") + def test_connects_successfully(self, mock_connection): + mock_conn = mock_connection.return_value + mock_conn.connect.return_value = mock_conn + + broker = "localhost:61613" + user = "user" + password = "password" + conn = connect_to_broker(broker, user, password) + + self.assertEqual(conn, mock_conn) + mock_conn.connect.assert_called_once_with(user, password, wait=True) + + def test_fails_and_exceeds_attempts(self): + broker = "localhost:61613" + user = "user" + password = "password" + with pytest.raises(stomp.exception.ConnectFailedException) as e: + connect_to_broker(broker, user, password, attempts=2, interval=1.0) + assert str(e.value) == "Failed to connect to broker after 2 attempts." + + def test_broadcast(): with patch("stomp.Connection", autospec=True) as mock_connection: mock_conn = mock_connection.return_value diff --git a/tests/unit/test_pv_broadcast.py b/tests/unit/test_pv_broadcast.py index 1ff4739..b50d630 100644 --- a/tests/unit/test_pv_broadcast.py +++ b/tests/unit/test_pv_broadcast.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, mock_open, patch # third-party imports +import psycopg2 import pytest # webmonchow imports @@ -63,6 +64,12 @@ def test_connect_to_database(mock_psycopg2_connect): ) +def test_connect_to_database_fails(): + with pytest.raises(psycopg2.OperationalError) as e: + connect_to_database("database", "user", "password", "host", "port", attempts=2, interval=1.0) + assert str(e.value) == "Failed to connect to database after 2 attempts." + + @patch("time.time") def test_broadcast(mock_time): mock_time.return_value = 123456