From 70803a44707e5949bd035c67ce5bb2f6c4b4b875 Mon Sep 17 00:00:00 2001
From: Tomasz Rogalski
Date: Tue, 9 Jul 2024 13:27:28 +0200
Subject: [PATCH] Heartbeat reporting (#1469)

* MOD healthChecker notifiers settings
* MOD extend bots and sinks with errorMsg
* MOD Update report heartbeat mutation by also sending the health status
* MOD unify notificationKey
* FIX lint
* FIX CR
---
 cmd/botkube-agent/main.go           | 123 ++++++++++++----------
 internal/health/failed.go           |  27 ++++++
 internal/health/health.go           |  25 +++---
 internal/health/health_test.go      |   8 +-
 internal/health/status.go           |  15 ++--
 internal/heartbeat/gql_reporter.go  |  44 ++++++++--
 internal/heartbeat/noop_reporter.go |   2 +-
 internal/heartbeat/reporter.go      |  26 +++++-
 internal/insights/k8s_collector.go  |   2 +-
 pkg/bot/discord.go                  |   9 +-
 pkg/bot/mattermost.go               |  11 ++-
 pkg/bot/slack_cloud.go              |  23 +++---
 pkg/bot/slack_socket.go             |   7 +-
 pkg/bot/teams_cloud.go              |  15 ++--
 pkg/notifier/platform.go            |  12 +++
 pkg/sink/elasticsearch.go           |   9 +-
 pkg/sink/pager_duty.go              |   8 +-
 pkg/sink/webhook.go                 |   9 +-
 18 files changed, 234 insertions(+), 141 deletions(-)
 create mode 100644 internal/health/failed.go
 create mode 100644 pkg/notifier/platform.go

diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go
index d9cb70ab5..388afcd5e 100644
--- a/cmd/botkube-agent/main.go
+++ b/cmd/botkube-agent/main.go
@@ -262,82 +262,78 @@ func run(ctx context.Context) (err error) {
 			Index: commGroupIdx + 1,
 		}
 
-		scheduleBotNotifier := func(in bot.Bot) {
-			bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
-			errGroup.Go(func() error {
-				defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
-				return in.Start(ctx)
-			})
+		scheduleNotifier := func(provider func() (notifier.Platform, error)) {
+			app, err := provider()
+			key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName())
+			if err != nil {
+				commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName())
+				healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error()))
+				return
+			}
+
+			healthChecker.AddNotifier(key, app)
+
+			switch platform := app.(type) {
+			case notifier.Sink:
+				sinkNotifiers = append(sinkNotifiers, platform)
+			case bot.Bot:
+				bots[key] = platform
+				errGroup.Go(func() error {
+					defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
+					return platform.Start(ctx)
+				})
+			}
 		}
 
 		// Run bots
 		if commGroupCfg.SocketSlack.Enabled {
-			sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
-			if err != nil {
-				return reportFatalError("while creating SocketSlack bot", err)
-			}
-			scheduleBotNotifier(sb)
+			scheduleNotifier(func() (notifier.Platform, error) {
+				return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
+			})
 		}
 
 		if commGroupCfg.CloudSlack.Enabled {
-			sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
-			if err != nil {
-				return reportFatalError("while creating CloudSlack bot", err)
-			}
-			scheduleBotNotifier(sb)
+			scheduleNotifier(func() (notifier.Platform, error) {
+				return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
+			})
 		}
 
 		if
commGroupCfg.Mattermost.Enabled { - mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter) - if err != nil { - return reportFatalError("while creating Mattermost bot", err) - } - scheduleBotNotifier(mb) + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter) + }) } if commGroupCfg.CloudTeams.Enabled { - ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter) - if err != nil { - return reportFatalError("while creating CloudTeams bot", err) - } - scheduleBotNotifier(ctb) + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter) + }) } if commGroupCfg.Discord.Enabled { - db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter) - if err != nil { - return reportFatalError("while creating Discord bot", err) - } - scheduleBotNotifier(db) + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter) + }) } // Run sinks if commGroupCfg.Elasticsearch.Enabled { - es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter) - if err != nil { - return reportFatalError("while creating Elasticsearch sink", err) - } - sinkNotifiers = append(sinkNotifiers, es) + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter) + }) } if commGroupCfg.Webhook.Enabled { - wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter) - if err != nil { - return reportFatalError("while creating Webhook sink", err) - } - - sinkNotifiers = append(sinkNotifiers, wh) + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter) + }) } if commGroupCfg.PagerDuty.Enabled { - pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter) - if err != nil { - return reportFatalError("while creating PagerDuty sink", err) - } - - sinkNotifiers = append(sinkNotifiers, pd) + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter) + }) } } - healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers)) if conf.ConfigWatcher.Enabled { restarter := reloader.NewRestarter( @@ -448,7 +444,7 @@ func run(ctx context.Context) (err error) { logger.Errorf("while reporting 
fatal error: %s", reportErr.Error()) } }() - heartbeatReporter := heartbeat.GetReporter(logger, gqlClient) + heartbeatReporter := heartbeat.GetReporter(logger, gqlClient, healthChecker) k8sCollector := insights.NewK8sCollector(k8sCli, heartbeatReporter, logger, reportHeartbeatInterval, reportHeartbeatMaxRetries) return k8sCollector.Start(ctx) }) @@ -496,12 +492,7 @@ func getAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (ana return nil, fmt.Errorf("while creating new Analytics Client: %w", err) } - analyticsReporter := analytics.NewSegmentReporter(wrappedLogger, segmentCli) - if err != nil { - return nil, err - } - - return analyticsReporter, nil + return analytics.NewSegmentReporter(wrappedLogger, segmentCli), nil } func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInterface, error) { @@ -555,15 +546,15 @@ func sendHelp(ctx context.Context, s *storage.Help, clusterName string, executor var sent []string - for key, notifier := range notifiers { + for key, notifierItem := range notifiers { if alreadySentHelp[key] { continue } - help := interactive.NewHelpMessage(notifier.IntegrationName(), clusterName, executors).Build(true) - err := notifier.SendMessageToAll(ctx, help) + help := interactive.NewHelpMessage(notifierItem.IntegrationName(), clusterName, executors).Build(true) + err := notifierItem.SendMessageToAll(ctx, help) if err != nil { - return fmt.Errorf("while sending help message for %s: %w", notifier.IntegrationName(), err) + return fmt.Errorf("while sending help message for %s: %w", notifierItem.IntegrationName(), err) } sent = append(sent, key) } @@ -584,15 +575,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) { return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil } - -func getHealthNotifiers(bots map[string]bot.Bot, sinks []notifier.Sink) map[string]health.Notifier { - notifiers := make(map[string]health.Notifier) - for key, botInstance := range bots { - notifiers[key] = botInstance - } - for key, sinkInstance := range sinks { - notifiers[fmt.Sprintf("%s-%d", sinkInstance.IntegrationName(), key)] = sinkInstance - } - - return notifiers -} diff --git a/internal/health/failed.go b/internal/health/failed.go new file mode 100644 index 000000000..93b8f27d8 --- /dev/null +++ b/internal/health/failed.go @@ -0,0 +1,27 @@ +package health + +// Failed represents failed platform. +type Failed struct { + status PlatformStatusMsg + failureReason FailureReasonMsg + errorMsg string +} + +// NewFailed creates a new Failed instance. +func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed { + return &Failed{ + status: StatusUnHealthy, + failureReason: failureReason, + errorMsg: errorMsg, + } +} + +// GetStatus gets bot status. 
+func (b *Failed) GetStatus() PlatformStatus { + return PlatformStatus{ + Status: b.status, + Restarts: "0/0", + Reason: b.failureReason, + ErrorMsg: b.errorMsg, + } +} diff --git a/internal/health/health.go b/internal/health/health.go index ca2ad1fcb..e86f5124c 100644 --- a/internal/health/health.go +++ b/internal/health/health.go @@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health ctx: ctx, config: config, pluginHealthStats: stats, + notifiers: map[string]Notifier{}, } } @@ -60,7 +61,7 @@ func (h *Checker) ServeHTTP(resp http.ResponseWriter, _ *http.Request) { } resp.Header().Set("Content-Type", "application/json") - status := h.getStatus() + status := h.GetStatus() respJSon, err := json.Marshal(status) if err != nil { http.Error(resp, err.Error(), http.StatusInternalServerError) @@ -79,18 +80,18 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server { return httpx.NewServer(log, addr, router) } -// SetNotifiers sets platform bots instances. -func (h *Checker) SetNotifiers(notifiers map[string]Notifier) { - h.notifiers = notifiers +// AddNotifier add platform bot instance +func (h *Checker) AddNotifier(key string, notifier Notifier) { + h.notifiers[key] = notifier } -func (h *Checker) getStatus() *status { - pluginsStats := make(map[string]pluginStatuses) +func (h *Checker) GetStatus() *Status { + pluginsStats := make(map[string]PluginStatus) h.collectSourcePluginsStatuses(pluginsStats) h.collectExecutorPluginsStatuses(pluginsStats) - return &status{ - Botkube: botStatus{ + return &Status{ + Botkube: BotStatus{ Status: h.getBotkubeStatus(), }, Plugins: pluginsStats, @@ -98,7 +99,7 @@ func (h *Checker) getStatus() *status { } } -func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses) { +func (h *Checker) collectSourcePluginsStatuses(plugins map[string]PluginStatus) { if h.config == nil { return } @@ -109,7 +110,7 @@ func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses } } -func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatuses) { +func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]PluginStatus) { if h.config == nil { return } @@ -120,9 +121,9 @@ func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatus } } -func (h *Checker) collectPluginStatus(plugins map[string]pluginStatuses, pluginConfigName string, pluginName string, enabled bool) { +func (h *Checker) collectPluginStatus(plugins map[string]PluginStatus, pluginConfigName string, pluginName string, enabled bool) { status, restarts, threshold, _ := h.pluginHealthStats.GetStats(pluginName) - plugins[pluginConfigName] = pluginStatuses{ + plugins[pluginConfigName] = PluginStatus{ Enabled: enabled, Status: status, Restarts: fmt.Sprintf("%d/%d", restarts, threshold), diff --git a/internal/health/health_test.go b/internal/health/health_test.go index ec37ea864..7d0a80f9a 100644 --- a/internal/health/health_test.go +++ b/internal/health/health_test.go @@ -16,7 +16,7 @@ import ( func TestServeHTTPUnavailable(t *testing.T) { // given checker := NewChecker(context.TODO(), &config.Config{}, nil) - expectedStatus := checker.getStatus() + expectedStatus := checker.GetStatus() req, err := http.NewRequest("GET", "/", nil) require.NoError(t, err) @@ -29,7 +29,7 @@ func TestServeHTTPUnavailable(t *testing.T) { assert.Equal(t, http.StatusServiceUnavailable, rr.Code) assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) - var resp status + var resp 
Status err = json.Unmarshal(rr.Body.Bytes(), &resp) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestServeHTTPOK(t *testing.T) { // given checker := NewChecker(context.TODO(), &config.Config{}, nil) checker.MarkAsReady() - expectedStatus := checker.getStatus() + expectedStatus := checker.GetStatus() req, err := http.NewRequest("GET", "/", nil) require.NoError(t, err) @@ -54,7 +54,7 @@ func TestServeHTTPOK(t *testing.T) { assert.Equal(t, http.StatusOK, rr.Code) assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) - var resp status + var resp Status err = json.Unmarshal(rr.Body.Bytes(), &resp) require.NoError(t, err) diff --git a/internal/health/status.go b/internal/health/status.go index 41383a807..509d42727 100644 --- a/internal/health/status.go +++ b/internal/health/status.go @@ -25,23 +25,24 @@ type PlatformStatus struct { Status PlatformStatusMsg `json:"status,omitempty"` Restarts string `json:"restarts,omitempty"` Reason FailureReasonMsg `json:"reason,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` } -// status defines bot agent status. -type status struct { - Botkube botStatus `json:"botkube"` - Plugins map[string]pluginStatuses `json:"plugins,omitempty"` - Platforms platformStatuses `json:"platforms,omitempty"` +// Status defines bot agent status. +type Status struct { + Botkube BotStatus `json:"botkube"` + Plugins map[string]PluginStatus `json:"plugins,omitempty"` + Platforms platformStatuses `json:"platforms,omitempty"` } type platformStatuses map[string]PlatformStatus -type pluginStatuses struct { +type PluginStatus struct { Enabled bool `json:"enabled,omitempty"` Status string `json:"status,omitempty"` Restarts string `json:"restarts,omitempty"` } -type botStatus struct { +type BotStatus struct { Status BotkubeStatus `json:"status,omitempty"` } diff --git a/internal/heartbeat/gql_reporter.go b/internal/heartbeat/gql_reporter.go index bd8526fc9..e1ccd13c4 100644 --- a/internal/heartbeat/gql_reporter.go +++ b/internal/heartbeat/gql_reporter.go @@ -6,6 +6,8 @@ import ( "github.com/hasura/go-graphql-client" "github.com/pkg/errors" "github.com/sirupsen/logrus" + + "github.com/kubeshop/botkube/internal/health" ) var _ HeartbeatReporter = (*GraphQLHeartbeatReporter)(nil) @@ -18,29 +20,53 @@ type GraphQLClient interface { // GraphQLHeartbeatReporter reports heartbeat to GraphQL server. 
type GraphQLHeartbeatReporter struct { - log logrus.FieldLogger - gql GraphQLClient + log logrus.FieldLogger + gql GraphQLClient + healthChecker health.Checker } -func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient) *GraphQLHeartbeatReporter { +func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient, healthChecker health.Checker) *GraphQLHeartbeatReporter { return &GraphQLHeartbeatReporter{ - log: logger, - gql: client, + log: logger, + gql: client, + healthChecker: healthChecker, } } -func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat DeploymentHeartbeatInput) error { +func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartbeat) error { logger := r.log.WithFields(logrus.Fields{ "deploymentID": r.gql.DeploymentID(), "heartbeat": heartbeat, }) logger.Debug("Sending heartbeat...") var mutation struct { - Success bool `graphql:"reportDeploymentHeartbeat(id: $id, in: $heartbeat)"` + Success bool `graphql:"reportDeploymentHeartbeat(id: $id, in: $input)"` + } + status := r.healthChecker.GetStatus() + var pluginsStatuses []DeploymentHeartbeatHealthPluginInput + var platformsStatuses []DeploymentHeartbeatHealthPlatformInput + for pluginKey, pluginStatus := range status.Plugins { + pluginsStatuses = append(pluginsStatuses, DeploymentHeartbeatHealthPluginInput{ + Key: pluginKey, + Value: pluginStatus, + }) + } + for platformKey, platformStatus := range status.Platforms { + platformsStatuses = append(platformsStatuses, DeploymentHeartbeatHealthPlatformInput{ + Key: platformKey, + Value: platformStatus, + }) } variables := map[string]interface{}{ - "id": graphql.ID(r.gql.DeploymentID()), - "heartbeat": heartbeat, + "id": graphql.ID(r.gql.DeploymentID()), + "input": DeploymentHeartbeatInput{ + NodeCount: heartbeat.NodeCount, + Health: &DeploymentHeartbeatHealthInput{ + Botkube: status.Botkube, + Plugins: pluginsStatuses, + Platforms: platformsStatuses, + }, + }, } err := r.gql.Client().Mutate(ctx, &mutation, variables) if err != nil { diff --git a/internal/heartbeat/noop_reporter.go b/internal/heartbeat/noop_reporter.go index 6a98fad2b..fcd44b6b0 100644 --- a/internal/heartbeat/noop_reporter.go +++ b/internal/heartbeat/noop_reporter.go @@ -8,6 +8,6 @@ var _ HeartbeatReporter = (*NoopHeartbeatReporter)(nil) type NoopHeartbeatReporter struct{} -func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, DeploymentHeartbeatInput) error { +func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartbeat) error { return nil } diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go index e137db090..1c38cf01d 100644 --- a/internal/heartbeat/reporter.go +++ b/internal/heartbeat/reporter.go @@ -4,19 +4,41 @@ import ( "context" "github.com/sirupsen/logrus" + + "github.com/kubeshop/botkube/internal/health" ) type DeploymentHeartbeatInput struct { + NodeCount int `json:"nodeCount"` + Health *DeploymentHeartbeatHealthInput `json:"health,omitempty"` +} + +type DeploymentHeartbeatHealthPluginInput struct { + Key string `json:"key"` + Value health.PluginStatus `json:"value"` +} +type DeploymentHeartbeatHealthPlatformInput struct { + Key string `json:"key"` + Value health.PlatformStatus `json:"value"` +} +type DeploymentHeartbeatHealthInput struct { + Botkube health.BotStatus `json:"botkube"` + Plugins []DeploymentHeartbeatHealthPluginInput `json:"plugins,omitempty"` + Platforms []DeploymentHeartbeatHealthPlatformInput `json:"platforms,omitempty"` +} + +type 
ReportHeartbeat struct { NodeCount int `json:"nodeCount"` } type HeartbeatReporter interface { - ReportHeartbeat(ctx context.Context, heartBeat DeploymentHeartbeatInput) error + ReportHeartbeat(ctx context.Context, heartBeat ReportHeartbeat) error } -func GetReporter(logger logrus.FieldLogger, gql GraphQLClient) HeartbeatReporter { +func GetReporter(logger logrus.FieldLogger, gql GraphQLClient, healthChecker health.Checker) HeartbeatReporter { return newGraphQLHeartbeatReporter( logger.WithField("component", "GraphQLHeartbeatReporter"), gql, + healthChecker, ) } diff --git a/internal/insights/k8s_collector.go b/internal/insights/k8s_collector.go index ac434b865..e28c23c69 100644 --- a/internal/insights/k8s_collector.go +++ b/internal/insights/k8s_collector.go @@ -44,7 +44,7 @@ func (k *K8sCollector) Start(ctx context.Context) error { k.failureCount.Add(1) } else { k.failureCount.Store(0) - err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.DeploymentHeartbeatInput{NodeCount: len(list.Items)}) + err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartbeat{NodeCount: len(list.Items)}) if err != nil { k.logger.Errorf("while reporting heartbeat: %s", err.Error()) } diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index 3fc2d5500..2b83c0801 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -57,6 +57,7 @@ type Discord struct { shutdownOnce sync.Once status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string } // discordMessage contains message details to execute command and send back the result. @@ -124,7 +125,7 @@ func (b *Discord) Start(ctx context.Context) error { // Open a websocket connection to Discord and begin listening. err := b.api.Open() if err != nil { - b.setFailureReason(health.FailureReasonConnectionError) + b.setFailureReason(health.FailureReasonConnectionError, fmt.Sprintf("while opening connection: %s", err.Error())) return fmt.Errorf("while opening connection: %w", err) } @@ -134,7 +135,7 @@ func (b *Discord) Start(ctx context.Context) error { } b.log.Info("Botkube connected to Discord!") - b.setFailureReason("") + b.setFailureReason("", "") go b.startMessageProcessor(ctx) <-ctx.Done() b.log.Info("Shutdown requested. Finishing...") @@ -423,13 +424,14 @@ func discordError(err error, channel string) error { return err } -func (b *Discord) setFailureReason(reason health.FailureReasonMsg) { +func (b *Discord) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { b.status = health.StatusHealthy } else { b.status = health.StatusUnHealthy } b.failureReason = reason + b.errorMsg = errorMsg } // GetStatus gets bot status. 
@@ -438,5 +440,6 @@ func (b *Discord) GetStatus() health.PlatformStatus { Status: b.status, Restarts: "0/0", Reason: b.failureReason, + ErrorMsg: b.errorMsg, } } diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index 41b33dbaa..97b18ce4f 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -72,6 +72,7 @@ type Mattermost struct { shutdownOnce sync.Once status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string } // mattermostMessage contains message details to execute command and send back the result @@ -164,7 +165,7 @@ func (b *Mattermost) Start(ctx context.Context) error { // Check connection to Mattermost server err := b.checkServerConnection(ctx) if err != nil { - b.setStatusReason(health.FailureReasonConnectionError) + b.setStatusReason(health.FailureReasonConnectionError, fmt.Sprintf("while pinging Mattermost server %q: %s", b.serverURL, err.Error())) return fmt.Errorf("while pinging Mattermost server %q: %w", b.serverURL, err) } @@ -177,7 +178,7 @@ func (b *Mattermost) Start(ctx context.Context) error { // For now, we are adding retry logic to reconnect to the server // https://github.com/kubeshop/botkube/issues/201 b.log.Info("Botkube connected to Mattermost!") - b.setStatusReason("") + b.setStatusReason("", "") go b.startMessageProcessor(ctx) for { @@ -190,7 +191,7 @@ func (b *Mattermost) Start(ctx context.Context) error { var appErr error b.wsClient, appErr = model.NewWebSocketClient4(b.webSocketURL, b.apiClient.AuthToken) if appErr != nil { - b.setStatusReason(health.FailureReasonConnectionError) + b.setStatusReason(health.FailureReasonConnectionError, fmt.Sprintf("while creating WebSocket connection: %s", appErr.Error())) return fmt.Errorf("while creating WebSocket connection: %w", appErr) } b.listen(ctx) @@ -593,13 +594,14 @@ func postFromEvent(event *model.WebSocketEvent) (*model.Post, error) { return post, nil } -func (b *Mattermost) setStatusReason(reason health.FailureReasonMsg) { +func (b *Mattermost) setStatusReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { b.status = health.StatusHealthy } else { b.status = health.StatusUnHealthy } b.failureReason = reason + b.errorMsg = errorMsg } // GetStatus gets bot status. 
@@ -608,5 +610,6 @@ func (b *Mattermost) GetStatus() health.PlatformStatus { Status: b.status, Restarts: "0/0", Reason: b.failureReason, + ErrorMsg: b.errorMsg, } } diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 349d40cd1..3c671f387 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -67,6 +67,7 @@ type CloudSlack struct { status health.PlatformStatusMsg failuresNo int failureReason health.FailureReasonMsg + errorMsg string reportOnce sync.Once } @@ -115,7 +116,7 @@ func NewCloudSlack(log logrus.FieldLogger, func (b *CloudSlack) Start(ctx context.Context) error { if b.cfg.ExecutionEventStreamingDisabled { - b.setFailureReason(health.FailureReasonQuotaExceeded) + b.setFailureReason(health.FailureReasonQuotaExceeded, quotaExceededMsg) b.log.Warn(quotaExceededMsg) return nil } @@ -132,11 +133,11 @@ func (b *CloudSlack) withRetries(ctx context.Context, log logrus.FieldLogger, ma // if the last run was long enough, we treat is as success, so we reset failures log.Infof("Resetting failures counter as last failure was more than %s ago", successIntervalDuration) b.failuresNo = 0 - b.setFailureReason("") + b.setFailureReason("", "") } lastFailureTimestamp = time.Now() b.failuresNo++ - b.setFailureReason(health.FailureReasonConnectionError) + b.setFailureReason(health.FailureReasonConnectionError, err.Error()) return retry.BackOffDelay(uint(b.failuresNo), err, cfg) } @@ -144,7 +145,7 @@ func (b *CloudSlack) withRetries(ctx context.Context, log logrus.FieldLogger, ma func() error { err := fn() if b.failuresNo >= maxRetries { - b.setFailureReason(health.FailureReasonMaxRetriesExceeded) + b.setFailureReason(health.FailureReasonMaxRetriesExceeded, fmt.Sprintf("Reached max number of %d retries", maxRetries)) log.Debugf("Reached max number of %d retries: %s", maxRetries, err) return retry.Unrecoverable(err) } @@ -216,7 +217,7 @@ func (b *CloudSlack) start(ctx context.Context) error { return fmt.Errorf("while sending gRPC connection request. 
%w", err) } - b.setFailureReason("") + b.setFailureReason("", "") go b.startMessageProcessor(ctx, messageWorkers, messages) b.reportOnce.Do(func() { @@ -225,7 +226,7 @@ func (b *CloudSlack) start(ctx context.Context) error { } }) b.failuresNo = 0 // Reset the failures to start exponential back-off from the beginning - b.setFailureReason("") + b.setFailureReason("", "") b.log.Info("Botkube connected to Slack!") for { @@ -267,9 +268,9 @@ func (b *CloudSlack) shutdown(messageWorkers *pool.Pool, messages chan *pb.Conne } func (b *CloudSlack) handleStreamMessage(ctx context.Context, data *pb.ConnectResponse) (error, bool) { - b.setFailureReason("") + b.setFailureReason("", "") if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { - b.setFailureReason(health.FailureReasonQuotaExceeded) + b.setFailureReason(health.FailureReasonQuotaExceeded, quotaExceededMsg) b.log.Warn(quotaExceededMsg) return nil, true } @@ -309,7 +310,7 @@ func (b *CloudSlack) handleStreamMessage(ctx context.Context, data *pb.ConnectRe UserID: ev.User, EventTimeStamp: ev.EventTimeStamp, } - b.setFailureReason(health.FailureReasonQuotaExceeded) + b.setFailureReason(health.FailureReasonQuotaExceeded, quotaExceededMsg) response := quotaExceeded() if err := b.send(ctx, msg, response); err != nil { @@ -750,13 +751,14 @@ func quotaExceeded() interactive.CoreMessage { } } -func (b *CloudSlack) setFailureReason(reason health.FailureReasonMsg) { +func (b *CloudSlack) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { b.status = health.StatusHealthy } else { b.status = health.StatusUnHealthy } b.failureReason = reason + b.errorMsg = errorMsg } // GetStatus gets bot status. @@ -765,6 +767,7 @@ func (b *CloudSlack) GetStatus() health.PlatformStatus { Status: b.status, Restarts: fmt.Sprintf("%d/%d", b.failuresNo, maxRetries), Reason: b.failureReason, + ErrorMsg: b.errorMsg, } } diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index e5b95a4db..04a3c22c7 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -64,6 +64,7 @@ type SocketSlack struct { shutdownOnce sync.Once status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string } // socketSlackAnalyticsReporter defines a reporter that collects analytics data. 
@@ -128,7 +129,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { } }() - b.setFailureReason("") + b.setFailureReason("", "") go b.startMessageProcessor(ctx) for { @@ -746,13 +747,14 @@ func (b *SocketSlack) getRealNameWithFallbackToUserID(ctx context.Context, userI return user.RealName } -func (b *SocketSlack) setFailureReason(reason health.FailureReasonMsg) { +func (b *SocketSlack) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { b.status = health.StatusHealthy } else { b.status = health.StatusUnHealthy } b.failureReason = reason + b.errorMsg = errorMsg } func (b *SocketSlack) GetStatus() health.PlatformStatus { @@ -760,6 +762,7 @@ func (b *SocketSlack) GetStatus() health.PlatformStatus { Status: b.status, Restarts: "0/0", Reason: b.failureReason, + ErrorMsg: b.errorMsg, } } diff --git a/pkg/bot/teams_cloud.go b/pkg/bot/teams_cloud.go index aa00aa89a..4e85853a5 100644 --- a/pkg/bot/teams_cloud.go +++ b/pkg/bot/teams_cloud.go @@ -81,6 +81,7 @@ type CloudTeams struct { status health.PlatformStatusMsg failuresNo int failureReason health.FailureReasonMsg + errorMsg string reportOnce sync.Once botMentionRegex *regexp.Regexp botName string @@ -183,6 +184,7 @@ func (b *CloudTeams) GetStatus() health.PlatformStatus { Status: b.status, Restarts: fmt.Sprintf("%d/%d", b.failuresNo, maxRetries), Reason: b.failureReason, + ErrorMsg: b.errorMsg, } } @@ -204,7 +206,7 @@ func (b *CloudTeams) start(ctx context.Context) error { } }) b.failuresNo = 0 // Reset the failures to start exponential back-off from the beginning - b.setFailureReason("") + b.setFailureReason("", "") b.log.Info("Botkube connected to Cloud Teams!") parallel, ctx := errgroup.WithContext(ctx) @@ -226,11 +228,11 @@ func (b *CloudTeams) withRetries(ctx context.Context, log logrus.FieldLogger, ma // if the last run was long enough, we treat is as success, so we reset failures log.Infof("Resetting failures counter as last failure was more than %s ago", successIntervalDuration) b.failuresNo = 0 - b.setFailureReason("") + b.setFailureReason("", "") } lastFailureTimestamp = time.Now() b.failuresNo++ - b.setFailureReason(health.FailureReasonConnectionError) + b.setFailureReason(health.FailureReasonConnectionError, err.Error()) return retry.BackOffDelay(uint(b.failuresNo), err, cfg) } @@ -238,7 +240,7 @@ func (b *CloudTeams) withRetries(ctx context.Context, log logrus.FieldLogger, ma func() error { err := fn() if b.failuresNo >= maxRetries { - b.setFailureReason(health.FailureReasonMaxRetriesExceeded) + b.setFailureReason(health.FailureReasonMaxRetriesExceeded, fmt.Sprintf("Reached max number of %d retries", maxRetries)) log.Debugf("Reached max number of %d retries: %s", maxRetries, err) return retry.Unrecoverable(err) } @@ -255,7 +257,7 @@ func (b *CloudTeams) withRetries(ctx context.Context, log logrus.FieldLogger, ma } func (b *CloudTeams) handleStreamMessage(ctx context.Context, data *pb.CloudActivity) (*pb.AgentActivity, error) { - b.setFailureReason("") + b.setFailureReason("", "") var act schema.Activity err := json.Unmarshal(data.Event, &act) if err != nil { @@ -468,13 +470,14 @@ func (b *CloudTeams) trimBotMention(msg string) string { return b.botMentionRegex.ReplaceAllString(msg, "") } -func (b *CloudTeams) setFailureReason(reason health.FailureReasonMsg) { +func (b *CloudTeams) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { b.status = health.StatusHealthy } else { b.status = health.StatusUnHealthy } b.failureReason = reason + b.errorMsg = errorMsg } 
type teamsCloudChannelConfigByID struct { diff --git a/pkg/notifier/platform.go b/pkg/notifier/platform.go new file mode 100644 index 000000000..ad8c5acf4 --- /dev/null +++ b/pkg/notifier/platform.go @@ -0,0 +1,12 @@ +package notifier + +import ( + "github.com/kubeshop/botkube/internal/health" + "github.com/kubeshop/botkube/pkg/config" +) + +// Platform represents platform notifier +type Platform interface { + GetStatus() health.PlatformStatus + IntegrationName() config.CommPlatformIntegration +} diff --git a/pkg/sink/elasticsearch.go b/pkg/sink/elasticsearch.go index 3a6b24ea2..79daacbdc 100644 --- a/pkg/sink/elasticsearch.go +++ b/pkg/sink/elasticsearch.go @@ -52,6 +52,7 @@ type Elasticsearch struct { clusterVersion string status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string } // NewElasticsearch creates a new Elasticsearch instance. @@ -217,12 +218,12 @@ func (e *Elasticsearch) SendEvent(ctx context.Context, rawData any, sources []st } err := e.flushIndex(ctx, indexCfg, rawData) if err != nil { - e.setFailureReason(health.FailureReasonConnectionError) + e.setFailureReason(health.FailureReasonConnectionError, fmt.Sprintf("while sending event to Elasticsearch index %q: %s", indexCfg.Name, err.Error())) errs = multierror.Append(errs, fmt.Errorf("while sending event to Elasticsearch index %q: %w", indexCfg.Name, err)) continue } - e.setFailureReason("") + e.setFailureReason("", "") e.log.Debugf("Event successfully sent to Elasticsearch index %q", indexCfg.Name) } @@ -239,13 +240,14 @@ func (e *Elasticsearch) Type() config.IntegrationType { return config.SinkIntegrationType } -func (e *Elasticsearch) setFailureReason(reason health.FailureReasonMsg) { +func (e *Elasticsearch) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { e.status = health.StatusHealthy } else { e.status = health.StatusUnHealthy } e.failureReason = reason + e.errorMsg = errorMsg } // GetStatus gets sink status @@ -254,6 +256,7 @@ func (e *Elasticsearch) GetStatus() health.PlatformStatus { Status: e.status, Restarts: "0/0", Reason: e.failureReason, + ErrorMsg: e.errorMsg, } } diff --git a/pkg/sink/pager_duty.go b/pkg/sink/pager_duty.go index 01f09c516..159f2a9e9 100644 --- a/pkg/sink/pager_duty.go +++ b/pkg/sink/pager_duty.go @@ -33,6 +33,7 @@ type PagerDuty struct { status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string statusMux sync.Mutex } @@ -96,7 +97,7 @@ func (w *PagerDuty) SendEvent(ctx context.Context, rawData any, sources []string resp, err := w.postEvent(ctx, in) if err != nil { - w.setFailureReason(health.FailureReasonConnectionError) + w.setFailureReason(health.FailureReasonConnectionError, fmt.Sprintf("while sending message to PagerDuty: %s", err.Error())) return fmt.Errorf("while sending message to PagerDuty: %w", err) } @@ -121,6 +122,7 @@ func (w *PagerDuty) GetStatus() health.PlatformStatus { Status: w.status, Restarts: "0/0", Reason: w.failureReason, + ErrorMsg: w.errorMsg, } } @@ -218,7 +220,7 @@ func (w *PagerDuty) triggerChange(ctx context.Context, in *incomingEvent, meta e }) } -func (w *PagerDuty) setFailureReason(reason health.FailureReasonMsg) { +func (w *PagerDuty) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { return } @@ -228,6 +230,7 @@ func (w *PagerDuty) setFailureReason(reason health.FailureReasonMsg) { w.status = health.StatusUnHealthy w.failureReason = reason + w.errorMsg = errorMsg } func (w *PagerDuty) markHealthy() { @@ -240,4 +243,5 @@ func (w 
*PagerDuty) markHealthy() { w.status = health.StatusHealthy w.failureReason = "" + w.errorMsg = "" } diff --git a/pkg/sink/webhook.go b/pkg/sink/webhook.go index 8dc466843..88ddbb12a 100644 --- a/pkg/sink/webhook.go +++ b/pkg/sink/webhook.go @@ -27,6 +27,7 @@ type Webhook struct { Bindings config.SinkBindings status health.PlatformStatusMsg failureReason health.FailureReasonMsg + errorMsg string } // WebhookPayload contains json payload to be sent to webhook url @@ -64,11 +65,11 @@ func (w *Webhook) SendEvent(ctx context.Context, rawData any, sources []string) err := w.PostWebhook(ctx, jsonPayload) if err != nil { - w.setFailureReason(health.FailureReasonConnectionError) + w.setFailureReason(health.FailureReasonConnectionError, fmt.Sprintf("while sending message to webhook: %s", err.Error())) return fmt.Errorf("while sending message to webhook: %w", err) } - w.setFailureReason("") + w.setFailureReason("", "") w.log.Debugf("Message successfully sent to Webhook: %+v", rawData) return nil } @@ -115,13 +116,14 @@ func (w *Webhook) Type() config.IntegrationType { return config.SinkIntegrationType } -func (w *Webhook) setFailureReason(reason health.FailureReasonMsg) { +func (w *Webhook) setFailureReason(reason health.FailureReasonMsg, errorMsg string) { if reason == "" { w.status = health.StatusHealthy } else { w.status = health.StatusUnHealthy } w.failureReason = reason + w.errorMsg = errorMsg } // GetStatus gets sink status @@ -130,5 +132,6 @@ func (w *Webhook) GetStatus() health.PlatformStatus { Status: w.status, Restarts: "0/0", Reason: w.failureReason, + ErrorMsg: w.errorMsg, } }
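
A few illustrative sketches of the patterns this patch introduces follow; all of them use local stand-in types rather than the real `internal/health`, `pkg/notifier`, or `internal/heartbeat` packages. First, the `scheduleNotifier` provider closure in `main.go`: a platform whose constructor fails is no longer a fatal startup error; it is registered with the health checker as a `Failed` entry so the heartbeat can still report it. This sketch also checks the constructor error before touching the returned value, which is a simplification of the patch's flow:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for health.PlatformStatus from the patch, simplified for illustration.
type PlatformStatus struct {
	Status   string `json:"status,omitempty"`
	Reason   string `json:"reason,omitempty"`
	ErrorMsg string `json:"errorMsg,omitempty"`
}

// Platform mirrors the shape of pkg/notifier.Platform: anything that can report its status.
type Platform interface {
	GetStatus() PlatformStatus
	IntegrationName() string
}

// failed plays the role of health.Failed: a placeholder for a platform that never started.
type failed struct{ errMsg string }

func (f failed) GetStatus() PlatformStatus {
	return PlatformStatus{Status: "unhealthy", Reason: "connectionError", ErrorMsg: f.errMsg}
}
func (f failed) IntegrationName() string { return "unknown" }

func main() {
	notifiers := map[string]Platform{} // plays the role of healthChecker.AddNotifier

	schedule := func(key string, provider func() (Platform, error)) {
		app, err := provider()
		if err != nil {
			// Register the failure instead of aborting startup, so health/heartbeat still reports it.
			notifiers[key] = failed{errMsg: err.Error()}
			return
		}
		notifiers[key] = app
	}

	schedule("default-group-slack", func() (Platform, error) {
		return nil, errors.New("invalid token") // simulate a constructor error
	})

	for key, p := range notifiers {
		fmt.Printf("%s -> %+v\n", key, p.GetStatus())
	}
}
```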
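Second, the heartbeat payload: `gql_reporter.go` now folds the checker's status into the `reportDeploymentHeartbeat` mutation input by converting the plugin and platform maps into key/value slices. A rough sketch of that fold, with stand-in input types whose field names follow the patch:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the patch's heartbeat input types.
type PluginStatus struct {
	Enabled  bool   `json:"enabled,omitempty"`
	Status   string `json:"status,omitempty"`
	Restarts string `json:"restarts,omitempty"`
}

type HealthPluginInput struct {
	Key   string       `json:"key"`
	Value PluginStatus `json:"value"`
}

type HealthInput struct {
	Plugins []HealthPluginInput `json:"plugins,omitempty"`
}

type DeploymentHeartbeatInput struct {
	NodeCount int          `json:"nodeCount"`
	Health    *HealthInput `json:"health,omitempty"`
}

func main() {
	// What a health checker's GetStatus() could hand back for plugins (illustrative values).
	plugins := map[string]PluginStatus{
		"botkube/kubernetes": {Enabled: true, Status: "Running", Restarts: "0/10"},
	}

	// Same map-to-slice fold as in the reporter: GraphQL input wants a list of key/value pairs.
	var pluginInputs []HealthPluginInput
	for key, st := range plugins {
		pluginInputs = append(pluginInputs, HealthPluginInput{Key: key, Value: st})
	}

	in := DeploymentHeartbeatInput{NodeCount: 3, Health: &HealthInput{Plugins: pluginInputs}}
	out, _ := json.MarshalIndent(in, "", "  ") // error ignored for brevity in this sketch
	fmt.Println(string(out))
}
```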
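Finally, the per-platform status plumbing: every bot and sink now records an `errorMsg` next to its `failureReason` through the same two-argument `setFailureReason` helper and exposes both via `GetStatus`. A minimal stand-alone version of that pattern (constant values here are placeholders, not the real `internal/health` ones):

```go
package main

import "fmt"

type FailureReasonMsg string

const (
	StatusHealthy                = "Healthy"
	StatusUnHealthy              = "Unhealthy"
	FailureReasonConnectionError = FailureReasonMsg("connectionError")
)

type PlatformStatus struct {
	Status   string
	Restarts string
	Reason   FailureReasonMsg
	ErrorMsg string
}

type webhookSink struct {
	status        string
	failureReason FailureReasonMsg
	errorMsg      string
}

// setFailureReason mirrors the helper added to each bot/sink: an empty reason means healthy.
func (w *webhookSink) setFailureReason(reason FailureReasonMsg, errorMsg string) {
	if reason == "" {
		w.status = StatusHealthy
	} else {
		w.status = StatusUnHealthy
	}
	w.failureReason = reason
	w.errorMsg = errorMsg
}

func (w *webhookSink) GetStatus() PlatformStatus {
	return PlatformStatus{Status: w.status, Restarts: "0/0", Reason: w.failureReason, ErrorMsg: w.errorMsg}
}

func main() {
	var w webhookSink
	w.setFailureReason(FailureReasonConnectionError, "while sending message to webhook: connection refused")
	fmt.Printf("%+v\n", w.GetStatus())

	w.setFailureReason("", "") // back to healthy once a send succeeds
	fmt.Printf("%+v\n", w.GetStatus())
}
```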