Skip to content

Commit

Permalink
Fix acknowledge reminder task (#5179)
Browse files Browse the repository at this point in the history
# What this PR does
- Adds a 10-minute lock for the acknowledge reminder task to prevent
duplicate task runs, which caused multiple reminder messages to be posted
and flooded Slack threads.
- Adds a new signal for the acknowledge reminder task instead of using
`alert_group_action_triggered_signal`, since it is used only to post the
reminder message in the Slack thread and does not need to be processed by
other representatives.

## Which issue(s) this PR closes

Related to grafana/oncall-private#2953

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
  • Loading branch information
Ferril authored Oct 16, 2024
1 parent 4667960 commit 8420cfd
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 29 deletions.
7 changes: 7 additions & 0 deletions engine/apps/alerts/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
# Signal to rerender alert group's resolution note in all connected integrations (Slack)
alert_group_update_resolution_note_signal = django.dispatch.Signal()

# Signal to post acknowledge reminder message (Slack)
post_ack_reminder_message_signal = django.dispatch.Signal()

# Currently this only writes an error to the Slack thread when notifying a user. Maybe it is worth deleting?
user_notification_action_triggered_signal = django.dispatch.Signal()

Expand All @@ -40,6 +43,10 @@
AlertGroupSlackRepresentative.on_alert_group_update_resolution_note,
)

post_ack_reminder_message_signal.connect(
AlertGroupSlackRepresentative.on_alert_group_post_acknowledge_reminder_message,
)

user_notification_action_triggered_signal.connect(
UserSlackRepresentative.on_user_action_triggered,
)
Expand Down
34 changes: 30 additions & 4 deletions engine/apps/alerts/tasks/acknowledge_reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@
from functools import partial

from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.utils import timezone

from apps.alerts.signals import post_ack_reminder_message_signal
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

from .send_alert_group_signal import send_alert_group_signal
from .task_logger import task_logger

MAX_RETRIES = 1 if settings.DEBUG else None
MAX_RETRIES = 1 if settings.DEBUG else 10


def is_allowed_to_send_acknowledge_reminder(alert_group_id, process_id):
    """
    Try to take the acknowledge reminder lock for the given alert group.

    The lock is a cache key derived from `alert_group_id` that stores `process_id`
    and expires after 10 minutes.

    Returns the result of `cache.add`: False when the key already exists
    (a reminder for this alert group was handled recently).
    """
    ten_minutes = 10 * 60
    # cache.add stores the value only when the key is absent and returns False otherwise
    return cache.add(f"acknowledge-reminder-lock-{alert_group_id}", process_id, ten_minutes)


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
Expand All @@ -28,6 +38,11 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str
if unacknowledge_process_id != alert_group.last_unique_unacknowledge_process_id:
return

# Don't proceed if acknowledge reminder for this alert group has already been sent recently
if not is_allowed_to_send_acknowledge_reminder(alert_group.id, unacknowledge_process_id):
task_logger.info(f"Acknowledge reminder for alert_group {alert_group_pk} has already been sent recently.")
return

organization = alert_group.channel.organization

# Get timeout values
Expand Down Expand Up @@ -55,8 +70,9 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str

# unacknowledge_timeout_task uses acknowledged_by_confirmed to check if acknowledgement reminder has been confirmed
# by the user. Setting to None here to indicate that the user has not confirmed the acknowledgement reminder
alert_group.acknowledged_by_confirmed = None
alert_group.save(update_fields=["acknowledged_by_confirmed"])
if alert_group.acknowledged_by_confirmed is not None:
alert_group.acknowledged_by_confirmed = None
alert_group.save(update_fields=["acknowledged_by_confirmed"])

if unacknowledge_timeout: # "unack in N minutes if no response" is enabled
unacknowledge_timeout_task.apply_async(
Expand All @@ -77,7 +93,7 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str
type=AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED, author=alert_group.acknowledged_by_user
)
task_logger.info(f"created log record {log_record.pk}, sending signal...")
transaction.on_commit(partial(send_alert_group_signal.delay, log_record.pk))
transaction.on_commit(partial(send_post_ack_reminder_message_signal.delay, log_record.pk))


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
Expand Down Expand Up @@ -138,3 +154,13 @@ def unacknowledge_timeout_task(alert_group_pk: int, unacknowledge_process_id: st
transaction.on_commit(partial(send_alert_group_signal.delay, log_record.pk))
alert_group.unacknowledge()
alert_group.start_escalation_if_needed()


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def send_post_ack_reminder_message_signal(log_record_id):
    """
    Emit `post_ack_reminder_message_signal` for the given log record id.

    The signal is connected to AlertGroupSlackRepresentative, which posts the
    acknowledge reminder message to the Slack thread.
    """
    message = f"sending signal for posting ack reminder message, log record {log_record_id}"
    task_logger.info(message)
    # the receiver accepts either a log record instance or its id under the `log_record` kwarg
    post_ack_reminder_message_signal.send(
        sender=send_post_ack_reminder_message_signal,
        log_record=log_record_id,
    )
57 changes: 52 additions & 5 deletions engine/apps/alerts/tests/test_acknowledge_reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

import pytest
from celery import uuid as celery_uuid
from django.core.cache import cache
from django.utils import timezone

from apps.alerts.constants import ActionSource
from apps.alerts.models import AlertGroup, AlertGroupLogRecord
from apps.alerts.tasks import acknowledge_reminder_task
from apps.alerts.tasks.acknowledge_reminder import unacknowledge_timeout_task
from apps.alerts.tasks.acknowledge_reminder import send_post_ack_reminder_message_signal, unacknowledge_timeout_task
from apps.user_management.models import Organization

TASK_ID = "TASK_ID"
Expand Down Expand Up @@ -156,8 +157,10 @@ def test_acknowledge_reminder_task_skip(

@patch.object(unacknowledge_timeout_task, "apply_async")
@patch.object(acknowledge_reminder_task, "apply_async")
@patch.object(send_post_ack_reminder_message_signal, "delay")
@pytest.mark.django_db
def test_acknowledge_reminder_task_reschedules_itself(
def test_acknowledge_reminder_task_reschedules_itself_and_sends_signal(
mock_send_post_ack_reminder_message_signal,
mock_acknowledge_reminder_task,
mock_unacknowledge_timeout_task,
ack_reminder_test_setup,
Expand All @@ -169,9 +172,6 @@ def test_acknowledge_reminder_task_reschedules_itself(
with django_capture_on_commit_callbacks(execute=True) as callbacks:
acknowledge_reminder_task(alert_group.pk, TASK_ID)

# send_alert_group_signal task is queued after commit
assert len(callbacks) == 1

mock_unacknowledge_timeout_task.assert_not_called()
mock_acknowledge_reminder_task.assert_called_once_with(
(alert_group.pk, TASK_ID),
Expand All @@ -182,6 +182,10 @@ def test_acknowledge_reminder_task_reschedules_itself(
assert log_record.type == AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED
assert log_record.author == alert_group.acknowledged_by_user

# send_post_ack_reminder_message_signal task is queued after commit
assert len(callbacks) == 1
mock_send_post_ack_reminder_message_signal.assert_called_once_with(log_record.id)


@patch.object(unacknowledge_timeout_task, "apply_async")
@patch.object(acknowledge_reminder_task, "apply_async")
Expand Down Expand Up @@ -369,3 +373,46 @@ def test_ack_reminder_cancel_too_old(
mock_acknowledge_reminder_task.assert_not_called()

assert not alert_group.log_records.exists()


@pytest.mark.django_db
def test_acknowledge_reminder_skip_doubled_notification(
    ack_reminder_test_setup,
    django_capture_on_commit_callbacks,
    caplog,
):
    """
    The first run of acknowledge_reminder_task takes the cache lock and proceeds;
    a second run while the lock is still held is skipped and only writes a log line.
    """
    _organization, alert_group, _user = ack_reminder_test_setup(
        unacknowledge_timeout=Organization.UNACKNOWLEDGE_TIMEOUT_NEVER
    )
    expected_log_text = f"Acknowledge reminder for alert_group {alert_group.id} has already been sent recently."
    lock_id = f"acknowledge-reminder-lock-{alert_group.id}"

    def run_task_with_mocks():
        # Run the task with rescheduling and signal dispatch patched out; on-commit
        # callbacks are executed when the capture context exits.
        with patch.object(acknowledge_reminder_task, "apply_async") as reschedule_mock, patch.object(
            send_post_ack_reminder_message_signal, "delay"
        ) as send_signal_mock, django_capture_on_commit_callbacks(execute=True):
            acknowledge_reminder_task(alert_group.pk, TASK_ID)
        return reschedule_mock, send_signal_mock

    # check task lock cache doesn't exist
    assert cache.get(lock_id) is None

    mock_acknowledge_reminder_task, mock_send_signal = run_task_with_mocks()

    log_record = alert_group.log_records.get()
    assert log_record.type == AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED
    mock_send_signal.assert_called_once_with(log_record.id)
    mock_acknowledge_reminder_task.assert_called_once()

    # check task lock cache exists
    assert cache.get(lock_id) == TASK_ID

    assert expected_log_text not in caplog.text

    # acknowledge_reminder_task doesn't proceed the second time if it has been called recently
    mock_acknowledge_reminder_task, mock_send_signal = run_task_with_mocks()

    assert alert_group.log_records.count() == 1
    mock_send_signal.assert_not_called()
    mock_acknowledge_reminder_task.assert_not_called()

    assert expected_log_text in caplog.text
67 changes: 48 additions & 19 deletions engine/apps/slack/representatives/alert_group_representative.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import typing

from celery.utils.log import get_task_logger
from django.conf import settings
Expand All @@ -9,6 +10,9 @@
from apps.slack.scenarios.scenario_step import ScenarioStep
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroupLogRecord

logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -130,29 +134,17 @@ def on_create_alert(cls, **kwargs):
@classmethod
def on_alert_group_action_triggered(cls, **kwargs):
logger.debug("Received alert_group_action_triggered signal in SLACK representative")
from apps.alerts.models import AlertGroupLogRecord

log_record = kwargs["log_record"]
force_sync = kwargs.get("force_sync", False)
if isinstance(log_record, AlertGroupLogRecord):
log_record_id = log_record.pk
else:
log_record_id = log_record

try:
log_record = AlertGroupLogRecord.objects.get(pk=log_record_id)
except AlertGroupLogRecord.DoesNotExist:
logger.warning(
f"on_alert_group_action_triggered: log record {log_record_id} never created or has been deleted"
)
log_record = cls.get_log_record_from_kwargs(**kwargs)
if not log_record:
return
force_sync = kwargs.get("force_sync", False)

if log_record.action_source == ActionSource.SLACK or force_sync:
logger.debug(f"SLACK on_alert_group_action_triggered: sync {log_record_id} {force_sync}")
on_alert_group_action_triggered_async(log_record_id)
logger.debug(f"SLACK on_alert_group_action_triggered: sync {log_record.id} {force_sync}")
on_alert_group_action_triggered_async(log_record.id)
else:
logger.debug(f"SLACK on_alert_group_action_triggered: async {log_record_id} {force_sync}")
on_alert_group_action_triggered_async.apply_async((log_record_id,))
logger.debug(f"SLACK on_alert_group_action_triggered: async {log_record.id} {force_sync}")
on_alert_group_action_triggered_async.apply_async((log_record.id,))

@classmethod
def on_alert_group_update_resolution_note(cls, **kwargs):
Expand All @@ -167,6 +159,26 @@ def on_alert_group_update_resolution_note(cls, **kwargs):
step = UpdateResolutionNoteStep(organization.slack_team_identity, organization)
step.process_signal(alert_group, resolution_note)

@classmethod
def on_alert_group_post_acknowledge_reminder_message(cls, **kwargs):
    """
    Receiver for post_ack_reminder_message_signal.

    Resolves the log record from `kwargs["log_record"]` (instance or id) and hands it to
    AcknowledgeConfirmationStep.process_signal. Does nothing when the log record cannot be
    resolved, or when the alert group has no Slack message or the organization has no
    Slack team identity.
    """
    log_record = cls.get_log_record_from_kwargs(**kwargs)
    if not log_record:
        return

    alert_group = log_record.alert_group
    slack_team_identity = alert_group.channel.organization.slack_team_identity
    logger.debug(
        f"Received post_ack_reminder_message_signal signal in SLACK representative for alert_group {alert_group.id}"
    )

    # Slack is only applicable when both the alert group message and the team identity exist
    if not alert_group.slack_message or not slack_team_identity:
        logger.debug(
            f"SLACK representative is NOT applicable for alert_group {alert_group.id}, log record {log_record.id}"
        )
        return

    step_class = ScenarioStep.get_step("distribute_alerts", "AcknowledgeConfirmationStep")
    step_class(slack_team_identity).process_signal(log_record)

def on_acknowledge(self):
AcknowledgeGroupStep = ScenarioStep.get_step("distribute_alerts", "AcknowledgeGroupStep")
step = AcknowledgeGroupStep(self.log_record.alert_group.channel.organization.slack_team_identity)
Expand Down Expand Up @@ -229,6 +241,7 @@ def on_auto_un_acknowledge(self):
self.on_un_acknowledge()

def on_ack_reminder_triggered(self):
# deprecated, remove this handler after release
AcknowledgeConfirmationStep = ScenarioStep.get_step("distribute_alerts", "AcknowledgeConfirmationStep")
step = AcknowledgeConfirmationStep(self.log_record.alert_group.channel.organization.slack_team_identity)
step.process_signal(self.log_record)
Expand Down Expand Up @@ -258,3 +271,19 @@ def get_handler_name(self):
@classmethod
def on_handler_not_found(cls):
    """Intentionally a no-op: does nothing and returns None."""
    return None

@classmethod
def get_log_record_from_kwargs(cls, **kwargs) -> typing.Optional["AlertGroupLogRecord"]:
    """
    Resolve the `log_record` keyword argument to an AlertGroupLogRecord.

    `kwargs["log_record"]` may be an AlertGroupLogRecord instance, which is returned as is,
    or a primary key, which is looked up in the database.

    Returns None (after logging a warning) when no record with that primary key exists.
    Raises KeyError when `log_record` is not passed at all.
    """
    # imported here rather than at module level; the module only imports it under TYPE_CHECKING
    from apps.alerts.models import AlertGroupLogRecord

    record_or_pk = kwargs["log_record"]
    if isinstance(record_or_pk, AlertGroupLogRecord):
        return record_or_pk

    try:
        return AlertGroupLogRecord.objects.get(pk=record_or_pk)
    except AlertGroupLogRecord.DoesNotExist:
        logger.warning(f"log record {record_or_pk} never created or has been deleted")
        return None
1 change: 0 additions & 1 deletion engine/apps/telegram/alert_group_representative.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def get_handlers_map():
AlertGroupLogRecord.TYPE_AUTO_UN_ACK: "alert_group_action",
AlertGroupLogRecord.TYPE_RESOLVED: "alert_group_action",
AlertGroupLogRecord.TYPE_UN_RESOLVED: "alert_group_action",
AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED: "alert_group_action",
AlertGroupLogRecord.TYPE_SILENCE: "alert_group_action",
AlertGroupLogRecord.TYPE_UN_SILENCE: "alert_group_action",
AlertGroupLogRecord.TYPE_ATTACHED: "alert_group_action",
Expand Down
1 change: 1 addition & 0 deletions engine/settings/celery_task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
# CRITICAL
"apps.alerts.tasks.acknowledge_reminder.acknowledge_reminder_task": {"queue": "critical"},
"apps.alerts.tasks.acknowledge_reminder.unacknowledge_timeout_task": {"queue": "critical"},
"apps.alerts.tasks.acknowledge_reminder.send_post_ack_reminder_message_signal": {"queue": "critical"},
"apps.alerts.tasks.declare_incident.declare_incident": {"queue": "critical"},
"apps.alerts.tasks.distribute_alert.send_alert_create_signal": {"queue": "critical"},
"apps.alerts.tasks.escalate_alert_group.escalate_alert_group": {"queue": "critical"},
Expand Down

0 comments on commit 8420cfd

Please sign in to comment.