From 700c6be7fc6f7fd2608d8a09e9834899da87214a Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 29 Jan 2020 15:46:01 +0000 Subject: [PATCH 1/3] Adding association between a Kafka cluster and a notifier --- config/default-http-delete.tmpl | 2 +- core/internal/helpers/coordinators.go | 6 +++ core/internal/httpserver/config.go | 2 + core/internal/httpserver/structs.go | 2 + core/internal/notifier/coordinator.go | 15 ++++++- core/internal/notifier/coordinator_test.go | 52 +++++++++++++--------- core/internal/notifier/email.go | 6 +++ core/internal/notifier/http.go | 5 +++ core/internal/notifier/null.go | 7 +++ 9 files changed, 74 insertions(+), 23 deletions(-) diff --git a/config/default-http-delete.tmpl b/config/default-http-delete.tmpl index 70aa7972..0b1960f9 100644 --- a/config/default-http-delete.tmpl +++ b/config/default-http-delete.tmpl @@ -1 +1 @@ -{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"ids":["{{.Id}}"]} +{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"ids":["{{.ID}}"],"group":"{{.Result.Group}}"} diff --git a/core/internal/helpers/coordinators.go b/core/internal/helpers/coordinators.go index 4a8bb201..14dfd2a2 100644 --- a/core/internal/helpers/coordinators.go +++ b/core/internal/helpers/coordinators.go @@ -75,6 +75,12 @@ func (m *MockModule) GetName() string { return args.String(0) } +// GetCluster mocks the notifier.Module GetCluster func +func (m *MockModule) GetCluster() string { + args := m.Called() + return args.String(0) +} + // GetGroupWhitelist mocks the notifier.Module GetGroupWhitelist func func (m *MockModule) GetGroupWhitelist() *regexp.Regexp { args := m.Called() diff --git a/core/internal/httpserver/config.go b/core/internal/httpserver/config.go index 8191af90..dc090d03 100644 --- a/core/internal/httpserver/config.go +++ b/core/internal/httpserver/config.go @@ -214,6 +214,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request SendClose: viper.GetBool(configRoot + ".send-close"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) @@ -266,6 +267,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques To: viper.GetString(configRoot + ".to"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go index 5ba6a7be..d27d46ac 100644 --- a/core/internal/httpserver/structs.go +++ b/core/internal/httpserver/structs.go @@ -203,6 +203,7 @@ type httpResponseConfigModuleNotifierHTTP struct { SendClose bool `json:"send-close"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierSlack struct { @@ -239,6 +240,7 @@ type httpResponseConfigModuleNotifierEmail struct { To string `json:"to"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierNull struct { diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index 0e9b57db..0ba7afee 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -49,6 +49,7 @@ import ( type Module interface { protocol.Module GetName() string + GetCluster() string GetGroupWhitelist() *regexp.Regexp GetGroupBlacklist() *regexp.Regexp GetLogger() *zap.Logger @@ -96,7 +97,7 @@ type Coordinator struct { // getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there // is any error, it will panic with an appropriate message describing the problem. -func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupWhitelist, groupBlacklist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module { +func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupWhitelist, groupBlacklist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module { logger := app.Logger.With( zap.String("type", "module"), zap.String("coordinator", "notifier"), @@ -114,6 +115,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "email": return &EmailNotifier{ @@ -124,6 +126,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "null": return &NullNotifier{ @@ -134,6 +137,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } default: panic("Unknown notifier className provided: " + className) @@ -189,6 +193,9 @@ func (nc *Coordinator) Configure() { groupWhitelist = re } + var cluster string + cluster = viper.GetString(configRoot + ".cluster") + // Compile the blacklist for the consumer groups to not notify for var groupBlacklist *regexp.Regexp blacklist := viper.GetString(configRoot + ".group-blacklist") @@ -222,7 +229,7 @@ func (nc *Coordinator) Configure() { templateClose = tmpl.Templates()[0] } - module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose) + module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose,cluster) module.Configure(name, configRoot) nc.modules[name] = module interval := viper.GetInt64(configRoot + ".interval") @@ -431,6 +438,10 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer for _, genericModule := range nc.modules { module := genericModule.(Module) + + if module.GetCluster() != "" && response.Cluster!=module.GetCluster() { + continue + } // No whitelist means everything passes groupWhitelist := module.GetGroupWhitelist() groupBlacklist := module.GetGroupBlacklist() diff --git a/core/internal/notifier/coordinator_test.go b/core/internal/notifier/coordinator_test.go index 5909c379..e1c497a1 100644 --- a/core/internal/notifier/coordinator_test.go +++ b/core/internal/notifier/coordinator_test.go @@ -475,6 +475,7 @@ var notifyModuleTests = []struct { ExpectSend bool ExpectClose bool ExpectID bool + AssociatedCluster string }{ /*{1, 0, false, false, false, false, false}, {2, 0, false, false, false, false, false}, @@ -482,25 +483,31 @@ var notifyModuleTests = []struct { {1, 0, false, true, false, false, false}, {1, 0, true, true, false, false, false}, */ - {1, 1, false, false, true, false, false}, - {1, 1, false, true, true, false, false}, - {1, 1, true, false, true, false, false}, - {1, 1, true, true, true, true, false}, - - {1, 2, false, false, true, false, true}, - {1, 2, false, true, true, false, true}, - {1, 2, true, false, true, false, true}, - {1, 2, true, true, true, false, true}, - - {3, 2, false, false, false, false, true}, - {3, 2, false, true, false, false, true}, - {3, 2, true, false, false, false, true}, - {3, 2, true, true, false, false, true}, - - {2, 1, false, false, false, false, false}, - {2, 1, false, true, false, false, false}, - {2, 1, true, false, false, false, false}, - {2, 1, true, true, true, true, false}, + {1, 1, false, false, true, false, false,""}, + {1, 1, false, false, true, false, false,"testcluster"}, + {1, 1, false, false, false, false, false,"unmatchedCluster"}, + {1, 1, false, true, true, false, false,""}, + {1, 1, true, false, true, false, false,""}, + {1, 1, true, true, true, true, false,""}, + + {1, 2, false, false, true, false, true,""}, + {1, 2, false, true, true, false, true,""}, + {1, 2, true, false, true, false, true,""}, + {1, 2, true, true, true, false, true,""}, + + {3, 2, false, false, false, false, true,""}, + {3, 2, false, true, false, false, true,""}, + {3, 2, true, false, false, false, true,""}, + {3, 2, true, true, false, false, true,""}, + + {2, 1, false, false, false, false, false,""}, + {2, 1, false, true, false, false, false,""}, + {2, 1, true, false, false, false, false,""}, + {2, 1, true, true, true, true, false,""}, +} + +func checkNotifierClusterMatch(cluster string) bool { + return cluster=="" || cluster=="testcluster" } func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { @@ -549,14 +556,19 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { Status: testSet.Status, TotalPartitions: i, } - // Set up the mock module and expected calls mockModule := &helpers.MockModule{} coordinator.modules["test"] = mockModule + mockModule.On("GetCluster").Return(testSet.AssociatedCluster) + + if checkNotifierClusterMatch(testSet.AssociatedCluster) { + mockModule.On("GetName").Return("test") mockModule.On("GetGroupWhitelist").Return((*regexp.Regexp)(nil)) mockModule.On("GetGroupBlacklist").Return((*regexp.Regexp)(nil)) mockModule.On("AcceptConsumerGroup", response).Return(true) + } + if testSet.ExpectSend { mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return() } diff --git a/core/internal/notifier/email.go b/core/internal/notifier/email.go index f95a0a32..8afc5383 100644 --- a/core/internal/notifier/email.go +++ b/core/internal/notifier/email.go @@ -39,6 +39,7 @@ type EmailNotifier struct { Log *zap.Logger name string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string @@ -140,6 +141,11 @@ func (module *EmailNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *EmailNotifier) GetCluster() string { + return module.cluster +} + // GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one) func (module *EmailNotifier) GetGroupWhitelist() *regexp.Regexp { return module.groupWhitelist diff --git a/core/internal/notifier/http.go b/core/internal/notifier/http.go index 87660903..1ba852b1 100644 --- a/core/internal/notifier/http.go +++ b/core/internal/notifier/http.go @@ -41,6 +41,7 @@ type HTTPNotifier struct { Log *zap.Logger name string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string @@ -126,6 +127,10 @@ func (module *HTTPNotifier) GetName() string { return module.name } +func (module *HTTPNotifier) GetCluster() string { + return module.cluster +} + // GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one) func (module *HTTPNotifier) GetGroupWhitelist() *regexp.Regexp { return module.groupWhitelist diff --git a/core/internal/notifier/null.go b/core/internal/notifier/null.go index c1768ac2..3763b660 100644 --- a/core/internal/notifier/null.go +++ b/core/internal/notifier/null.go @@ -31,6 +31,7 @@ type NullNotifier struct { Log *zap.Logger name string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string @@ -76,6 +77,12 @@ func (module *NullNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *NullNotifier) GetCluster() string { + return module.cluster +} + + // GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one) func (module *NullNotifier) GetGroupWhitelist() *regexp.Regexp { return module.groupWhitelist From 8546764c5f2ba108a169473520d6d4b56c580ff9 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 29 Jan 2020 16:25:00 +0000 Subject: [PATCH 2/3] Adding association between a Kafka cluster and a notifier --- core/internal/notifier/coordinator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index 0ba7afee..cbed530a 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -193,8 +193,7 @@ func (nc *Coordinator) Configure() { groupWhitelist = re } - var cluster string - cluster = viper.GetString(configRoot + ".cluster") + cluster := viper.GetString(configRoot + ".cluster") // Compile the blacklist for the consumer groups to not notify for var groupBlacklist *regexp.Regexp From c020a2b21e7ec4593a095c2b6c23da9cd1a8bf11 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 29 Jan 2020 17:26:46 +0000 Subject: [PATCH 3/3] Adding association between a Kafka cluster and a notifier --- core/internal/httpserver/config.go | 4 +- core/internal/httpserver/structs.go | 2 +- core/internal/notifier/coordinator.go | 11 ++-- core/internal/notifier/coordinator_test.go | 68 +++++++++++----------- core/internal/notifier/email.go | 2 +- core/internal/notifier/http.go | 2 +- core/internal/notifier/null.go | 3 +- 7 files changed, 45 insertions(+), 47 deletions(-) diff --git a/core/internal/httpserver/config.go b/core/internal/httpserver/config.go index dc090d03..838f976e 100644 --- a/core/internal/httpserver/config.go +++ b/core/internal/httpserver/config.go @@ -214,7 +214,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request SendClose: viper.GetBool(configRoot + ".send-close"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), - Cluster: viper.GetString(configRoot + ".cluster"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) @@ -267,7 +267,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques To: viper.GetString(configRoot + ".to"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), - Cluster: viper.GetString(configRoot + ".cluster"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go index d27d46ac..96aca008 100644 --- a/core/internal/httpserver/structs.go +++ b/core/internal/httpserver/structs.go @@ -240,7 +240,7 @@ type httpResponseConfigModuleNotifierEmail struct { To string `json:"to"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` - Cluster string `json:"cluster"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierNull struct { diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index cbed530a..04ef11fd 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -115,7 +115,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, - cluster: cluster, + cluster: cluster, } case "email": return &EmailNotifier{ @@ -126,7 +126,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, - cluster: cluster, + cluster: cluster, } case "null": return &NullNotifier{ @@ -137,7 +137,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, - cluster: cluster, + cluster: cluster, } default: panic("Unknown notifier className provided: " + className) @@ -228,7 +228,7 @@ func (nc *Coordinator) Configure() { templateClose = tmpl.Templates()[0] } - module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose,cluster) + module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupWhitelist, groupBlacklist, extras, templateOpen, templateClose, cluster) module.Configure(name, configRoot) nc.modules[name] = module interval := viper.GetInt64(configRoot + ".interval") @@ -437,8 +437,7 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer for _, genericModule := range nc.modules { module := genericModule.(Module) - - if module.GetCluster() != "" && response.Cluster!=module.GetCluster() { + if module.GetCluster() != "" && response.Cluster != module.GetCluster() { continue } // No whitelist means everything passes diff --git a/core/internal/notifier/coordinator_test.go b/core/internal/notifier/coordinator_test.go index e1c497a1..942e96f0 100644 --- a/core/internal/notifier/coordinator_test.go +++ b/core/internal/notifier/coordinator_test.go @@ -468,13 +468,13 @@ func TestCoordinator_responseLoop_HaveIncidentError(t *testing.T) { } var notifyModuleTests = []struct { - Threshold int - Status protocol.StatusConstant - Existing bool - SendClose bool - ExpectSend bool - ExpectClose bool - ExpectID bool + Threshold int + Status protocol.StatusConstant + Existing bool + SendClose bool + ExpectSend bool + ExpectClose bool + ExpectID bool AssociatedCluster string }{ /*{1, 0, false, false, false, false, false}, @@ -483,31 +483,31 @@ var notifyModuleTests = []struct { {1, 0, false, true, false, false, false}, {1, 0, true, true, false, false, false}, */ - {1, 1, false, false, true, false, false,""}, - {1, 1, false, false, true, false, false,"testcluster"}, - {1, 1, false, false, false, false, false,"unmatchedCluster"}, - {1, 1, false, true, true, false, false,""}, - {1, 1, true, false, true, false, false,""}, - {1, 1, true, true, true, true, false,""}, - - {1, 2, false, false, true, false, true,""}, - {1, 2, false, true, true, false, true,""}, - {1, 2, true, false, true, false, true,""}, - {1, 2, true, true, true, false, true,""}, - - {3, 2, false, false, false, false, true,""}, - {3, 2, false, true, false, false, true,""}, - {3, 2, true, false, false, false, true,""}, - {3, 2, true, true, false, false, true,""}, - - {2, 1, false, false, false, false, false,""}, - {2, 1, false, true, false, false, false,""}, - {2, 1, true, false, false, false, false,""}, - {2, 1, true, true, true, true, false,""}, + {1, 1, false, false, true, false, false, ""}, + {1, 1, false, false, true, false, false, "testcluster"}, + {1, 1, false, false, false, false, false, "unmatchedCluster"}, + {1, 1, false, true, true, false, false, ""}, + {1, 1, true, false, true, false, false, ""}, + {1, 1, true, true, true, true, false, ""}, + + {1, 2, false, false, true, false, true, ""}, + {1, 2, false, true, true, false, true, ""}, + {1, 2, true, false, true, false, true, ""}, + {1, 2, true, true, true, false, true, ""}, + + {3, 2, false, false, false, false, true, ""}, + {3, 2, false, true, false, false, true, ""}, + {3, 2, true, false, false, false, true, ""}, + {3, 2, true, true, false, false, true, ""}, + + {2, 1, false, false, false, false, false, ""}, + {2, 1, false, true, false, false, false, ""}, + {2, 1, true, false, false, false, false, ""}, + {2, 1, true, true, true, true, false, ""}, } func checkNotifierClusterMatch(cluster string) bool { - return cluster=="" || cluster=="testcluster" + return cluster == "" || cluster == "testcluster" } func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { @@ -563,11 +563,11 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { if checkNotifierClusterMatch(testSet.AssociatedCluster) { - mockModule.On("GetName").Return("test") - mockModule.On("GetGroupWhitelist").Return((*regexp.Regexp)(nil)) - mockModule.On("GetGroupBlacklist").Return((*regexp.Regexp)(nil)) - mockModule.On("AcceptConsumerGroup", response).Return(true) - } + mockModule.On("GetName").Return("test") + mockModule.On("GetGroupWhitelist").Return((*regexp.Regexp)(nil)) + mockModule.On("GetGroupBlacklist").Return((*regexp.Regexp)(nil)) + mockModule.On("AcceptConsumerGroup", response).Return(true) + } if testSet.ExpectSend { mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return() diff --git a/core/internal/notifier/email.go b/core/internal/notifier/email.go index 8afc5383..ab171e7c 100644 --- a/core/internal/notifier/email.go +++ b/core/internal/notifier/email.go @@ -39,7 +39,7 @@ type EmailNotifier struct { Log *zap.Logger name string - cluster string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string diff --git a/core/internal/notifier/http.go b/core/internal/notifier/http.go index 1ba852b1..dda7563c 100644 --- a/core/internal/notifier/http.go +++ b/core/internal/notifier/http.go @@ -41,7 +41,7 @@ type HTTPNotifier struct { Log *zap.Logger name string - cluster string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string diff --git a/core/internal/notifier/null.go b/core/internal/notifier/null.go index 3763b660..5ad49d36 100644 --- a/core/internal/notifier/null.go +++ b/core/internal/notifier/null.go @@ -31,7 +31,7 @@ type NullNotifier struct { Log *zap.Logger name string - cluster string + cluster string groupWhitelist *regexp.Regexp groupBlacklist *regexp.Regexp extras map[string]string @@ -82,7 +82,6 @@ func (module *NullNotifier) GetCluster() string { return module.cluster } - // GetGroupWhitelist returns the compiled group whitelist (or nil, if there is not one) func (module *NullNotifier) GetGroupWhitelist() *regexp.Regexp { return module.groupWhitelist