Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding association between a Kafka cluster and a notifier #611

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/default-http-delete.tmpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"ids":["{{.Id}}"]}
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"ids":["{{.ID}}"],"group":"{{.Result.Group}}"}
6 changes: 6 additions & 0 deletions core/internal/helpers/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}

// GetCluster mocks the notifier.Module GetCluster func
func (m *MockModule) GetCluster() string {
args := m.Called()
return args.String(0)
}

// GetGroupWhitelist mocks the notifier.Module GetGroupWhitelist func
func (m *MockModule) GetGroupWhitelist() *regexp.Regexp {
args := m.Called()
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down Expand Up @@ -266,6 +267,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierSlack struct {
Expand Down Expand Up @@ -239,6 +240,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierNull struct {
Expand Down
13 changes: 11 additions & 2 deletions core/internal/notifier/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
type Module interface {
protocol.Module
GetName() string
GetCluster() string
GetGroupWhitelist() *regexp.Regexp
GetGroupBlacklist() *regexp.Regexp
GetLogger() *zap.Logger
Expand Down Expand Up @@ -96,7 +97,7 @@ type Coordinator struct {

// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupWhitelist, groupBlacklist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupWhitelist, groupBlacklist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
Expand All @@ -114,6 +115,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "email":
return &EmailNotifier{
Expand All @@ -124,6 +126,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "null":
return &NullNotifier{
Expand All @@ -134,6 +137,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
Expand Down Expand Up @@ -189,6 +193,8 @@ func (nc *Coordinator) Configure() {
groupWhitelist = re
}

cluster := viper.GetString(configRoot + ".cluster")

// Compile the blacklist for the consumer groups to not notify for
var groupBlacklist *regexp.Regexp
blacklist := viper.GetString(configRoot + ".group-blacklist")
Expand Down Expand Up @@ -222,7 +228,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}

module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose)
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
Expand Down Expand Up @@ -431,6 +437,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)

if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
continue
}
// No whitelist means everything passes
groupWhitelist := module.GetGroupWhitelist()
groupBlacklist := module.GetGroupBlacklist()
Expand Down
74 changes: 43 additions & 31 deletions core/internal/notifier/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,39 +468,46 @@ func TestCoordinator_responseLoop_HaveIncidentError(t *testing.T) {
}

var notifyModuleTests = []struct {
Threshold int
Status protocol.StatusConstant
Existing bool
SendClose bool
ExpectSend bool
ExpectClose bool
ExpectID bool
Threshold int
Status protocol.StatusConstant
Existing bool
SendClose bool
ExpectSend bool
ExpectClose bool
ExpectID bool
AssociatedCluster string
}{
/*{1, 0, false, false, false, false, false},
{2, 0, false, false, false, false, false},
{1, 0, true, false, false, false, false},
{1, 0, false, true, false, false, false},
{1, 0, true, true, false, false, false}, */

{1, 1, false, false, true, false, false},
{1, 1, false, true, true, false, false},
{1, 1, true, false, true, false, false},
{1, 1, true, true, true, true, false},

{1, 2, false, false, true, false, true},
{1, 2, false, true, true, false, true},
{1, 2, true, false, true, false, true},
{1, 2, true, true, true, false, true},

{3, 2, false, false, false, false, true},
{3, 2, false, true, false, false, true},
{3, 2, true, false, false, false, true},
{3, 2, true, true, false, false, true},

{2, 1, false, false, false, false, false},
{2, 1, false, true, false, false, false},
{2, 1, true, false, false, false, false},
{2, 1, true, true, true, true, false},
{1, 1, false, false, true, false, false, ""},
{1, 1, false, false, true, false, false, "testcluster"},
{1, 1, false, false, false, false, false, "unmatchedCluster"},
{1, 1, false, true, true, false, false, ""},
{1, 1, true, false, true, false, false, ""},
{1, 1, true, true, true, true, false, ""},

{1, 2, false, false, true, false, true, ""},
{1, 2, false, true, true, false, true, ""},
{1, 2, true, false, true, false, true, ""},
{1, 2, true, true, true, false, true, ""},

{3, 2, false, false, false, false, true, ""},
{3, 2, false, true, false, false, true, ""},
{3, 2, true, false, false, false, true, ""},
{3, 2, true, true, false, false, true, ""},

{2, 1, false, false, false, false, false, ""},
{2, 1, false, true, false, false, false, ""},
{2, 1, true, false, false, false, false, ""},
{2, 1, true, true, true, true, false, ""},
}

func checkNotifierClusterMatch(cluster string) bool {
return cluster == "" || cluster == "testcluster"
}

func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
Expand Down Expand Up @@ -549,14 +556,19 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
Status: testSet.Status,
TotalPartitions: i,
}

// Set up the mock module and expected calls
mockModule := &helpers.MockModule{}
coordinator.modules["test"] = mockModule
mockModule.On("GetName").Return("test")
mockModule.On("GetGroupWhitelist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupBlacklist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
mockModule.On("GetCluster").Return(testSet.AssociatedCluster)

if checkNotifierClusterMatch(testSet.AssociatedCluster) {

mockModule.On("GetName").Return("test")
mockModule.On("GetGroupWhitelist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupBlacklist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
}

if testSet.ExpectSend {
mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return()
}
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EmailNotifier struct {
Log *zap.Logger

name string
cluster string
groupWhitelist *regexp.Regexp
groupBlacklist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -140,6 +141,11 @@ func (module *EmailNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *EmailNotifier) GetCluster() string {
return module.cluster
}

// GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one)
func (module *EmailNotifier) GetGroupWhitelist() *regexp.Regexp {
return module.groupWhitelist
Expand Down
5 changes: 5 additions & 0 deletions core/internal/notifier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type HTTPNotifier struct {
Log *zap.Logger

name string
cluster string
groupWhitelist *regexp.Regexp
groupBlacklist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -126,6 +127,10 @@ func (module *HTTPNotifier) GetName() string {
return module.name
}

func (module *HTTPNotifier) GetCluster() string {
return module.cluster
}

// GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one)
func (module *HTTPNotifier) GetGroupWhitelist() *regexp.Regexp {
return module.groupWhitelist
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type NullNotifier struct {
Log *zap.Logger

name string
cluster string
groupWhitelist *regexp.Regexp
groupBlacklist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -76,6 +77,11 @@ func (module *NullNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *NullNotifier) GetCluster() string {
return module.cluster
}

// GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one)
func (module *NullNotifier) GetGroupWhitelist() *regexp.Regexp {
return module.groupWhitelist
Expand Down