-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add new notification dispatcher mechanism #975
Open
linuxbandit
wants to merge
32
commits into
stable
Choose a base branch
from
feat/add-notification-dispatcher
base: stable
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
78d398b
feat: add new notification dispatcher mechanism
linuxbandit 67693ff
feat: add retry if template not found
linuxbandit 5f9f437
chore: add docker setup
linuxbandit ba33f79
chore: declare queue only once
linuxbandit b0cffbe
feat: make persistent queue and message
linuxbandit d9c689d
chore: add explicit exchange as per pattern
linuxbandit ce89fb2
chore: make exchange persistent and add route key
linuxbandit 5019cb2
feat: add full templates tester, add templates
linuxbandit ff69624
chore: optimise docker image building; add volumes
linuxbandit ae97e74
chore: correct core templates
linuxbandit 4a0fddf
feat: add SU templates. Also add macros.jinja2
linuxbandit cb9c695
chore: remove comment
linuxbandit bae8aa9
chore: add all templates for SU, working
linuxbandit 415f653
chore: add events templates
linuxbandit a7b9b1f
chore: add stuff for statutory
linuxbandit 74ee32e
chore: add last templates
linuxbandit ef03dd6
chore: add checker for thorough testing
linuxbandit de2f6fc
docs: add dockerisation
linuxbandit 1d63fda
chore: forgot another update
linuxbandit c435822
chore: add quickstart
linuxbandit f35b247
fix: correct make test
linuxbandit 26a277b
chore: add extra checks for errors during render
linuxbandit 262580b
feat: add requeue with a delay
linuxbandit a174fa9
chore: correct template, add comments
linuxbandit c0da07b
feat: requeue with dynamic delay
linuxbandit bdf6d32
chore: make USER non-root
linuxbandit fcc5950
chore: differentiate between environments
linuxbandit 7a90520
chore: add everything into a main to be modular
linuxbandit 526bcd8
chore: add loglevels
linuxbandit c405a67
chore: increase clarity of incremental retry block
linuxbandit 083b1ad
feat: add notification system
linuxbandit 89e4f47
chore: apply suggestions from code review for templates
linuxbandit File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
|
||
default: build-dev | ||
|
||
build-dev: | ||
BASE_URL=appserver.vgr SUBDOMAIN_RABBITMQ=rabbit. SUBDOMAIN_MAILHOG=mailhog. PATH_DISPATCHER=./ COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml -f ./docker/docker-compose.dev.yml up -d | ||
|
||
build: | ||
BASE_URL=appserver.vgr SUBDOMAIN_RABBITMQ=rabbit. COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml up -d | ||
|
||
down: | ||
COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml -f ./docker/docker-compose.dev.yml down | ||
|
||
test: build-dev | ||
@sleep 5; sed -i "s/'172.18.0.X'/$$(docker inspect myaegee_rabbit_1 | jq .[0].NetworkSettings.Networks.OMS.IPAddress)/" helpers/send.py ; cd helpers && python send.py | ||
|
||
rabbit: | ||
@DOCKER_BUILDKIT=0 docker build -t aegee/rabbit:latest -f docker/Dockerfile.rabbit . && docker push aegee/rabbit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# Dispatcher | ||
|
||
Polls rabbitMQ and takes action (sends mail). | ||
|
||
## How to run it | ||
### Pre-requisites | ||
In the Vagrant's appserver VM, the packages are installed globally: | ||
1. faker | ||
1. jinja2 | ||
1. pika | ||
|
||
Otherwise use poetry to install dependencies and launch the virtual environment. | ||
|
||
### Launching it and testing it | ||
In a console, run `python3 dispatcher/main.py`. | ||
In another console, run `python3 helpers/send.py`. | ||
Control on `appserver.test:8025` the emails sent. It is also possible to control rabbit's stats on `appserver.test:8080` | ||
|
||
This method is now dockerised. Using the docker way is useful for the 'DNS' feature of docker (i.e. not hardcoding the IP address of the rabbit host) | ||
|
||
### Rationale | ||
We do not need a web service for this, only a worker. Doing it this way only means it cannot be scaled (unless precautions are taken for the ack of the message, but pika should already give this out of the box). | ||
In order to add templates, one can work on the filesystem: as the template file is read from memory at the time a message is received, there is basically a mechanism of hot-reload ready to be used. | ||
|
||
We do not need a web service, because we do not need to pilot anything. | ||
|
||
## Queues | ||
|
||
Current queues: | ||
1. email | ||
|
||
Queues envisioned: | ||
1. email | ||
1. telegram | ||
1. slack (If EBs have enabled it) | ||
1. webgui (handled by vue, NOT by this program) | ||
|
||
## TODOs and next steps | ||
rather in order: | ||
|
||
1. [x] (not on this project): run core with the email as 'inserting in the queue' instead of 'API request to mailer' | ||
1. [?] (not on this project): run core with the email as 'exchange' instead of 'inserting in the queue' | ||
1. [x] include traefik configuration to have the mailhog and rabbit on a subdomain instead of `domain:port` | ||
1. [x] When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to: we need to mark both the queue and messages as durable | ||
1. [ ] Add auto-retry (DLQ). rabbit is smart and doesn't let me process a message again unless i force it.. https://devcorner.digitalpress.blog/rabbitmq-retries-the-new-full-story/ | ||
1. [ ] add the telegram queue | ||
1. investigate the massmailer queue: a queue which picks every message, and creates a list of "bcc" to send only one email? (danger: queue needs something like batch ack..) - OR it is not feasible at all because "mass"mailer is still "personalised" mailer? | ||
|
||
1. why do we even have a `<`title`>` (which is dynamic), why not using directly the subject? (re: the body of the email) | ||
1. remove extension Jinja2 (into jinja) | ||
1. make it such that templates list is read from fs (for dynamic tests) | ||
|
||
|
||
|
||
https://www.rabbitmq.com/publishers.html#unroutable | ||
|
||
|
||
Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,283 @@ | ||
import pika | ||
import json | ||
import smtplib | ||
from email.message import EmailMessage | ||
from jinja2 import Environment, FileSystemLoader, exceptions | ||
import os | ||
import sys | ||
import logging | ||
from notify import slack_alert | ||
|
||
""" | ||
continuously polls(*) the email queue and renders+sends the template on every acked message | ||
(*) = waits for the queue to push a message onto the app | ||
""" | ||
|
||
ONCALL_HANDLER = "@grasshopper" | ||
|
||
def connect_to_smtp(): | ||
global smtpObj | ||
|
||
EMAIL_HOST='mailhog' | ||
EMAIL_PORT=1025 | ||
EMAIL_ADDRESS=None | ||
EMAIL_PASSWORD=None | ||
|
||
if env == 'production': | ||
EMAIL_HOST= os.environ.get("EMAIL_HOST") | ||
EMAIL_PORT= os.environ.get("EMAIL_PORT") | ||
EMAIL_ADDRESS= os.environ.get("EMAIL_ADDRESS") | ||
EMAIL_PASSWORD= os.environ.get("EMAIL_PASSWORD") | ||
|
||
try: | ||
smtpObj = smtplib.SMTP( EMAIL_HOST, EMAIL_PORT ) | ||
|
||
if env == 'production': | ||
# we have to upgrade the connection and login | ||
# (At least with ethereal.email.. didn't try with gmail!) #TODO | ||
smtpObj.starttls() | ||
smtpObj.login(EMAIL_ADDRESS, EMAIL_PASSWORD) | ||
logging.info(" -> Connected") | ||
except smtplib.SMTPConnectError: | ||
logging.error("Could not connect to the SMTP server.") | ||
except smtplib.SMTPAuthenticationError: | ||
logging.error("Failed to authenticate with given credentials.") | ||
except Exception as e: | ||
logging.error(f"Could not connect to SMTP server for generic reason: {e}") | ||
|
||
def requeue_wait(ch, method, properties, body, reason): | ||
REQUEUE_DELAY_DURATIONS = [ | ||
5 * 60000, # 5 mins | ||
50 * 60000, # 50 mins | ||
5*60 * 60000, # 5 hrs | ||
5*60*10 * 60000, # 50 hrs | ||
5*60*20 * 60000, # 100 hrs | ||
] | ||
|
||
current_delay = properties.headers.get("x-delay") if properties.headers else 0 | ||
try: | ||
index = REQUEUE_DELAY_DURATIONS.index(int(current_delay)) | ||
except ValueError: | ||
index = -1 | ||
|
||
next_index = index + 1 | ||
|
||
if next_index >= len(REQUEUE_DELAY_DURATIONS): | ||
logging.warning('Max retry time hit, dropping message') | ||
slack_alert(f"Time over, a message was dropped ({reason})", submessage = ":poop:") | ||
ch.basic_ack(delivery_tag=method.delivery_tag) | ||
return | ||
|
||
wait = REQUEUE_DELAY_DURATIONS[next_index] | ||
retry_message = f'Retry attempt {next_index + 1}/{len(REQUEUE_DELAY_DURATIONS)} will happen in {int(wait/1000)} sec' | ||
logging.info(retry_message) | ||
last_chance = '' | ||
if next_index + 1 == len(REQUEUE_DELAY_DURATIONS): | ||
last_chance = f'-- LAST ATTEMPT TO FIX: within {int(wait/1000)} sec' + f' {ONCALL_HANDLER}' | ||
logging.error(last_chance) | ||
slack_alert(f"A template is missing! ({reason})", | ||
submessage = retry_message + " " + last_chance | ||
) | ||
|
||
headers = { | ||
'reason': reason, | ||
'x-delay': wait, | ||
} | ||
prop = pika.BasicProperties( | ||
headers=headers, | ||
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE, | ||
) | ||
channel.basic_publish(exchange='wait_exchange', | ||
routing_key='wait', | ||
body=body, | ||
properties=prop) #NOTE it completely ignores the previous properties (and it's fine) | ||
ch.basic_ack(delivery_tag=method.delivery_tag) | ||
|
||
def send_email(ch, method, properties, body): | ||
""" | ||
Callback for the NORMAL MESSAGE | ||
Output: send an email | ||
OR | ||
Output: Wait-exchange | ||
""" | ||
msg = json.loads(body) | ||
|
||
try: | ||
template = tpl_environment.get_template(f"{msg['template']}.jinja2") | ||
except exceptions.TemplateNotFound: | ||
logging.error(f"Template {msg['template']}.jinja2 not found") | ||
# NOTE: this is a requeuable message | ||
requeue_wait(ch, method, properties, body, reason=f"template_not_found-{msg['template']}") | ||
return | ||
|
||
try: | ||
rendered = template.render(msg['parameters'], altro=msg['subject']) | ||
except exceptions.UndefinedError as e: | ||
logging.error(f"Error in rendering: some parameter is undefined (error: {e}; message: {msg})") | ||
# NOTE: this is a NON-requeuable message | ||
requeue_wait(ch, method, properties, body, reason="parameter_undefined") | ||
return | ||
except exceptions.TemplateNotFound: | ||
logging.error(f"A sub-template in {msg['template']}.jinja2 was not found") | ||
# NOTE: this is a requeuable message | ||
requeue_wait(ch, method, properties, body, reason=f"subtemplate_not_found-{msg['template']}") | ||
return | ||
|
||
try: | ||
email = EmailMessage() | ||
email.set_content(rendered, subtype='html') | ||
email['From'] = msg['from'] | ||
email['Reply-To'] = msg['reply_to'] | ||
email['To'] = msg['to'] | ||
email['Subject'] = msg['subject'] | ||
smtpObj.send_message(email) | ||
ch.basic_ack(delivery_tag = method.delivery_tag) | ||
except smtplib.SMTPServerDisconnected: | ||
logging.error("Server unexpectedly disconnected. Attempting to reconnect") | ||
connect_to_smtp() | ||
except smtplib.SMTPResponseException as e: | ||
logging.error(f"SMTP error occurred: {e.smtp_code} - {e.smtp_error}") | ||
except Exception as e: | ||
logging.error(f"An unexpected error occurred: {e}") | ||
|
||
def process_dead_letter_messages(ch, method, properties, body): | ||
""" | ||
Callback for the ERROR MESSAGE | ||
Output: none yet. I don't expect for messages to fall here, I keep the DLQ for safety | ||
|
||
@see https://stackoverflow.com/a/58500336 | ||
"The way to do this is not to use NACK at all but to generate and return a 'new' message | ||
(which is simply the current message you are handling, but adding new headers to it). | ||
It appears that a NACK is basically doing this anyway according to the AMQP spec." | ||
""" | ||
REQUEUE_DELAY_DURATIONS = [ | ||
5 * 60000, # 5 mins | ||
50 * 60000, # 50 mins | ||
5*60 * 60000, # 5 hrs | ||
5*60*10 * 60000, # 50 hrs | ||
5*60*20 * 60000, # 100 hrs | ||
] #TODO: why is this here again? | ||
wait_for = REQUEUE_DELAY_DURATIONS[-1] | ||
|
||
logging.error("For some reason there's the DLQ handler that was triggered!") | ||
slack_alert("For some reason there's the DLQ handler that was triggered!") | ||
|
||
headers = { | ||
'x-delay': wait_for, | ||
} | ||
fullheaders = {**properties.headers, **headers} | ||
prop = pika.BasicProperties( | ||
headers=fullheaders, | ||
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE, | ||
) | ||
channel.basic_publish(exchange='wait_exchange', | ||
routing_key='wait', | ||
body=body, | ||
properties=prop) | ||
|
||
ch.basic_ack(delivery_tag = method.delivery_tag) | ||
|
||
def process_requeue(ch, method, properties, body): | ||
""" | ||
Callback for the WAITING MESSAGES | ||
Output: Requeue on normal exchange (if error about missing template) | ||
OR | ||
Output: Remove (if unfixable error) | ||
""" | ||
|
||
if (properties.headers["reason"] == 'parameter_undefined'): | ||
logging.warning('Impossible to fix error, dropping message') | ||
#TODO output something/notify to leave a trail for better debugging on what was missing | ||
ch.basic_ack(delivery_tag = method.delivery_tag) | ||
return | ||
|
||
channel.basic_publish(exchange='eml', | ||
routing_key='mail', | ||
body=body, | ||
properties=pika.BasicProperties( | ||
headers = properties.headers, # propagation to avoid endless loop | ||
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE, | ||
)) | ||
ch.basic_ack(delivery_tag = method.delivery_tag) | ||
|
||
def main(): | ||
global smtpObj | ||
global tpl_environment | ||
global env | ||
global channel | ||
|
||
# Configure logging for this app, and remove "info" of pika | ||
logging.basicConfig(level=logging.INFO) | ||
logging.getLogger('pika').setLevel(logging.WARNING) | ||
|
||
tpl_environment = Environment(loader=FileSystemLoader("../templates/")) | ||
env = os.environ.get("ENV") or 'development' | ||
|
||
RABBIT_HOST='rabbit' | ||
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBIT_HOST)) | ||
channel = connection.channel() | ||
|
||
channel.exchange_declare(exchange='eml', | ||
exchange_type='direct', | ||
durable=True) | ||
channel.queue_declare(queue='email', | ||
arguments={ | ||
'x-dead-letter-exchange': "dead_letter_exchange", | ||
'x-dead-letter-routing-key': "dead_letter_routing_key", | ||
'x-death-header': True, | ||
}, | ||
durable=True) | ||
channel.queue_bind(exchange='eml', | ||
queue='email', | ||
routing_key='mail') | ||
|
||
#channel.basic_qos(prefetch_count=1) #TODO: notice that with this enabled, an error processing a message will BLOCK the others from being processed | ||
|
||
channel.exchange_declare(exchange="dead_letter_exchange", | ||
exchange_type='direct', | ||
durable=True) | ||
channel.queue_declare(queue='error_queue', | ||
durable=True) | ||
channel.queue_bind(exchange='dead_letter_exchange', | ||
queue='error_queue', | ||
routing_key='dead_letter_routing_key') | ||
|
||
channel.exchange_declare(exchange="wait_exchange", | ||
exchange_type='x-delayed-message', | ||
durable=True, | ||
arguments={"x-delayed-type": "direct"} | ||
) | ||
channel.queue_declare(queue='requeue_queue', | ||
durable=True) | ||
channel.queue_bind(exchange='wait_exchange', | ||
queue='requeue_queue', | ||
routing_key='wait') | ||
|
||
channel.basic_consume(queue='email', | ||
auto_ack=False, | ||
on_message_callback=send_email) | ||
|
||
channel.basic_consume(queue='error_queue', | ||
auto_ack=False, | ||
on_message_callback=process_dead_letter_messages) | ||
|
||
channel.basic_consume(queue='requeue_queue', | ||
auto_ack=False, | ||
on_message_callback=process_requeue) | ||
|
||
logging.info(' [*] Connecting to smtp') | ||
connect_to_smtp() | ||
logging.info(' [*] Waiting for messages. To exit press CTRL+C') | ||
channel.start_consuming() | ||
|
||
if __name__ == '__main__': | ||
try: | ||
main() | ||
except KeyboardInterrupt: | ||
logging.error('Interrupted') | ||
smtpObj.quit() | ||
try: | ||
sys.exit(0) | ||
except SystemExit: | ||
os._exit(0) | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to have users interact with the Telegram queue initially already or is it first only for us to get monitoring alerts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in which way interact?
yes at first i was thinking of using it for us, to see how it scales in small numbers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interact was not the correct word. I just meant receiving messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say at the beginning is for us