Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add psycopg3 support to runmailer_pg #177

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ ADDITIONAL CONTRIBUTORS include:
* Steve Weber
* Jaap Roes
* Ed Davison
* Sander Smits
144 changes: 93 additions & 51 deletions src/mailer/postgres.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import logging
import queue
import select
import signal
import sys
import threading
import time
from dataclasses import dataclass
from datetime import datetime

import psycopg2.extensions
from django.db import connections

from mailer.engine import EMPTY_QUEUE_SLEEP, send_all
from mailer.models import Message

is_psycopg3_available = False
try:
import psycopg # noqa: F401
except ImportError:
try:
import psycopg2.extensions
except ImportError:
raise ImportError("Either psycopg or psycopg2 must be installed.")
else:
is_psycopg3_available = True

logger = logging.getLogger(__name__)


Expand All @@ -26,16 +38,21 @@ def postgres_send_loop():
Loop indefinitely, checking queue using NOTIFY/LISTEN and running send_all(),
and additional running every MAILER_EMPTY_QUEUE_SLEEP seconds.
"""
# See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
# See
# psycopg2
# https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
#
# psycopg3
# https://www.psycopg.org/psycopg3/docs/advanced/async.html#asynchronous-notifications

# Get a connection, for a few lower level operations.
dj_conn = connections[Message.objects.db]
if dj_conn.connection is None:
dj_conn.connect()
conn = dj_conn.connection # psycopg2 connection
conn = dj_conn.connection

# As per psycopg2 docs, we want autocommit for timely notifications
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
# We want autocommit for timely notifications
use_autocommit(conn)

curs = conn.cursor()
install_trigger(curs, dj_conn)
Expand All @@ -46,65 +63,67 @@ def postgres_send_loop():
# sending messages in parallel, which is deliberate - in many cases email
# sending may be throttled and we are less likely to exceed quotas if we
# send in serial rather than parallel.
worker_thread = threading.Thread(target=worker, daemon=True).start()
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

if EMPTY_QUEUE_SLEEP is not None:
beat_thread = threading.Thread(target=beat, daemon=True).start()
beat_thread = threading.Thread(target=beat, daemon=True)
beat_thread.start()
else:
beat_thread = None

SELECT_TIMEOUT = 5
while True:
if select.select([conn], [], [], SELECT_TIMEOUT) == ([], [], []):
# timeout
pass
else:
conn.poll()
try:
last = conn.notifies.pop()
except IndexError:
# Not entirely sure how this happens, but it could only happen
# if `notifies` is empty, because there are no more notifications
# to process.
continue

# We don't care about payload or how many NOTIFY there were,
# we'll just run once, so drop the rest:
to_drop = conn.notifies
if to_drop:
# This happens if several messages were inserted in the same
# transaction - we get multiple items on `conn.notifies` after a
# single `conn.poll()`
logger.debug("Dropping notifications %r", to_drop)
conn.notifies.clear()

if notify_q.empty():
logger.debug("Putting %r on queue", last)
notify_q.put(last)
def signal_handler(signal, frame):
logger.debug("Received SIGINT, shutting down")
sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

if is_psycopg3_available:
gen = conn.notifies()
for notify in gen:
add_item_to_queue(notify)
else:
SELECT_TIMEOUT = 5
while True:
if select.select([conn], [], [], SELECT_TIMEOUT) == ([], [], []):
# timeout
pass
else:
# notify_q is not empty.

# Our worker thread always processes all Messages. If it still
# has an item on the queue, it will process all remaining
# messages next time it runs, so there is no point adding
# another item to the non-empty queue - this will just cause
# `send_all()` to run pointlessly.

# This could be important for efficiency: if 100 records are
# inserted into the Message table at the same time, this process
# will get NOTIFY sent 100 times (unless they were all part of
# the same transaction). The first `send_all()` command will
# deal with them all (or a large fraction of them, depending on
# timing). We don't want `send_all()` to thrash away doing
# nothing another 99 times afterwards.
logger.debug("Discarding item %r as work queue is not empty", last)
conn.poll()
try:
last = conn.notifies.pop()
except IndexError:
# Not entirely sure how this happens, but it could only happen
# if `notifies` is empty, because there are no more notifications
# to process.
continue

# We don't care about payload or how many NOTIFY there were,
# we'll just run once, so drop the rest:
to_drop = conn.notifies
if to_drop:
# This happens if several messages were inserted in the same
# transaction - we get multiple items on `conn.notifies` after a
# single `conn.poll()`
logger.debug("Dropping notifications %r", to_drop)
conn.notifies.clear()

# Add to queue
add_item_to_queue(last)

# Clean up:
worker_thread.join()
if beat_thread is not None:
beat_thread.join()


def use_autocommit(conn):
    """Switch *conn* into autocommit mode so NOTIFY events arrive promptly.

    psycopg (v3) exposes a plain ``autocommit`` attribute, while psycopg2
    requires changing the connection's isolation level instead.
    """
    if not is_psycopg3_available:
        # psycopg2 path: autocommit is expressed as an isolation level.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        return
    # psycopg3 path: a simple boolean attribute.
    conn.autocommit = True


def install_trigger(curs, dj_conn):
message_table_name = Message._meta.db_table
curs.execute(
Expand Down Expand Up @@ -135,6 +154,29 @@ def install_trigger(curs, dj_conn):
)


def add_item_to_queue(item):
    """Hand *item* to the worker thread, unless work is already pending.

    The worker drains every outstanding Message each time it runs, so a
    single queued item is enough to guarantee all pending mail gets sent.
    """
    if not notify_q.empty():
        # notify_q already has an item.
        #
        # Our worker thread always processes all Messages. If it still
        # has an item on the queue, it will process all remaining
        # messages next time it runs, so there is no point adding
        # another item to the non-empty queue - this will just cause
        # `send_all()` to run pointlessly.
        #
        # This could be important for efficiency: if 100 records are
        # inserted into the Message table at the same time, this process
        # will get NOTIFY sent 100 times (unless they were all part of
        # the same transaction). The first `send_all()` command will
        # deal with them all (or a large fraction of them, depending on
        # timing). We don't want `send_all()` to thrash away doing
        # nothing another 99 times afterwards.
        logger.debug("Discarding item %r as work queue is not empty", item)
        return

    logger.debug("Putting %r on queue", item)
    notify_q.put(item)


def worker():
while True:
item = notify_q.get()
Expand Down
Loading