Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 3682 alerts grouping #1716

Merged
merged 23 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
bd60db1
GroupedAlerts base class
MikaKerman Oct 6, 2024
be005e6
splitted the grouped alert types into multiple modules
MikaKerman Oct 6, 2024
a3fac51
created all in one grouped alert type
MikaKerman Oct 6, 2024
ac97fe6
added group_all_alerts_threshold config key
MikaKerman Oct 6, 2024
1566dc0
moved data and unified_meta properties to the base grouped alert class
MikaKerman Oct 6, 2024
b8efe33
added abstract method for all-in-one alert template
MikaKerman Oct 6, 2024
cc5c8f1
Slack integration - added all in one alert template
MikaKerman Oct 6, 2024
de78be6
Teams integration - added all in one alert template
MikaKerman Oct 6, 2024
fd24e51
added a compact version of the all-in-one-alert, to avoid the slack m…
MikaKerman Oct 6, 2024
818886b
using send_alerts method instead of send_alert
MikaKerman Oct 7, 2024
0b012c2
supporting unified alert only in slack integration
MikaKerman Oct 7, 2024
23f824a
moved the compact schema to class attribute
MikaKerman Oct 8, 2024
eaf4c4a
changed send_alerts to return a generator
MikaKerman Oct 8, 2024
3b87737
refactored al-in-one alert templates
MikaKerman Oct 8, 2024
c3c2b5d
grouping alerts in the base integration
MikaKerman Oct 8, 2024
020c2c0
changed config group alerts config key name
MikaKerman Oct 10, 2024
164ac89
passing the threshold
MikaKerman Oct 10, 2024
d26d814
slack - reduced number of block to avoid the slack limit
MikaKerman Oct 10, 2024
3071e16
teams - change \n to <br>
MikaKerman Oct 10, 2024
6ec78e7
using GroupedAlert directly
MikaKerman Oct 10, 2024
d0e235d
Refactor grouped alerts to alerts groups
MikaKerman Oct 14, 2024
a624839
Move grouping_type module
MikaKerman Oct 14, 2024
a074335
Add const for default group alerts threshold
MikaKerman Oct 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from google.auth.exceptions import DefaultCredentialsError # type: ignore[import]

from elementary.exceptions.exceptions import InvalidArgumentsError
from elementary.monitor.alerts.group_of_alerts import GroupingType
from elementary.monitor.alerts.grouping_type import GroupingType
from elementary.utils.ordered_yaml import OrderedYaml


Expand Down Expand Up @@ -35,6 +35,8 @@ class Config:

DEFAULT_TARGET_PATH = os.getcwd() + "/edr_target"

DEFAULT_GROUP_ALERTS_THRESHOLD = 100

def __init__(
self,
config_dir: str = DEFAULT_CONFIG_DIR,
Expand All @@ -49,6 +51,7 @@ def __init__(
slack_token: Optional[str] = None,
slack_channel_name: Optional[str] = None,
slack_group_alerts_by: Optional[str] = None,
group_alerts_threshold: Optional[int] = None,
timezone: Optional[str] = None,
aws_profile_name: Optional[str] = None,
aws_region_name: Optional[str] = None,
Expand Down Expand Up @@ -124,6 +127,11 @@ def __init__(
slack_config.get("group_alerts_by"),
GroupingType.BY_ALERT.value,
)
self.group_alerts_threshold = self._first_not_none(
group_alerts_threshold,
slack_config.get("group_alerts_threshold"),
self.DEFAULT_GROUP_ALERTS_THRESHOLD,
)

teams_config = config.get(self._TEAMS, {})
self.teams_webhook = self._first_not_none(
Expand Down
7 changes: 7 additions & 0 deletions elementary/monitor/alerts/alerts_groups/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .alerts_group import AlertsGroup
from .grouped_by_table import GroupedByTableAlerts

__all__ = [
"AlertsGroup",
"GroupedByTableAlerts",
]
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union

from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
get_model_test_runs_link,
)
from elementary.utils.models import get_shortened_model_name


class GroupingType(Enum):
BY_ALERT = "alert"
BY_TABLE = "table"


class GroupedByTableAlerts:
class AlertsGroup:
def __init__(
self,
alerts: List[Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]],
Expand All @@ -30,26 +19,28 @@ def __init__(
self._sort_alerts()

@property
def model_unique_id(self) -> Optional[str]:
return self.alerts[0].model_unique_id

@property
def model(self) -> str:
return get_shortened_model_name(self.model_unique_id)
def summary(self) -> str:
return f"{len(self.alerts)} issues detected"

@property
def detected_at(self) -> datetime:
# We return the minimum alert detected at time as the group detected at time
return min(alert.detected_at or datetime.max for alert in self.alerts)

@property
def report_url(self) -> Optional[str]:
return self.alerts[0].report_url
def status(self) -> str:
if self.model_errors or self.test_errors:
return "error"
elif self.test_failures:
return "failure"
else:
return "warn"

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def unified_meta(self) -> Dict:
# If a model level unified meta is defined, we use is.
# Else we use one of the tests level unified metas.
model_unified_meta = dict()
test_unified_meta = dict()
for alert in self.alerts:
Expand All @@ -62,29 +53,6 @@ def unified_meta(self) -> Dict:
test_unified_meta = alert_unified_meta
return model_unified_meta or test_unified_meta

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def summary(self) -> str:
return f"{self.model}: {len(self.alerts)} issues detected"

@property
def status(self) -> str:
if self.model_errors or self.test_errors:
return "error"
elif self.test_failures:
return "failure"
else:
return "warn"

def get_report_link(self) -> Optional[ReportLinkData]:
if not self.model_errors:
return get_model_test_runs_link(self.report_url, self.model_unique_id)

return None

def _sort_alerts(self):
for alert in self.alerts:
if isinstance(alert, ModelAlertModel):
Expand Down
32 changes: 32 additions & 0 deletions elementary/monitor/alerts/alerts_groups/grouped_by_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional

from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
get_model_test_runs_link,
)
from elementary.utils.models import get_shortened_model_name


class GroupedByTableAlerts(AlertsGroup):
@property
def model_unique_id(self) -> Optional[str]:
return self.alerts[0].model_unique_id

@property
def model(self) -> str:
return get_shortened_model_name(self.model_unique_id)

@property
def report_url(self) -> Optional[str]:
return self.alerts[0].report_url

@property
def summary(self) -> str:
return f"{self.model}: {len(self.alerts)} issues detected"

def get_report_link(self) -> Optional[ReportLinkData]:
if not self.model_errors:
return get_model_test_runs_link(self.report_url, self.model_unique_id)

return None
6 changes: 6 additions & 0 deletions elementary/monitor/alerts/grouping_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum


class GroupingType(str, Enum):
BY_ALERT = "alert"
BY_TABLE = "table"
8 changes: 8 additions & 0 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ def get_cli_properties() -> dict:
default=None,
help="DEPRECATED! - A slack webhook URL for sending alerts to a specific channel.",
)
@click.option(
"--group-alerts-threshold",
type=int,
default=Config.DEFAULT_GROUP_ALERTS_THRESHOLD,
help="The threshold for all alerts in a single message.",
)
@click.option(
"--timezone",
"-tz",
Expand Down Expand Up @@ -276,6 +282,7 @@ def monitor(
deprecated_slack_webhook,
slack_token,
slack_channel_name,
group_alerts_threshold,
timezone,
config_dir,
profiles_dir,
Expand Down Expand Up @@ -322,6 +329,7 @@ def monitor(
slack_webhook=slack_webhook,
slack_token=slack_token,
slack_channel_name=slack_channel_name,
group_alerts_threshold=group_alerts_threshold,
timezone=timezone,
env=env,
slack_group_alerts_by=group_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from alive_progress import alive_it

from elementary.config.config import Config
from elementary.monitor.alerts.group_of_alerts import GroupedByTableAlerts, GroupingType
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.grouping_type import GroupingType
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
Expand Down Expand Up @@ -251,18 +252,19 @@ def _send_alerts(

alerts_with_progress_bar = alive_it(alerts, title="Sending alerts")
sent_successfully_alerts = []
for alert in alerts_with_progress_bar:
sent_successfully = self.alerts_integration.send_alert(alert=alert)
for alert, sent_successfully in self.alerts_integration.send_alerts(
alerts_with_progress_bar, self.config.group_alerts_threshold
):
if sent_successfully:
if isinstance(alert, GroupedByTableAlerts):
if isinstance(alert, AlertsGroup):
sent_successfully_alerts.extend(alert.alerts)
else:
sent_successfully_alerts.append(alert)
else:
if isinstance(alert, GroupedByTableAlerts):
for grouped_alert in alert.alerts:
if isinstance(alert, AlertsGroup):
for inner_alert in alert.alerts:
logger.error(
f"Could not send the alert - {grouped_alert.id}. Full alert: {json.dumps(grouped_alert.data)}"
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"
)
else:
logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from abc import ABC, abstractmethod
from typing import Union
from typing import Generator, List, Sequence, Tuple, Union

from elementary.monitor.alerts.group_of_alerts import GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.utils.log import get_logger

logger = get_logger(__name__)


class BaseIntegration(ABC):
Expand All @@ -22,9 +25,10 @@ def _get_alert_template(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
],
*args,
**kwargs
**kwargs,
):
if isinstance(alert, TestAlertModel):
if alert.is_elementary_test:
Expand All @@ -40,6 +44,8 @@ def _get_alert_template(
return self._get_source_freshness_template(alert)
elif isinstance(alert, GroupedByTableAlerts):
return self._get_group_by_table_template(alert)
elif isinstance(alert, AlertsGroup):
return self._get_alerts_group_template(alert)

@abstractmethod
def _get_dbt_test_template(self, alert: TestAlertModel, *args, **kwargs):
Expand Down Expand Up @@ -69,6 +75,10 @@ def _get_group_by_table_template(
):
raise NotImplementedError

@abstractmethod
def _get_alerts_group_template(self, alert: AlertsGroup, *args, **kwargs):
raise NotImplementedError

@abstractmethod
def _get_fallback_template(
self,
Expand All @@ -79,7 +89,7 @@ def _get_fallback_template(
GroupedByTableAlerts,
],
*args,
**kwargs
**kwargs,
):
raise NotImplementedError

Expand All @@ -91,12 +101,83 @@ def send_alert(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
],
*args,
**kwargs
**kwargs,
) -> bool:
raise NotImplementedError

def _group_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
threshold: int,
) -> Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
]
]:
flattened_alerts: List[
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
] = []
for alert in alerts:
if isinstance(alert, AlertsGroup):
flattened_alerts.extend(alert.alerts)
else:
flattened_alerts.append(alert)

if len(flattened_alerts) >= threshold:
logger.info(f"Grouping {len(flattened_alerts)} alerts into one")
return [
AlertsGroup(alerts=flattened_alerts),
]
return alerts

def send_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
group_alerts_threshold: int,
*args,
**kwargs,
) -> Generator[
Tuple[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SourceFreshnessAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,

no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the alert is an AlertsGroup, the inner alerts will be generated. This changes the behavior of GroupedByTableAlerts, but I checked that the method's caller can handle it as well.

],
bool,
],
None,
None,
]:
grouped_alerts = self._group_alerts(alerts, group_alerts_threshold)
for alert in grouped_alerts:
if isinstance(alert, AlertsGroup):
sent_successfully = self.send_alert(alert, *args, **kwargs)
for inner_alert in alert.alerts:
yield inner_alert, sent_successfully
else:
yield alert, self.send_alert(alert, *args, **kwargs)

@abstractmethod
def send_test_message(self, *args, **kwargs) -> bool:
raise NotImplementedError
Loading
Loading