Skip to content

Commit

Permalink
Merge pull request #525 from brancz/fix-resolved-filter
Browse files Browse the repository at this point in the history
notify: move resolved alert filtering to integration
  • Loading branch information
fabxc authored Oct 6, 2016
2 parents a84fba7 + dcf2b3a commit 2433eeb
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 31 deletions.
17 changes: 17 additions & 0 deletions notify/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ type Integration struct {

// Notify implements the Notifier interface.
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
var res []*types.Alert

// Resolved alerts have to be filtered only at this point, because they need
// to end up unfiltered in the SetNotifiesStage.
if i.conf.SendResolved() {
res = alerts
} else {
for _, a := range alerts {
if a.Status() != model.AlertResolved {
res = append(res, a)
}
}
}
if len(res) == 0 {
return false, nil
}

return i.notifier.Notify(ctx, alerts...)
}

Expand Down
31 changes: 0 additions & 31 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewFilterResolvedStage(i.conf))
s = append(s, NewDedupStage(notificationLog, recv))
s = append(s, NewRetryStage(i))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
Expand Down Expand Up @@ -380,36 +379,6 @@ func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.
return ctx, alerts, nil
}

// FilterResolvedStage filters alerts based on a given notifierConfig. Either
// returns all alerts or only those that are not resolved.
type FilterResolvedStage struct {
conf notifierConfig
}

// NewFilterRecolvedStage returns a new instance of a FilterResolvedStage.
func NewFilterResolvedStage(conf notifierConfig) *FilterResolvedStage {
return &FilterResolvedStage{
conf: conf,
}
}

// Exec implements the Stage interface.
func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var res []*types.Alert

if fr.conf.SendResolved() {
res = alerts
} else {
for _, a := range alerts {
if a.Status() != model.AlertResolved {
res = append(res, a)
}
}
}

return ctx, res, nil
}

// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
Expand Down
45 changes: 45 additions & 0 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ import (
"github.com/prometheus/alertmanager/types"
)

type notifierConfigFunc func() bool

func (f notifierConfigFunc) SendResolved() bool {
return f()
}

type notifierFunc func(ctx context.Context, alerts ...*types.Alert) (bool, error)

func (f notifierFunc) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
return f(ctx, alerts...)
}

type failStage struct{}

func (s failStage) Exec(ctx context.Context, as ...*types.Alert) (context.Context, []*types.Alert, error) {
Expand Down Expand Up @@ -283,6 +295,39 @@ func TestRoutingStage(t *testing.T) {
}
}

func TestIntegration(t *testing.T) {
res := []*types.Alert{}
r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
res = append(res, alerts...)

return false, nil
})
i1 := Integration{
notifier: r,
conf: notifierConfigFunc(func() bool { return false }),
}
i2 := Integration{
notifier: r,
conf: notifierConfigFunc(func() bool { return true }),
}

alerts := []*types.Alert{
&types.Alert{
Alert: model.Alert{
EndsAt: time.Now().Add(-time.Hour),
},
},
}

i1.Notify(nil, alerts...)
i2.Notify(nil, alerts...)

// Even though the alert is sent to both integrations, which end up being
// delivered to the same notifier, only one is actually delivered as the
// second integration filters the resolved notifications.
require.Equal(t, res, alerts)
}

func TestSetNotifiesStage(t *testing.T) {
tnflog := &testNflog{}
s := &SetNotifiesStage{
Expand Down
98 changes: 98 additions & 0 deletions test/acceptance/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,101 @@ receivers:

wg.Wait()
}

func TestResolvedFilter(t *testing.T) {
t.Parallel()

// This integration test ensures that even though resolved alerts may not be
// notified about, they must be set as notified. Resolved alerts, even when
// filtered, have to end up in the SetNotifiesStage, otherwise when an alert
// fires again it is ambiguous whether it was resolved in between or not.

var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go func() {
conf := `
global:
resolve_timeout: 10s
route:
receiver: "default"
group_by: [alertname]
group_wait: 1s
group_interval: 5s
receivers:
- name: "default"
webhook_configs:
- url: 'http://%s'
send_resolved: true
- url: 'http://%s'
send_resolved: false
`

at := NewAcceptanceTest(t, &AcceptanceOpts{
Tolerance: 150 * time.Millisecond,
})

co1 := at.Collector("webhook1")
wh1 := NewWebhook(co1)

co2 := at.Collector("webhook2")
wh2 := NewWebhook(co2)

am := at.Alertmanager(fmt.Sprintf(conf, wh1.Address(), wh2.Address()))

am.Push(At(1),
Alert("alertname", "test", "lbl", "v1"),
Alert("alertname", "test", "lbl", "v2"),
Alert("alertname", "test", "lbl", "v3"),
)

am.Push(At(16),
Alert("alertname", "test", "lbl", "v1"),
Alert("alertname", "test", "lbl", "v2"),
Alert("alertname", "test", "lbl", "v3"),
)

co1.Want(Between(2, 2.5),
Alert("alertname", "test", "lbl", "v1").Active(1),
Alert("alertname", "test", "lbl", "v2").Active(1),
Alert("alertname", "test", "lbl", "v3").Active(1),
)
co1.Want(Between(12, 13),
Alert("alertname", "test", "lbl", "v1").Active(1, 11),
Alert("alertname", "test", "lbl", "v2").Active(1, 11),
Alert("alertname", "test", "lbl", "v3").Active(1, 11),
)

co1.Want(Between(17, 17.5),
Alert("alertname", "test", "lbl", "v1").Active(16),
Alert("alertname", "test", "lbl", "v2").Active(16),
Alert("alertname", "test", "lbl", "v3").Active(16),
)
co1.Want(Between(27, 28),
Alert("alertname", "test", "lbl", "v1").Active(16, 26),
Alert("alertname", "test", "lbl", "v2").Active(16, 26),
Alert("alertname", "test", "lbl", "v3").Active(16, 26),
)

co2.Want(Between(2, 2.5),
Alert("alertname", "test", "lbl", "v1").Active(1),
Alert("alertname", "test", "lbl", "v2").Active(1),
Alert("alertname", "test", "lbl", "v3").Active(1),
)

co2.Want(Between(17, 17.5),
Alert("alertname", "test", "lbl", "v1").Active(16),
Alert("alertname", "test", "lbl", "v2").Active(16),
Alert("alertname", "test", "lbl", "v3").Active(16),
)

at.Run()
wg.Done()
}()
}

wg.Wait()
}

0 comments on commit 2433eeb

Please sign in to comment.