From e314b714d44d250d345f6d932e91149704080928 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Tue, 15 Oct 2024 09:02:44 -0300 Subject: [PATCH 1/3] Reschedule rate limited telegram task instead of retry --- engine/apps/alerts/tasks/notify_user.py | 6 ++++-- engine/apps/alerts/tests/test_notify_user.py | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index 7ad2bb498..7d7e0dbf9 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -445,10 +445,12 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback try: TelegramToUserConnector.notify_user(user, alert_group, notification_policy) except RetryAfter as e: + task_logger.exception(f"Telegram API rate limit exceeded. Retry after {e.retry_after} seconds.") countdown = getattr(e, "retry_after", 3) - raise perform_notification.retry( - (log_record_pk, use_default_notification_policy_fallback), countdown=countdown, exc=e + perform_notification.apply_async( + (log_record_pk, use_default_notification_policy_fallback), countdown=countdown ) + return elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK: # TODO: refactor checking the possibility of sending a notification in slack diff --git a/engine/apps/alerts/tests/test_notify_user.py b/engine/apps/alerts/tests/test_notify_user.py index 7124f957d..114c339a5 100644 --- a/engine/apps/alerts/tests/test_notify_user.py +++ b/engine/apps/alerts/tests/test_notify_user.py @@ -360,10 +360,12 @@ def test_perform_notification_telegram_retryafter_error( countdown = 15 exc = RetryAfter(countdown) with patch.object(TelegramToUserConnector, "notify_user", side_effect=exc) as mock_notify_user: - with pytest.raises(RetryAfter): + with patch.object(perform_notification, "apply_async") as mock_apply_async: perform_notification(log_record.pk, False) mock_notify_user.assert_called_once_with(user, alert_group, user_notification_policy) + # task is rescheduled using the countdown value from the exception + mock_apply_async.assert_called_once_with((log_record.pk, False), countdown=countdown) assert alert_group.personal_log_records.last() == log_record From f291ff5f9a3b26fd8bf9df4c91fc2b9de198a68d Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Tue, 15 Oct 2024 09:53:31 -0300 Subject: [PATCH 2/3] Set a retry timeout for telegram rate limit errors --- engine/apps/alerts/tasks/notify_user.py | 33 +++++++++++++++----- engine/apps/alerts/tests/test_notify_user.py | 16 ++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index 7d7e0dbf9..e7546a7ab 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -26,6 +26,9 @@ from apps.user_management.models import User +RETRY_TIMEOUT_HOURS = 1 + + def schedule_send_bundled_notification_task( user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup" ): @@ -446,10 +449,27 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback TelegramToUserConnector.notify_user(user, alert_group, notification_policy) except RetryAfter as e: task_logger.exception(f"Telegram API rate limit exceeded. Retry after {e.retry_after} seconds.") - countdown = getattr(e, "retry_after", 3) - perform_notification.apply_async( - (log_record_pk, use_default_notification_policy_fallback), countdown=countdown - ) + # check how much time has passed since log record was created + # to prevent eternal loop of restarting perform_notification task + if timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS): + countdown = getattr(e, "retry_after", 3) + perform_notification.apply_async( + (log_record_pk, use_default_notification_policy_fallback), countdown=countdown + ) + else: + task_logger.debug( + f"telegram notification for alert_group {alert_group.pk} failed because of rate limit" + ) + UserNotificationPolicyLogRecord( + author=user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED, + notification_policy=notification_policy, + reason="Telegram rate limit exceeded", + alert_group=alert_group, + notification_step=notification_policy.step, + notification_channel=notification_channel, + notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_TELEGRAM_IS_DISABLED, + ).save() return elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK: @@ -518,13 +538,12 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback ).save() return - retry_timeout_hours = 1 if alert_group.slack_message: alert_group.slack_message.send_slack_notification(user, alert_group, notification_policy) task_logger.debug(f"Finished send_slack_notification for alert_group {alert_group.pk}.") # check how much time has passed since log record was created # to prevent eternal loop of restarting perform_notification task - elif timezone.now() < log_record.created_at + timezone.timedelta(hours=retry_timeout_hours): + elif timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS): task_logger.debug( f"send_slack_notification for alert_group {alert_group.pk} failed because slack message " f"does not exist. Restarting perform_notification." @@ -536,7 +555,7 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback else: task_logger.debug( f"send_slack_notification for alert_group {alert_group.pk} failed because slack message " - f"after {retry_timeout_hours} hours still does not exist" + f"after {RETRY_TIMEOUT_HOURS} hours still does not exist" ) UserNotificationPolicyLogRecord( author=user, diff --git a/engine/apps/alerts/tests/test_notify_user.py b/engine/apps/alerts/tests/test_notify_user.py index 114c339a5..dd8288be9 100644 --- a/engine/apps/alerts/tests/test_notify_user.py +++ b/engine/apps/alerts/tests/test_notify_user.py @@ -368,6 +368,22 @@ def test_perform_notification_telegram_retryafter_error( mock_apply_async.assert_called_once_with((log_record.pk, False), countdown=countdown) assert alert_group.personal_log_records.last() == log_record + # but if the log was too old, skip and create a failed log record + log_record.created_at = timezone.now() - timezone.timedelta(minutes=90) + log_record.save() + with patch.object(TelegramToUserConnector, "notify_user", side_effect=exc) as mock_notify_user: + with patch.object(perform_notification, "apply_async") as mock_apply_async: + perform_notification(log_record.pk, False) + mock_notify_user.assert_called_once_with(user, alert_group, user_notification_policy) + assert not mock_apply_async.called + last_log_record = UserNotificationPolicyLogRecord.objects.last() + assert last_log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED + assert last_log_record.reason == "Telegram rate limit exceeded" + assert ( + last_log_record.notification_error_code + == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_TELEGRAM_IS_DISABLED + ) + @patch("apps.base.models.UserNotificationPolicy.get_default_fallback_policy") @patch("apps.base.tests.messaging_backend.TestOnlyBackend.notify_user") From 6647f1a612f5082791797e2d1a033a7280172939 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Tue, 15 Oct 2024 10:25:25 -0300 Subject: [PATCH 3/3] Add specific error code for telegram ratelimit --- engine/apps/alerts/tasks/notify_user.py | 2 +- engine/apps/alerts/tests/test_notify_user.py | 2 +- .../base/models/user_notification_policy_log_record.py | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index e7546a7ab..88f46f6e9 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -468,7 +468,7 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback alert_group=alert_group, notification_step=notification_policy.step, notification_channel=notification_channel, - notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_TELEGRAM_IS_DISABLED, + notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT, ).save() return diff --git a/engine/apps/alerts/tests/test_notify_user.py b/engine/apps/alerts/tests/test_notify_user.py index dd8288be9..a24003dfe 100644 --- a/engine/apps/alerts/tests/test_notify_user.py +++ b/engine/apps/alerts/tests/test_notify_user.py @@ -381,7 +381,7 @@ def test_perform_notification_telegram_retryafter_error( assert last_log_record.reason == "Telegram rate limit exceeded" assert ( last_log_record.notification_error_code - == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_TELEGRAM_IS_DISABLED + == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT ) diff --git a/engine/apps/base/models/user_notification_policy_log_record.py b/engine/apps/base/models/user_notification_policy_log_record.py index 3d7ab44e4..80185a845 100644 --- a/engine/apps/base/models/user_notification_policy_log_record.py +++ b/engine/apps/base/models/user_notification_policy_log_record.py @@ -106,7 +106,8 @@ class UserNotificationPolicyLogRecord(models.Model): ERROR_NOTIFICATION_TELEGRAM_USER_IS_DEACTIVATED, ERROR_NOTIFICATION_MOBILE_USER_HAS_NO_ACTIVE_DEVICE, ERROR_NOTIFICATION_FORMATTING_ERROR, - ) = range(29) + ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT, + ) = range(30) # for this errors we want to send message to general log channel ERRORS_TO_SEND_IN_SLACK_CHANNEL = [ @@ -304,6 +305,10 @@ def render_log_line_action(self, for_slack=False, substitute_author_with_tag=Fal result += f"failed to notify {user_verbal} in Slack, because channel is archived" elif self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_RATELIMIT: result += f"failed to notify {user_verbal} in Slack due to Slack rate limit" + elif ( + self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT + ): + result += f"failed to notify {user_verbal} in Telegram due to Telegram rate limit" elif self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN: result += f"failed to notify {user_verbal}, not allowed" elif (