Add retry logic for database and broker connections #12

Merged
merged 7 commits into from
Aug 30, 2024
10 changes: 10 additions & 0 deletions docs/developer/index.rst
@@ -55,6 +55,16 @@ Locally running the units tests:
$> conda activate webmonchow
(webmonchow)$> pytest -v tests/

Listing the dependencies
------------------------
It is critical to list the dependencies both in the conda recipe `meta.yaml` and in the `pyproject.toml` file.
The first ensures a successful build of the conda package,
while the second enables us to install a feature branch of `webmonchow` in
`service webmonchow <https://github.com/neutrons/data_workflow/blob/next/Dockerfile.webmonchow>`_
of the `data_workflow` package.
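
As a rough illustration of keeping the two lists in sync, the sketch below compares requirement names taken from both files. The helper names and the sample requirement strings are hypothetical, not taken from the `webmonchow` recipe, and the sketch assumes conda and PyPI use the same package names, which does not always hold.

```python
import re


def requirement_name(spec: str) -> str:
    """Return the normalized package name from a requirement string such as 'psycopg2>=2.9'."""
    # Cut at the first whitespace, version operator, marker, or extras bracket.
    name = re.split(r"[\s<>=!~;\[]", spec.strip(), maxsplit=1)[0]
    # Treat '-', '_' and '.' as equivalent and ignore case.
    return re.sub(r"[-_.]+", "-", name).lower()


def compare_dependencies(conda_run_reqs, pyproject_deps):
    """Return (names only in the conda list, names only in the pyproject list)."""
    conda = {requirement_name(s) for s in conda_run_reqs}
    pypi = {requirement_name(s) for s in pyproject_deps}
    return sorted(conda - pypi), sorted(pypi - conda)


# Hypothetical sample lists, for illustration only.
only_conda, only_pyproject = compare_dependencies(
    ["python", "psycopg2 >=2.9", "stomp.py"],
    ["psycopg2>=2.9"],
)
print("only in meta.yaml:", only_conda)
print("only in pyproject.toml:", only_pyproject)
```

An interpreter pin such as `python` normally appears only in the conda recipe, so a real check would probably need an allow-list for entries like that.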



Building the python wheel
-------------------------
Locally building the python wheel. At the root of the repo:
Binary file added docs/media/deployment_worfklow.pptx
Binary file not shown.
Binary file added docs/media/deployment_workflow.png
9 changes: 9 additions & 0 deletions docs/releases.rst
@@ -17,7 +17,16 @@ Notes for the next major or minor release of `webmonchow`.

- PR #XYZ one-liner description

1.1.0
-----
(TBA)

**Of interest to the Developer:**

- PR #12 Add retry logic for database and broker connections

1.0.0
-----
(27th of August, 2021)
Modernization of the repository following the best practices described in the paper
`Preferred Practices Through a Project Template <https://zenodo.org/records/13357328>`_.
19 changes: 17 additions & 2 deletions docs/user/index.rst
@@ -18,6 +18,17 @@ These services provide the feed to three of the components spawned by the
:align: center
:alt: layout of webmonchow

Deployments
-----------
Webmonchow is deployed as a service in a Docker container,
either in the local (http://localhost/)
or test (http://webmon-test.ornl.gov) environments of the Web Monitor application.

.. image:: ../media/deployment_workflow.png
   :width: 999px
   :align: center
   :alt: deployment workflow of webmonchow


Entry Points
------------
@@ -31,6 +42,8 @@ After installation, two executable scripts are available from the command line:
Broadcast AMQ messages
----------------------
Command `broadcast_amq` will connect to the default AMQ broker at `localhost:61613`.
If the broker is not running, `broadcast_amq` will attempt to reconnect every 5 seconds indefinitely
or until a prescribed number of attempts is reached.
These settings can be changed via the command line options (see `broadcast_amq --help`).

Messages are generated from one or more content `json` files.
@@ -103,8 +116,10 @@ An example of the input `json` file is:

Broadcast PV updates
--------------------
-Command `broadcast_pv` will connect to the default postgresql database `localhost:5432` and
-send the PV updates defined in file
+Command `broadcast_pv` will connect to the default postgresql database `localhost:5432`.
+If the database is not running, `broadcast_pv` will attempt to reconnect every 5 seconds indefinitely
+or until a prescribed number of attempts is reached.
+Once connected, `broadcast_pv` will send the PV updates defined in file
`PV dasmon.json <https://github.com/neutrons/webmonchow/blob/next/src/webmonchow/pv/services/dasmon.json>`_.

These settings can be changed via the command line options (see `broadcast_pv --help`).
24 changes: 21 additions & 3 deletions src/webmonchow/amq/broadcast.py
@@ -73,7 +73,7 @@ def message_generator(data):
count += 1


-def connect_to_broker(broker, user, password):
+def connect_to_broker(broker, user, password, attempts=None, interval=5.0):
     """
     Creates and returns a connection to an AMQ broker.

@@ -85,19 +85,37 @@ def connect_to_broker(broker, user, password):
         The username for the connection.
     password : str
         The password for the connection.
+    attempts : Optional[int]
+        The number of attempts to connect to the broker. If None, the connection will be attempted indefinitely.
+    interval : float
+        The time interval between connection attempts.

     Returns
     -------
     stomp.Connection
         An established connection.
+
+    Raises
+    ------
+    stomp.exception.ConnectFailedException
+        If the connection fails after the specified number of attempts.
     """
     conn = stomp.Connection(
         host_and_ports=[
             tuple(broker.split(":")),
         ]
     )
-    conn.connect(user, password, wait=True)
-    return conn
+    attempt_number = 0
+    while attempts is None or attempt_number < attempts:
+        try:
+            conn.connect(user, password, wait=True)
+            print(f"Connected to {broker}")
+            return conn
+        except stomp.exception.ConnectFailedException as e:
+            attempt_number += 1
+            print(f"Failed to connect to broker after {attempt_number} attempts: {e}")
+            time.sleep(interval)
+    raise stomp.exception.ConnectFailedException(f"Failed to connect to broker after {attempts} attempts.")


def broadcast(connection, message_gen):
24 changes: 21 additions & 3 deletions src/webmonchow/pv/broadcast.py
@@ -99,7 +99,7 @@ def broadcast(conn, pv_gen):
conn.commit()


-def connect_to_database(database, user, password, host, port):
+def connect_to_database(database, user, password, host, port, attempts=None, interval=5.0):
     """
     Establishes a connection to a PostgreSQL database.

@@ -115,14 +115,32 @@ def connect_to_database(database, user, password, host, port):
         The host address of the database.
     port : str
         The port number on which the database is listening.
+    attempts : Optional[int]
+        The number of attempts to connect to the database. If None, the connection will be attempted indefinitely.
+    interval : float
+        The time interval between connection attempts.

     Returns
     -------
     psycopg2.extensions.connection
         A connection object to the PostgreSQL database.
+
+    Raises
+    ------
+    psycopg2.OperationalError
+        If the connection fails after the specified number of attempts.
     """
-    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
-    return conn
+    attempt_number = 0
+    while attempts is None or attempt_number < attempts:
+        try:
+            conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
+            print(f"Connected to {database}")
+            return conn
+        except psycopg2.OperationalError as e:
+            attempt_number += 1
+            print(f"Failed to connect to database after {attempt_number} attempts: {e}")
+            time.sleep(interval)
+    raise psycopg2.OperationalError(f"Failed to connect to database after {attempts} attempts.")
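
The retry loops in `connect_to_broker` and `connect_to_database` have the same shape, so one possible follow-up is a shared helper. The sketch below is only an illustration under assumptions: `connect_with_retry`, `retry_on`, and the injectable `sleep` parameter are hypothetical names that are not part of this PR, and it raises the built-in `ConnectionError` where the PR raises the library-specific exception.

```python
import time


def connect_with_retry(connect, retry_on, attempts=None, interval=5.0, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts are used up.

    connect  : zero-argument callable returning a connection (hypothetical)
    retry_on : exception class, or tuple of classes, that triggers a retry
    attempts : maximum number of attempts; None retries indefinitely
    interval : seconds to wait after each failed attempt
    sleep    : injectable for tests; defaults to time.sleep
    """
    attempt_number = 0
    while attempts is None or attempt_number < attempts:
        try:
            return connect()
        except retry_on as e:
            attempt_number += 1
            print(f"Connection attempt {attempt_number} failed: {e}")
            # As in the PR, the wait also happens after the final failed attempt.
            sleep(interval)
    raise ConnectionError(f"Failed to connect after {attempts} attempts.")


# Usage with a fake connector that fails twice, then succeeds.
state = {"calls": 0}


def flaky_connect():
    state["calls"] += 1
    if state["calls"] < 3:
        raise OSError("service not ready")
    return "connection"


delays = []  # record the waits instead of actually sleeping
result = connect_with_retry(flaky_connect, OSError, attempts=5, interval=0.1, sleep=delays.append)
```

With such a helper, each of the two functions would only need to supply its own connect call and exception type.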


def get_options(argv):
26 changes: 26 additions & 0 deletions tests/unit/test_amq_broadcast.py
@@ -1,14 +1,17 @@
# standard imports
import json
import os
from unittest import TestCase
from unittest.mock import mock_open, patch

# third-party imports
import pytest
import stomp

# webmonchow imports
from webmonchow.amq.broadcast import (
    broadcast,
    connect_to_broker,
    get_options,
    message_generator,
    read_contents,
@@ -52,6 +55,29 @@ def test_message_generator():
assert next(gen) == ("queue2", "msg2")


class TestConnectToBroker(TestCase):
    @patch("stomp.Connection")
    def test_connects_successfully(self, mock_connection):
        mock_conn = mock_connection.return_value
        mock_conn.connect.return_value = mock_conn

        broker = "localhost:61613"
        user = "user"
        password = "password"
        conn = connect_to_broker(broker, user, password)

        self.assertEqual(conn, mock_conn)
        mock_conn.connect.assert_called_once_with(user, password, wait=True)

    def test_fails_and_exceeds_attempts(self):
        broker = "localhost:61613"
        user = "user"
        password = "password"
        with pytest.raises(stomp.exception.ConnectFailedException) as e:
            connect_to_broker(broker, user, password, attempts=2, interval=1.0)
        # Checked outside the `with` block so the assertion actually runs.
        assert str(e.value) == "Failed to connect to broker after 2 attempts."
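
`test_fails_and_exceeds_attempts` appears to rely on no broker listening at `localhost:61613` and waits through the real retry interval. A hedged alternative is to mock both the connection and `time.sleep`, so the retry path is exercised without network access or delays. The sketch below is self-contained and uses stand-ins: `FakeConnectError` and `connect_with_retry` are hypothetical substitutes for `stomp.exception.ConnectFailedException` and `connect_to_broker`, not code from this PR.

```python
import time
from unittest.mock import MagicMock, patch


class FakeConnectError(Exception):
    """Hypothetical stand-in for stomp.exception.ConnectFailedException."""


def connect_with_retry(conn, attempts=None, interval=5.0):
    """Hypothetical stand-in with the same loop shape as connect_to_broker."""
    attempt_number = 0
    while attempts is None or attempt_number < attempts:
        try:
            conn.connect(wait=True)
            return conn
        except FakeConnectError:
            attempt_number += 1
            time.sleep(interval)
    raise FakeConnectError(f"Failed to connect to broker after {attempts} attempts.")


def test_retries_then_succeeds():
    conn = MagicMock()
    # First call raises, second call returns None (success).
    conn.connect.side_effect = [FakeConnectError("down"), None]
    with patch("time.sleep") as mock_sleep:
        assert connect_with_retry(conn, attempts=3, interval=1.0) is conn
    assert conn.connect.call_count == 2
    mock_sleep.assert_called_once_with(1.0)


def test_gives_up_after_attempts():
    conn = MagicMock()
    conn.connect.side_effect = FakeConnectError("down")  # every call fails
    message = None
    with patch("time.sleep") as mock_sleep:
        try:
            connect_with_retry(conn, attempts=2, interval=1.0)
        except FakeConnectError as e:
            message = str(e)
    assert message == "Failed to connect to broker after 2 attempts."
    assert conn.connect.call_count == 2
    assert mock_sleep.call_count == 2


test_retries_then_succeeds()
test_gives_up_after_attempts()
```

Against the real function, the same idea would mean patching `stomp.Connection` and the `time.sleep` used inside `webmonchow.amq.broadcast`, and setting `side_effect` on the mocked `connect` to the real exception class.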


def test_broadcast():
    with patch("stomp.Connection", autospec=True) as mock_connection:
        mock_conn = mock_connection.return_value
7 changes: 7 additions & 0 deletions tests/unit/test_pv_broadcast.py
@@ -4,6 +4,7 @@
from unittest.mock import MagicMock, mock_open, patch

# third-party imports
import psycopg2
import pytest

# webmonchow imports
@@ -63,6 +64,12 @@ def test_connect_to_database(mock_psycopg2_connect):
)


def test_connect_to_database_fails():
    with pytest.raises(psycopg2.OperationalError) as e:
        connect_to_database("database", "user", "password", "host", "port", attempts=2, interval=1.0)
    # Checked outside the `with` block so the assertion actually runs.
    assert str(e.value) == "Failed to connect to database after 2 attempts."


@patch("time.time")
def test_broadcast(mock_time):
    mock_time.return_value = 123456