Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Idempotence (GSI-626) #12

Merged
merged 18 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions src/ns/core/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,17 @@ def _create_notification_record(
hash_sum = sha256(notification.model_dump_json().encode("utf-8")).hexdigest()
return models.NotificationRecord(hash_sum=hash_sum, sent=False)

async def _ensure_not_sent(self, *, hash_sum: str) -> bool:
"""Ensures that the notification has not been sent already.
async def _check_if_sent(self, *, hash_sum: str) -> bool:
TheByronHimes marked this conversation as resolved.
Show resolved Hide resolved
"""Check whether the notification has been sent already.

Returns:
- `True` if the notification **has not** been sent yet.
- `False` if the notification **has** already been sent.
- `False` if the notification **has not** been sent yet.
- `True` if the notification **has** already been sent.
"""
with suppress(ResourceNotFoundError):
record = await self._notification_record_dao.get_by_id(id_=hash_sum)
if record.sent:
return False
return True
return record.sent
return False

async def _register_new_notification(
self, *, notification_record: models.NotificationRecord
Expand All @@ -100,21 +99,20 @@ async def send_notification(self, *, notification: event_schemas.Notification):
notification=notification
)

# Verify that the notification has not been sent already, abort if so
if not await self._ensure_not_sent(hash_sum=notification_record.hash_sum):
# Abort if the notification has been sent already
if await self._check_if_sent(hash_sum=notification_record.hash_sum):
log.info("Notification already sent, skipping.")
return

# Add the notification to the database (with sent=False)
await self._register_new_notification(notification_record=notification_record)

if len(notification.recipient_email) > 0:
message = self._construct_email(notification=notification)
self._smtp_client.send_email_message(message)
message = self._construct_email(notification=notification)
self._smtp_client.send_email_message(message)

# update the notification record to show that the notification has been sent.
notification_record.sent = True
await self._notification_record_dao.update(dto=notification_record)
# update the notification record to show that the notification has been sent.
notification_record.sent = True
await self._notification_record_dao.update(dto=notification_record)

def _construct_email(
self, *, notification: event_schemas.Notification
Expand Down
79 changes: 40 additions & 39 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@

"""Test basic event consumption"""
from hashlib import sha256
from typing import cast

import pytest

from ns.adapters.outbound.smtp_client import SmtpClient
from ns.core.models import NotificationRecord
from tests.conftest import (
joint_fixture, # noqa: F401
kafka_fixture, # noqa: F401
mongodb_fixture, # noqa: F401
)
from ns.core.notifier import Notifier
from tests.fixtures.joint import JointFixture
from tests.fixtures.server import DummyServer
from tests.fixtures.utils import make_notification
Expand All @@ -45,20 +42,23 @@
)
@pytest.mark.asyncio(scope="session")
async def test_email_construction(
joint_fixture: JointFixture, # noqa: F811
joint_fixture: JointFixture,
notification_details,
):
"""Verify that the email is getting constructed properly from the template."""
# Cast notifier type
joint_fixture.notifier = cast(Notifier, joint_fixture.notifier)

notification = make_notification(notification_details)

msg = joint_fixture.notifier._construct_email(notification=notification) # type: ignore
msg = joint_fixture.notifier._construct_email(notification=notification)

assert msg is not None

plaintext_body = msg.get_body(preferencelist="plain")
assert plaintext_body is not None

plaintext_content = plaintext_body.get_content()
plaintext_content = plaintext_body.get_content() # type: ignore
expected_plaintext = (
"Dear Yolanda Martinez,\n\nWhere are you, where are you, Yolanda?\n"
+ "\nWarm regards,\n\nThe GHGA Team"
Expand All @@ -68,7 +68,7 @@ async def test_email_construction(
html_body = msg.get_body(preferencelist="html")
assert html_body is not None

html_content = html_body.get_content()
html_content = html_body.get_content() # type: ignore
assert html_content is not None

expected_html = (
Expand All @@ -81,16 +81,19 @@ async def test_email_construction(


@pytest.mark.asyncio(scope="session")
async def test_failed_authentication(joint_fixture: JointFixture): # noqa: F811
async def test_failed_authentication(joint_fixture: JointFixture):
"""Change login credentials so authentication fails."""
# Cast notifier type
joint_fixture.notifier = cast(Notifier, joint_fixture.notifier)

server = DummyServer(config=joint_fixture.config)

# change the login credentials so that the authentication fails
server.login = "[email protected]"
server.password = "notCorrect"
notification = make_notification(sample_notification)

expected_email = joint_fixture.notifier._construct_email( # type: ignore
expected_email = joint_fixture.notifier._construct_email(
notification=notification,
)

Expand All @@ -100,19 +103,19 @@ async def test_failed_authentication(joint_fixture: JointFixture): # noqa: F811
await joint_fixture.notifier.send_notification(notification=notification)

# verify that the email is in the database but not marked as sent
expected_record = joint_fixture.notifier._create_notification_record( # type: ignore
expected_record = joint_fixture.notifier._create_notification_record(
notification=notification
)

record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id( # type: ignore
record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id(
id_=expected_record.hash_sum
)

assert not record_in_db.sent


@pytest.mark.asyncio(scope="session")
async def test_consume_thru_send(joint_fixture: JointFixture): # noqa: F811
async def test_consume_thru_send(joint_fixture: JointFixture):
"""Verify that the event is correctly translated into a basic email object"""
await joint_fixture.kafka.publish_event(
payload={
Expand All @@ -135,10 +138,13 @@ async def test_consume_thru_send(joint_fixture: JointFixture): # noqa: F811


@pytest.mark.asyncio(scope="session")
async def test_helper_functions(joint_fixture: JointFixture): # noqa: F811
"""Unit test for the _ensure_not_sent function, _create_notification_record,
async def test_helper_functions(joint_fixture: JointFixture):
"""Unit test for the _check_if_sent function, _create_notification_record,
and _register_new_notification function.
"""
# Cast notifier type
joint_fixture.notifier = cast(Notifier, joint_fixture.notifier)

# first, create a notification
notification = make_notification(sample_notification)

Expand All @@ -149,50 +155,49 @@ async def test_helper_functions(joint_fixture: JointFixture): # noqa: F811
)

# Now create the record using the notifier's function and compare
actual_record = joint_fixture.notifier._create_notification_record( # type: ignore
actual_record = joint_fixture.notifier._create_notification_record(
notification=notification
)

assert actual_record.model_dump() == expected_record.model_dump()

# Now check the _ensure_not_sent function before the record has been inserted
assert await joint_fixture.notifier._ensure_not_sent( # type: ignore
# Now check the _check_if_sent function before the record has been inserted
assert not await joint_fixture.notifier._check_if_sent(
hash_sum=actual_record.hash_sum
)

# register the notification
await joint_fixture.notifier._register_new_notification( # type: ignore
await joint_fixture.notifier._register_new_notification(
notification_record=actual_record
)

# Verify the record is in the database
record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id( # type: ignore
record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id(
id_=actual_record.hash_sum
)

# Extra sanity check to make sure they're the same
assert record_in_db.model_dump() == actual_record.model_dump()

# Record still has not been sent, but now it's in the database. Do another check
assert await joint_fixture.notifier._ensure_not_sent( # type: ignore
assert not await joint_fixture.notifier._check_if_sent(
hash_sum=actual_record.hash_sum
)

# Now mark the record as sent
actual_record.sent = True
await joint_fixture.notifier._notification_record_dao.update( # type: ignore
dto=actual_record
)
await joint_fixture.notifier._notification_record_dao.update(dto=actual_record)

# Now the record has been marked as sent, so _ensure_not_sent should return False
assert not await joint_fixture.notifier._ensure_not_sent( # type: ignore
hash_sum=actual_record.hash_sum
)
# Now the record has been marked as sent, so _check_if_sent should return False
assert await joint_fixture.notifier._check_if_sent(hash_sum=actual_record.hash_sum)


@pytest.mark.asyncio(scope="session")
async def test_idempotence_and_transmission(joint_fixture: JointFixture): # noqa: F811
async def test_idempotence_and_transmission(joint_fixture: JointFixture):
"""Consume identical events and verify that only one email is sent."""
# Cast notifier type
joint_fixture.notifier = cast(Notifier, joint_fixture.notifier)

notification_event = make_notification(sample_notification)
await joint_fixture.kafka.publish_event(
payload=notification_event.model_dump(),
Expand All @@ -201,17 +206,15 @@ async def test_idempotence_and_transmission(joint_fixture: JointFixture): # noq
)

# generate the hash sum for the notification
record = joint_fixture.notifier._create_notification_record( # type: ignore
record = joint_fixture.notifier._create_notification_record(
notification=notification_event
)

# the record hasn't been sent, so this should return True
assert await joint_fixture.notifier._ensure_not_sent( # type: ignore
hash_sum=record.hash_sum
)
# the record hasn't been sent, so this should return False
assert not await joint_fixture.notifier._check_if_sent(hash_sum=record.hash_sum)

server = DummyServer(config=joint_fixture.config)
expected_email = joint_fixture.notifier._construct_email( # type: ignore
expected_email = joint_fixture.notifier._construct_email(
notification=notification_event,
)

Expand All @@ -221,9 +224,7 @@ async def test_idempotence_and_transmission(joint_fixture: JointFixture): # noq
await joint_fixture.event_subscriber.run(forever=False)

# Verify that the notification has now been marked as sent
assert not await joint_fixture.notifier._ensure_not_sent( # type: ignore
hash_sum=record.hash_sum
)
assert await joint_fixture.notifier._check_if_sent(hash_sum=record.hash_sum)

# Now publish the same event again
await joint_fixture.kafka.publish_event(
Expand Down
Loading