Skip to content

Commit

Permalink
Merge pull request #12 from neutrons/relentless_attempt_db_connection
Browse files Browse the repository at this point in the history
Add retry logic for database and broker connections
  • Loading branch information
jmborr authored Aug 30, 2024
2 parents f6940da + 2a357c9 commit 6d92150
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 8 deletions.
10 changes: 10 additions & 0 deletions docs/developer/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ Locally running the units tests:
$> conda activate webmonchow
(webmonchow)$> pytest -v tests/
Listing the dependencies
------------------------
It is critical to list the dependencies both in the conda recipe `meta.yaml` and in the `pyproject.toml` file.
The first ensures a successful build of the conda package,
while the second enables us to install a feature branch of `webmonchow` in
`service webmonchow <https://github.com/neutrons/data_workflow/blob/next/Dockerfile.webmonchow>`_
of the `data_workflow` package.



Building the python wheel
-------------------------
Locally building the python wheel. At the root of the repo:
Expand Down
Binary file added docs/media/deployment_worfklow.pptx
Binary file not shown.
Binary file added docs/media/deployment_workflow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 9 additions & 0 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ Notes for the next major or minor release of `webmonchow`.

- PR #XYZ one-liner description

1.1.0
-----
(TBA)

**Of interest to the Developer:**

- PR #12 Add retry logic for database and broker connections

1.0.0
-----
(27th of August, 2021)
Modernization of the repository following the best practices described in the paper
`Preferred Practices Through a Project Template <https://zenodo.org/records/13357328>`_.
19 changes: 17 additions & 2 deletions docs/user/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ These services provide the feed to three of the components spawned by the
:align: center
:alt: layout of webmonchow

Deployments
-----------
Webmonchow is deployed as a service in a Docker container,
either in the local (http://localhost/)
or test (http://webmon-test.ornl.gov) environments of the Web Monitor application.

.. image:: ../media/deployment_workflow.png
:width: 999px
:align: center
:alt: layout of webmonchow


Entry Points
------------
Expand All @@ -31,6 +42,8 @@ After installation, two executable scripts are available from the command line:
Broadcast AMQ messages
----------------------
Command `broadcast_amq` will connect to the default AMQ broker at `localhost:61613`.
If the broker is not running, `broadcast_amq` will attempt to reconnect every 5 seconds indefinitely
or until a prescribed number of attempts is reached.
These settings can be changed by via the command line options (see `broadcast_amq --help`)

Messages are generated from one or more content `json` files.
Expand Down Expand Up @@ -103,8 +116,10 @@ An example of the input `json` file is:
Broadcast PV updates
--------------------
Command `broadcast_pv` will connect to the default postgresql database `localhost:5432` and
send the PV updates defined in file
Command `broadcast_pv` will connect to the default postgresql database `localhost:5432`.
If the database is not running, `broadcast_pv` will attempt to reconnect every 5 seconds indefinitely
or until a prescribed number of attempts is reached.
Once connected, `broadcast_pv` will send the PV updates defined in file
`PV dasmon.json <https://github.com/neutrons/webmonchow/blob/next/src/webmonchow/pv/services/dasmon.json>`_.

These settings can be changed by via the command line options (see `broadcast_pv --help`)
Expand Down
24 changes: 21 additions & 3 deletions src/webmonchow/amq/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def message_generator(data):
count += 1


def connect_to_broker(broker, user, password):
def connect_to_broker(broker, user, password, attempts=None, interval=5.0):
"""
Creates and returns a connection to an AMQ broker.
Expand All @@ -85,19 +85,37 @@ def connect_to_broker(broker, user, password):
The username for the connection.
password : str
The password for the connection.
attempts : Optional[int]
The number of attempts to connect to the broker. If None, the connection will be attempted indefinitely.
interval : float
The time interval between connection attempts.
Returns
-------
stomp.Connection
An established connection.
Raises
------
stomp.exception.ConnectFailedException
If the connection fails after the specified number of attempts.
"""
conn = stomp.Connection(
host_and_ports=[
tuple(broker.split(":")),
]
)
conn.connect(user, password, wait=True)
return conn
attempt_number = 0
while attempts is None or attempt_number < attempts:
try:
conn.connect(user, password, wait=True)
print(f"Connected to {broker}")
return conn
except stomp.exception.ConnectFailedException as e:
attempt_number += 1
print(f"Failed to connect to broker after {attempt_number} attempts: {e}")
time.sleep(interval)
raise stomp.exception.ConnectFailedException(f"Failed to connect to broker after {attempts} attempts.")


def broadcast(connection, message_gen):
Expand Down
24 changes: 21 additions & 3 deletions src/webmonchow/pv/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def broadcast(conn, pv_gen):
conn.commit()


def connect_to_database(database, user, password, host, port):
def connect_to_database(database, user, password, host, port, attempts=None, interval=5.0):
"""
Establishes a connection to a PostgreSQL database.
Expand All @@ -115,14 +115,32 @@ def connect_to_database(database, user, password, host, port):
The host address of the database.
port : str
The port number on which the database is listening.
attempts : Optional[int]
The number of attempts to connect to the broker. If None, the connection will be attempted indefinitely.
interval : float
The time interval between connection attempts.
Returns
-------
psycopg2.extensions.connection
A connection object to the PostgreSQL database.
Raises
------
psycopg2.OperationalError
If the connection fails after the specified number of attempts.
"""
conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
return conn
attempt_number = 0
while attempts is None or attempt_number < attempts:
try:
conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
print(f"Connected to {database}")
return conn
except psycopg2.OperationalError as e:
attempt_number += 1
print(f"Failed to connect to database after {attempt_number} attempts: {e}")
time.sleep(interval)
raise psycopg2.OperationalError(f"Failed to connect to database after {attempts} attempts.")


def get_options(argv):
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/test_amq_broadcast.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# standard imports
import json
import os
from unittest import TestCase
from unittest.mock import mock_open, patch

# third-party imports
import pytest
import stomp

# webmonchow imports
from webmonchow.amq.broadcast import (
broadcast,
connect_to_broker,
get_options,
message_generator,
read_contents,
Expand Down Expand Up @@ -52,6 +55,29 @@ def test_message_generator():
assert next(gen) == ("queue2", "msg2")


class TestConnectToBroker(TestCase):
@patch("stomp.Connection")
def test_connects_successfully(self, mock_connection):
mock_conn = mock_connection.return_value
mock_conn.connect.return_value = mock_conn

broker = "localhost:61613"
user = "user"
password = "password"
conn = connect_to_broker(broker, user, password)

self.assertEqual(conn, mock_conn)
mock_conn.connect.assert_called_once_with(user, password, wait=True)

def test_fails_and_exceeds_attempts(self):
broker = "localhost:61613"
user = "user"
password = "password"
with pytest.raises(stomp.exception.ConnectFailedException) as e:
connect_to_broker(broker, user, password, attempts=2, interval=1.0)
assert str(e.value) == "Failed to connect to broker after 2 attempts."


def test_broadcast():
with patch("stomp.Connection", autospec=True) as mock_connection:
mock_conn = mock_connection.return_value
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_pv_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest.mock import MagicMock, mock_open, patch

# third-party imports
import psycopg2
import pytest

# webmonchow imports
Expand Down Expand Up @@ -63,6 +64,12 @@ def test_connect_to_database(mock_psycopg2_connect):
)


def test_connect_to_database_fails():
with pytest.raises(psycopg2.OperationalError) as e:
connect_to_database("database", "user", "password", "host", "port", attempts=2, interval=1.0)
assert str(e.value) == "Failed to connect to database after 2 attempts."


@patch("time.time")
def test_broadcast(mock_time):
mock_time.return_value = 123456
Expand Down

0 comments on commit 6d92150

Please sign in to comment.