Skip to content

Commit

Permalink
Merge pull request #379 from prometheus/fabxc-inhibit-fix
Browse files Browse the repository at this point in the history
inhibit: reduce O(n^2) complexity problem
  • Loading branch information
fabxc authored Jun 14, 2016
2 parents 5002dde + a5c6b23 commit e6892e1
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 34 deletions.
148 changes: 114 additions & 34 deletions inhibit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"sync"
"time"

"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
Expand All @@ -31,7 +32,8 @@ type Inhibitor struct {
rules []*InhibitRule
marker types.Marker

mtx sync.RWMutex
mtx sync.RWMutex
stopc chan struct{}
}

// NewInhibitor returns a new Inhibitor.
Expand All @@ -41,41 +43,87 @@ func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker)
marker: mk,
}
for _, cr := range rs {
ih.rules = append(ih.rules, NewInhibitRule(cr))
r := NewInhibitRule(cr)
ih.rules = append(ih.rules, r)
}
return ih
}

// Mutes returns true iff the given label set is muted.
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
alerts := ih.alerts.GetPending()
defer alerts.Close()

// TODO(fabxc): improve erroring for iterators so it does not
// go silenced here.

for alert := range alerts.Next() {
if err := alerts.Err(); err != nil {
log.Errorf("Error iterating alerts: %s", err)
continue
}
if alert.Resolved() {
continue
func (ih *Inhibitor) runGC() {
for {
select {
case <-time.After(15 * time.Minute):
for _, r := range ih.rules {
r.gc()
}
case <-ih.stopc:
return
}
for _, rule := range ih.rules {
if rule.Mutes(alert.Labels, lset) {
ih.marker.SetInhibited(lset.Fingerprint(), true)
return true
}
}

// Run the Inihibitor's background processing.
func (ih *Inhibitor) Run() {
ih.mtx.Lock()
ih.stopc = make(chan struct{})
ih.mtx.Unlock()

go ih.runGC()

it := ih.alerts.Subscribe()
defer it.Close()

for {
select {
case <-ih.stopc:
return
case a := <-it.Next():
if err := it.Err(); err != nil {
log.Errorf("Error iterating alerts: %s", err)
continue
}
if a.Resolved() {
// As alerts can also time out without an update, we never
// handle new resolved alerts but invalidate the cache on read.
continue
}
// Populate the inhibition rules' cache.
for _, r := range ih.rules {
if r.SourceMatchers.Match(a.Labels) {
r.set(a)
}
}
}
}
if err := alerts.Err(); err != nil {
log.Errorf("Error after iterating alerts: %s", err)
}

// Stop the Inhibitor's background processing.
func (ih *Inhibitor) Stop() {
if ih == nil {
return
}
ih.mtx.Lock()
defer ih.mtx.Unlock()

ih.marker.SetInhibited(lset.Fingerprint(), false)
if ih.stopc != nil {
close(ih.stopc)
ih.stopc = nil
}
}

// Mutes returns true iff the given label set is muted.
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()

for _, r := range ih.rules {
if r.TargetMatchers.Match(lset) && r.hasEqual(lset) {
ih.marker.SetInhibited(fp, true)
return true
}
}
ih.marker.SetInhibited(fp, false)
return false

}

// An InhibitRule specifies that a class of (source) alerts should inhibit
Expand All @@ -93,6 +141,10 @@ type InhibitRule struct {
// A set of label names whose label values need to be identical in source and
// target alerts in order for the inhibition to take effect.
Equal map[model.LabelName]struct{}

mtx sync.RWMutex
// Cache of alerts matching source labels.
scache map[model.Fingerprint]*types.Alert
}

// NewInhibitRule returns a new InihibtRule based on a configuration definition.
Expand Down Expand Up @@ -125,20 +177,48 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
SourceMatchers: sourcem,
TargetMatchers: targetm,
Equal: equal,
scache: map[model.Fingerprint]*types.Alert{},
}
}

// Mutes returns true iff the Inhibition rule applies for the given
// source and target label set.
func (r *InhibitRule) Mutes(source, target model.LabelSet) bool {
if !r.TargetMatchers.Match(target) || !r.SourceMatchers.Match(source) {
return false
}
for ln := range r.Equal {
if source[ln] != target[ln] {
return false
// set the alert in the source cache.
func (r *InhibitRule) set(a *types.Alert) {
r.mtx.Lock()
r.mtx.Unlock()

r.scache[a.Fingerprint()] = a
}

// hasEqual checks whether the source cache contains alerts matching
// the equal labels for the given label set.
func (r *InhibitRule) hasEqual(lset model.LabelSet) bool {
r.mtx.RLock()
defer r.mtx.RUnlock()

Outer:
for _, a := range r.scache {
// The cache might be stale and contain resolved alerts.
if a.Resolved() {
continue
}
for n := range r.Equal {
if a.Labels[n] != lset[n] {
continue Outer
}
}
return true
}
return false
}

// gc clears out resolved alerts from the source cache.
func (r *InhibitRule) gc() {
r.mtx.Lock()
defer r.mtx.Unlock()

return true
for fp, a := range r.scache {
if a.Resolved() {
delete(r.scache, fp)
}
}
}
168 changes: 168 additions & 0 deletions inhibit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"reflect"
"testing"
"time"

"github.com/kylelemons/godebug/pretty"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)

func TestInhibitRuleHasEqual(t *testing.T) {
now := time.Now()
cases := []struct {
initial map[model.Fingerprint]*types.Alert
equal model.LabelNames
input model.LabelSet
result bool
}{
{
// No source alerts at all.
initial: map[model.Fingerprint]*types.Alert{},
input: model.LabelSet{"a": "b"},
result: false,
},
{
// No equal labels, any source alerts satisfies the requirement.
initial: map[model.Fingerprint]*types.Alert{1: &types.Alert{}},
input: model.LabelSet{"a": "b"},
result: true,
},
{
// Matching but already resolved.
initial: map[model.Fingerprint]*types.Alert{
1: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "b", "b": "f"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
2: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "b", "b": "c"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
},
equal: model.LabelNames{"a", "b"},
input: model.LabelSet{"a": "b", "b": "c"},
result: false,
},
{
// Matching but already resolved.
initial: map[model.Fingerprint]*types.Alert{
1: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "b", "c": "d"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
2: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "b", "c": "f"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
},
equal: model.LabelNames{"a"},
input: model.LabelSet{"a": "b"},
result: false,
},
{
// Equal label does not match.
initial: map[model.Fingerprint]*types.Alert{
1: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "c", "c": "d"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
2: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "c", "c": "f"},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(-time.Second),
},
},
},
equal: model.LabelNames{"a"},
input: model.LabelSet{"a": "b"},
result: false,
},
}

for _, c := range cases {
r := &InhibitRule{
Equal: map[model.LabelName]struct{}{},
scache: map[model.Fingerprint]*types.Alert{},
}
for _, ln := range c.equal {
r.Equal[ln] = struct{}{}
}
for k, v := range c.initial {
r.scache[k] = v
}

if have := r.hasEqual(c.input); have != c.result {
t.Errorf("Unexpected result %q, expected %q", have, c.result)
}
if !reflect.DeepEqual(r.scache, c.initial) {
t.Errorf("Cache state unexpectedly changed")
t.Errorf(pretty.Compare(r.scache, c.initial))
}
}
}

func TestInhibitRuleGC(t *testing.T) {
// TODO(fabxc): add now() injection function to Resolved() to remove
// dependency on machine time in this test.
now := time.Now()
newAlert := func(start, end time.Duration) *types.Alert {
return &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"a": "b"},
StartsAt: now.Add(start * time.Minute),
EndsAt: now.Add(end * time.Minute),
},
}
}

before := map[model.Fingerprint]*types.Alert{
0: newAlert(-10, -5),
1: newAlert(10, 20),
2: newAlert(-10, 10),
3: newAlert(-10, -1),
}
after := map[model.Fingerprint]*types.Alert{
1: newAlert(10, 20),
2: newAlert(-10, 10),
}

r := &InhibitRule{scache: before}
r.gc()

if !reflect.DeepEqual(r.scache, after) {
t.Errorf("Unexpected cache state after GC")
t.Errorf(pretty.Compare(r.scache, after))
}
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ func main() {
}
tmpl.ExternalURL = amURL

inhibitor.Stop()
disp.Stop()

inhibitor = NewInhibitor(alerts, conf.InhibitRules, marker)
disp = NewDispatcher(alerts, NewRoute(conf.Route, nil), build(conf.Receivers), marker)

go disp.Run()
go inhibitor.Run()

return nil
}
Expand Down

0 comments on commit e6892e1

Please sign in to comment.