From 038d914cf0cb033f5a487bd747cc0c4e12d093f1 Mon Sep 17 00:00:00 2001 From: Huseyin BABAL Date: Tue, 8 Aug 2023 15:33:36 +0300 Subject: [PATCH 1/5] socket slack message handling --- go.mod | 3 ++ go.sum | 6 +++ pkg/bot/slack_socket.go | 100 +++++++++++++++++++++++++--------------- 3 files changed, 72 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index e75d9869b..eeea58c20 100644 --- a/go.mod +++ b/go.mod @@ -221,6 +221,7 @@ require ( github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/shopspring/decimal v1.3.1 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/tinylib/msgp v1.1.8 // indirect github.com/ulikunitz/xz v0.5.10 // indirect @@ -239,6 +240,8 @@ require ( go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.10.0 // indirect golang.org/x/net v0.11.0 // indirect golang.org/x/sys v0.9.0 // indirect diff --git a/go.sum b/go.sum index ee027aaf6..97b5ee891 100644 --- a/go.sum +++ b/go.sum @@ -1122,6 +1122,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= @@ -1247,9 +1249,13 @@ go.szostok.io/version v1.2.0/go.mod h1:EiU0gPxaXb6MZ+apSN0WgDO6F4JXyC99k9PIXf2k2 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index 8355139de..fe5292c76 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -4,15 +4,16 @@ import ( "context" "errors" "fmt" - "regexp" - "strings" - "sync" - "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" + "github.com/sourcegraph/conc/pool" + "regexp" + "strings" + "sync" + "time" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/api" @@ -33,19 +34,22 @@ var _ Bot = &SocketSlack{} // SocketSlack listens for user's message, execute commands and sends back the response. type SocketSlack struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter socketSlackAnalyticsReporter - botID string - client *slack.Client - channelsMutex sync.RWMutex - channels map[string]channelConfigByName - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *SlackRenderer - realNamesForID map[string]string - msgStatusTracker *SlackMessageStatusTracker + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter socketSlackAnalyticsReporter + botID string + client *slack.Client + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *SlackRenderer + realNamesForID map[string]string + msgStatusTracker *SlackMessageStatusTracker + messages chan slackMessage + slackMessageWorkers *pool.Pool + shutdownOnce sync.Once } // socketSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -75,17 +79,19 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc } return &SocketSlack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botID: botID, - client: client, - channels: channels, - commGroupName: commGroupName, - renderer: NewSlackRenderer(), - botMentionRegex: botMentionRegex, - realNamesForID: map[string]string{}, - msgStatusTracker: NewSlackMessageStatusTracker(log, client), + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + channels: channels, + commGroupName: commGroupName, + renderer: NewSlackRenderer(), + botMentionRegex: botMentionRegex, + realNamesForID: map[string]string{}, + msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan slackMessage, 100), + slackMessageWorkers: pool.New().WithMaxGoroutines(10), }, nil } @@ -106,10 +112,13 @@ func (b *SocketSlack) Start(ctx context.Context) error { } }() + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") + b.shutdown() return nil case event := <-websocketClient.Events: switch event.Type { @@ -144,9 +153,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { CommandOrigin: command.TypedOrigin, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("while handling message: %s", err.Error()) - } + b.messages <- msg } } case socketmode.EventTypeInteractive: @@ -209,9 +216,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { ResponseURL: callback.ResponseURL, BlockID: act.BlockID, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + b.messages <- msg case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted // the map key is the ID of the input block, for us, it's autogenerated @@ -230,9 +235,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { CommandOrigin: cmdOrigin, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + b.messages <- msg } } default: @@ -309,6 +312,28 @@ func (b *SocketSlack) SetNotificationsEnabled(channelName string, enabled bool) return nil } +func (b *SocketSlack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting socket slack message processor...") + defer b.log.Info("Stopped socket slack message processor...") + + for msg := range b.messages { + b.slackMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + }) + } +} + +func (b *SocketSlack) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down socket slack message processor...") + close(b.messages) + b.slackMessageWorkers.Wait() + }) +} + func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) error { // Handle message only if starts with mention request, found := b.findAndTrimBotMention(event.Text) @@ -318,6 +343,7 @@ func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) err } b.log.Debugf("Slack incoming Request: %s", request) + time.Sleep(time.Second * 15) // Unfortunately we need to do a call for channel name based on ID every time a message arrives. // I wanted to query for channel IDs based on names and prepare a map in the `slackChannelsConfigFrom`, From 9751dab97fa650c0a9092173c00587c003604ddc Mon Sep 17 00:00:00 2001 From: Huseyin BABAL Date: Tue, 8 Aug 2023 16:04:57 +0300 Subject: [PATCH 2/5] mattermost worker pool added --- pkg/bot/mattermost.go | 102 ++++++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 39 deletions(-) diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index 5c1886edb..acf7f05e3 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/sourcegraph/conc/pool" "net/url" "regexp" "strings" @@ -46,23 +47,26 @@ const ( // Mattermost listens for user's message, execute commands and sends back the response. type Mattermost struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter AnalyticsReporter - serverURL string - botName string - botUserID string - teamName string - webSocketURL string - wsClient *model.WebSocketClient - apiClient *model.Client4 - channelsMutex sync.RWMutex - commGroupName string - channels map[string]channelConfigByID - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - renderer *MattermostRenderer - userNamesForID map[string]string + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter AnalyticsReporter + serverURL string + botName string + botUserID string + teamName string + webSocketURL string + wsClient *model.WebSocketClient + apiClient *model.Client4 + channelsMutex sync.RWMutex + commGroupName string + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + renderer *MattermostRenderer + userNamesForID map[string]string + messages chan mattermostMessage + mmMessageWorkers *pool.Pool + shutdownOnce sync.Once } // mattermostMessage contains message details to execute command and send back the result @@ -121,24 +125,39 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st } return &Mattermost{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - serverURL: cfg.URL, - botName: cfg.BotName, - botUserID: botUserID, - teamName: cfg.Team, - apiClient: client, - webSocketURL: webSocketURL, - commGroupName: commGroupName, - channels: channelsByIDCfg, - botMentionRegex: botMentionRegex, - renderer: NewMattermostRenderer(), - userNamesForID: map[string]string{}, + log: log, + executorFactory: executorFactory, + reporter: reporter, + serverURL: cfg.URL, + botName: cfg.BotName, + botUserID: botUserID, + teamName: cfg.Team, + apiClient: client, + webSocketURL: webSocketURL, + commGroupName: commGroupName, + channels: channelsByIDCfg, + botMentionRegex: botMentionRegex, + renderer: NewMattermostRenderer(), + userNamesForID: map[string]string{}, + messages: make(chan mattermostMessage, 100), + mmMessageWorkers: pool.New().WithMaxGoroutines(10), }, nil } -// Start establishes mattermost connection and listens for messages +func (b *Mattermost) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting mattermost message processor...") + defer b.log.Info("Stopped mattermost message processor...") + + for msg := range b.messages { + b.mmMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + wrappedErr := fmt.Errorf("while handling message: %w", err) + b.log.Errorf(wrappedErr.Error()) + } + }) + } +} // Start establishes mattermost connection and listens for messages func (b *Mattermost) Start(ctx context.Context) error { b.log.Info("Starting bot") @@ -161,6 +180,7 @@ func (b *Mattermost) Start(ctx context.Context) error { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") + b.shutdown() return nil default: var appErr error @@ -213,7 +233,7 @@ func (b *Mattermost) SetNotificationsEnabled(channelID string, enabled bool) err } // Check incoming message and take action -func (b *Mattermost) handleMessage(ctx context.Context, mm *mattermostMessage) error { +func (b *Mattermost) handleMessage(ctx context.Context, mm mattermostMessage) error { post, err := postFromEvent(mm.Event) if err != nil { return fmt.Errorf("while getting post from event: %w", err) @@ -387,14 +407,10 @@ func (b *Mattermost) listen(ctx context.Context) { continue } - mm := &mattermostMessage{ + mm := mattermostMessage{ Event: event, } - err := b.handleMessage(ctx, mm) - if err != nil { - wrappedErr := fmt.Errorf("while handling message: %w", err) - b.log.Errorf(wrappedErr.Error()) - } + b.messages <- mm } } } @@ -482,6 +498,14 @@ func (b *Mattermost) getUserName(ctx context.Context, userID string) (string, er return user.Username, nil } +func (b *Mattermost) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down mattermost message processor...") + close(b.messages) + b.mmMessageWorkers.Wait() + }) +} + func getBotUserID(ctx context.Context, client *model.Client4, teamID, botName string) (string, error) { users, _, err := client.AutocompleteUsersInTeam(ctx, teamID, botName, 1, "") if err != nil { From c623bb6820458b566b8432fd8a04b2574bee0bd2 Mon Sep 17 00:00:00 2001 From: Huseyin BABAL Date: Tue, 8 Aug 2023 16:45:28 +0300 Subject: [PATCH 3/5] discord, and slack platforms added --- go.mod | 2 +- go.sum | 2 - pkg/bot/discord.go | 81 +++++++---- pkg/bot/mattermost.go | 5 +- pkg/bot/slack_cloud.go | 309 ++++++++++++++++++++++------------------ pkg/bot/slack_legacy.go | 78 +++++++--- pkg/bot/slack_socket.go | 9 +- 7 files changed, 293 insertions(+), 193 deletions(-) diff --git a/go.mod b/go.mod index eeea58c20..ee0dec758 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/sha1sum/aws_signing_client v0.0.0-20200229211254-f7815c59d5c1 github.com/sirupsen/logrus v1.9.0 github.com/slack-go/slack v0.12.2 + github.com/sourcegraph/conc v0.3.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spiffe/spire v1.5.6 @@ -221,7 +222,6 @@ require ( github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/shopspring/decimal v1.3.1 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/tinylib/msgp v1.1.8 // indirect github.com/ulikunitz/xz v0.5.10 // indirect diff --git a/go.sum b/go.sum index 97b5ee891..8d45ad266 100644 --- a/go.sum +++ b/go.sum @@ -1248,12 +1248,10 @@ go.szostok.io/version v1.2.0 h1:8eMMdfsonjbibwZRLJ8TnrErY8bThFTQsZYV16mcXms= go.szostok.io/version v1.2.0/go.mod h1:EiU0gPxaXb6MZ+apSN0WgDO6F4JXyC99k9PIXf2k2E8= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index e6dfdeec5..4edb9134c 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -11,6 +11,7 @@ import ( "github.com/bwmarrin/discordgo" "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -39,17 +40,20 @@ const ( // Discord listens for user's message, execute commands and sends back the response. type Discord struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter AnalyticsReporter - api *discordgo.Session - botID string - channelsMutex sync.RWMutex - channels map[string]channelConfigByID - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *DiscordRenderer + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter AnalyticsReporter + api *discordgo.Session + botID string + channelsMutex sync.RWMutex + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *DiscordRenderer + messages chan discordMessage + discordMessageWorkers *pool.Pool + shutdownOnce sync.Once } // discordMessage contains message details to execute command and send back the result. @@ -75,19 +79,33 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord } return &Discord{ - log: log, - reporter: reporter, - executorFactory: executorFactory, - api: api, - botID: cfg.BotID, - commGroupName: commGroupName, - channels: channelsCfg, - botMentionRegex: botMentionRegex, - renderer: NewDiscordRenderer(), + log: log, + reporter: reporter, + executorFactory: executorFactory, + api: api, + botID: cfg.BotID, + commGroupName: commGroupName, + channels: channelsCfg, + botMentionRegex: botMentionRegex, + renderer: NewDiscordRenderer(), + messages: make(chan discordMessage, 100), + discordMessageWorkers: pool.New().WithMaxGoroutines(10), }, nil } -// Start starts the Discord websocket connection and listens for messages. +func (b *Discord) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting discord message processor...") + defer b.log.Info("Stopped discord message processor...") + + for msg := range b.messages { + b.discordMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + }) + } +} // Start starts the Discord websocket connection and listens for messages. func (b *Discord) Start(ctx context.Context) error { b.log.Info("Starting bot") @@ -113,14 +131,10 @@ func (b *Discord) Start(ctx context.Context) error { } b.log.Info("Botkube connected to Discord!") - + go b.startMessageProcessor(ctx) <-ctx.Done() b.log.Info("Shutdown requested. Finishing...") - err = b.api.Close() - if err != nil { - return fmt.Errorf("while closing connection: %w", err) - } - + b.shutdown() return nil } @@ -345,6 +359,19 @@ func (b *Discord) formatMessage(msg interactive.CoreMessage) (*discordgo.Message }, nil } +func (b *Discord) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down discord message processor...") + close(b.messages) + b.discordMessageWorkers.Wait() + + err := b.api.Close() + if err != nil { + b.log.Errorf("While closing connection: %v", err) + } + }) +} + func discordChannelsConfigFrom(log logrus.FieldLogger, api *discordgo.Session, channelsCfg config.IdentifiableMap[config.ChannelBindingsByID]) (map[string]channelConfigByID, error) { res := make(map[string]channelConfigByID) for channAlias, channCfg := range channelsCfg { diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index acf7f05e3..4b5311e3c 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/sourcegraph/conc/pool" "net/url" "regexp" "strings" @@ -13,6 +12,7 @@ import ( "github.com/mattermost/mattermost/server/public/model" "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -176,6 +176,9 @@ func (b *Mattermost) Start(ctx context.Context) error { // For now, we are adding retry logic to reconnect to the server // https://github.com/kubeshop/botkube/issues/201 b.log.Info("Botkube connected to Mattermost!") + + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 30e4f6022..e2b908259 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -16,6 +16,7 @@ import ( "github.com/sirupsen/logrus" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" + "github.com/sourcegraph/conc/pool" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -47,21 +48,24 @@ var _ Bot = &CloudSlack{} // CloudSlack listens for user's message, execute commands and sends back the response. type CloudSlack struct { - log logrus.FieldLogger - cfg config.CloudSlack - client *slack.Client - executorFactory ExecutorFactory - reporter cloudSlackAnalyticsReporter - commGroupName string - realNamesForID map[string]string - botMentionRegex *regexp.Regexp - botID string - channelsMutex sync.RWMutex - renderer *SlackRenderer - channels map[string]channelConfigByName - notifyMutex sync.Mutex - clusterName string - msgStatusTracker *SlackMessageStatusTracker + log logrus.FieldLogger + cfg config.CloudSlack + client *slack.Client + executorFactory ExecutorFactory + reporter cloudSlackAnalyticsReporter + commGroupName string + realNamesForID map[string]string + botMentionRegex *regexp.Regexp + botID string + channelsMutex sync.RWMutex + renderer *SlackRenderer + channels map[string]channelConfigByName + notifyMutex sync.Mutex + clusterName string + msgStatusTracker *SlackMessageStatusTracker + messages chan *pb.ConnectResponse + slackMessageWorkers *pool.Pool + shutdownOnce sync.Once } // cloudSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -192,157 +196,188 @@ func (b *CloudSlack) start(ctx context.Context) error { return fmt.Errorf("while sending gRPC connection request. %w", err) } + defer b.shutdown() + + go b.startMessageProcessor(ctx) + for { data, err := c.Recv() if err != nil { if err == io.EOF { b.log.Warn("gRPC connection was closed by server") - return nil + return errors.New("gRPC connection closed") } errStatus, ok := status.FromError(err) if ok && errStatus.Code() == codes.Canceled && errStatus.Message() == context.Canceled.Error() { b.log.Debugf("Context was cancelled. Skipping returning error...") - return nil + return fmt.Errorf("while resolving error from gRPC response %s", errStatus.Err()) } return fmt.Errorf("while receiving cloud slack events: %w", err) } - if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { - b.log.Warn(quotaExceededMsg) - return nil - } - if len(data.Event) == 0 { - continue + b.messages <- data + } +} + +func (b *CloudSlack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting cloud slack message processor...") + defer b.log.Info("Stopped cloud slack message processor...") + + for msg := range b.messages { + b.slackMessageWorkers.Go(func() { + err, _ := b.handleStreamMessage(ctx, msg) + if err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + }) + } +} + +func (b *CloudSlack) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down cloud slack message processor...") + close(b.messages) + b.slackMessageWorkers.Wait() + }) +} + +func (b *CloudSlack) handleStreamMessage(ctx context.Context, data *pb.ConnectResponse) (error, bool) { + if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { + b.log.Warn(quotaExceededMsg) + return nil, true + } + if len(data.Event) == 0 { + return nil, false + } + event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken()) + if err != nil { + return fmt.Errorf("while parsing event: %w", err), true + } + switch event.Type { + case slackevents.CallbackEvent: + b.log.Debugf("Got callback event %s", formatx.StructDumper().Sdump(event)) + innerEvent := event.InnerEvent + switch ev := innerEvent.Data.(type) { + case *slackevents.AppMentionEvent: + b.log.Debugf("Got app mention %s", formatx.StructDumper().Sdump(innerEvent)) + userName := b.getRealNameWithFallbackToUserID(ctx, ev.User) + msg := slackMessage{ + Text: ev.Text, + Channel: ev.Channel, + ThreadTimeStamp: ev.ThreadTimeStamp, + UserID: ev.User, + EventTimeStamp: ev.EventTimeStamp, + UserName: userName, + CommandOrigin: command.TypedOrigin, + } + + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + case *slackevents.MessageEvent: + b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent)) + msg := slackMessage{ + Text: ev.Text, + Channel: ev.Channel, + UserID: ev.User, + EventTimeStamp: ev.EventTimeStamp, + } + response := quotaExceeded() + + if err := b.send(ctx, msg, response); err != nil { + return fmt.Errorf("while sending message: %w", err), true + } } - event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken()) + case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission): + var callback slack.InteractionCallback + err = json.Unmarshal(data.Event, &callback) if err != nil { - return fmt.Errorf("while parsing event: %w", err) + b.log.Errorf("Invalid event %+v\n", data.Event) + return fmt.Errorf("Invalid event %+v\n", data.Event), false } - switch event.Type { - case slackevents.CallbackEvent: - b.log.Debugf("Got callback event %s", formatx.StructDumper().Sdump(event)) - innerEvent := event.InnerEvent - switch ev := innerEvent.Data.(type) { - case *slackevents.AppMentionEvent: - b.log.Debugf("Got app mention %s", formatx.StructDumper().Sdump(innerEvent)) - userName := b.getRealNameWithFallbackToUserID(ctx, ev.User) - msg := slackMessage{ - Text: ev.Text, - Channel: ev.Channel, - ThreadTimeStamp: ev.ThreadTimeStamp, - UserID: ev.User, - EventTimeStamp: ev.EventTimeStamp, - UserName: userName, - CommandOrigin: command.TypedOrigin, - } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("while handling message: %s", err.Error()) - } - case *slackevents.MessageEvent: - b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent)) - msg := slackMessage{ - Text: ev.Text, - Channel: ev.Channel, - UserID: ev.User, - EventTimeStamp: ev.EventTimeStamp, - } - response := quotaExceeded() + switch callback.Type { + case slack.InteractionTypeBlockActions: + b.log.Debugf("Got block action %s", formatx.StructDumper().Sdump(callback)) - if err := b.send(ctx, msg, response); err != nil { - return fmt.Errorf("while sending message: %w", err) - } - } - case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission): - var callback slack.InteractionCallback - err = json.Unmarshal(data.Event, &callback) - if err != nil { - b.log.Errorf("Invalid event %+v\n", data.Event) - continue + if len(callback.ActionCallback.BlockActions) != 1 { + b.log.Debug("Ignoring callback as the number of actions is different from 1") + return nil, false } - switch callback.Type { - case slack.InteractionTypeBlockActions: - b.log.Debugf("Got block action %s", formatx.StructDumper().Sdump(callback)) - - if len(callback.ActionCallback.BlockActions) != 1 { - b.log.Debug("Ignoring callback as the number of actions is different from 1") - continue + act := callback.ActionCallback.BlockActions[0] + if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { + reportErr := b.reporter.ReportCommand(b.IntegrationName(), act.ActionID, command.ButtonClickOrigin, false) + if reportErr != nil { + b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) } + return nil, false // skip the url actions + } - act := callback.ActionCallback.BlockActions[0] - if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { - reportErr := b.reporter.ReportCommand(b.IntegrationName(), act.ActionID, command.ButtonClickOrigin, false) - if reportErr != nil { - b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) - } - continue // skip the url actions - } + channelID := callback.Channel.ID + if channelID == "" && callback.View.ID != "" { + // TODO: add support when we will need to handle button clicks from active modal. + // + // The request is coming from active modal, currently we don't support that. + // We process that only when the modal is submitted (see slack.InteractionTypeViewSubmission action type). + b.log.Debug("Ignoring callback as its source is an active modal") + return nil, false + } - channelID := callback.Channel.ID - if channelID == "" && callback.View.ID != "" { - // TODO: add support when we will need to handle button clicks from active modal. - // - // The request is coming from active modal, currently we don't support that. - // We process that only when the modal is submitted (see slack.InteractionTypeViewSubmission action type). - b.log.Debug("Ignoring callback as its source is an active modal") - continue - } + cmd, cmdOrigin := resolveBlockActionCommand(*act) + // Use thread's TS if interactive call triggered within thread. + threadTs := callback.MessageTs + if callback.Message.Msg.ThreadTimestamp != "" { + threadTs = callback.Message.Msg.ThreadTimestamp + } - cmd, cmdOrigin := resolveBlockActionCommand(*act) - // Use thread's TS if interactive call triggered within thread. - threadTs := callback.MessageTs - if callback.Message.Msg.ThreadTimestamp != "" { - threadTs = callback.Message.Msg.ThreadTimestamp - } + state := removeBotNameFromIDs(b.BotName(), callback.BlockActionState) + + userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) + msg := slackMessage{ + Text: cmd, + Channel: channelID, + ThreadTimeStamp: threadTs, + TriggerID: callback.TriggerID, + UserID: callback.User.ID, + UserName: userName, + CommandOrigin: cmdOrigin, + State: state, + EventTimeStamp: callback.Message.Timestamp, + ResponseURL: callback.ResponseURL, + BlockID: act.BlockID, + } + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("Message handling error: %s", err.Error()) + } + case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted + + // the map key is the ID of the input block, for us, it's autogenerated + for _, item := range callback.View.State.Values { + for actID, act := range item { + act.ActionID = actID // normalize event + + cmd, cmdOrigin := resolveBlockActionCommand(act) + userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) + msg := slackMessage{ + Text: cmd, + Channel: callback.View.PrivateMetadata, + UserID: callback.User.ID, + UserName: userName, + EventTimeStamp: "", // there is no timestamp for interactive callbacks + CommandOrigin: cmdOrigin, + } - state := removeBotNameFromIDs(b.BotName(), callback.BlockActionState) - - userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) - msg := slackMessage{ - Text: cmd, - Channel: channelID, - ThreadTimeStamp: threadTs, - TriggerID: callback.TriggerID, - UserID: callback.User.ID, - UserName: userName, - CommandOrigin: cmdOrigin, - State: state, - EventTimeStamp: callback.Message.Timestamp, - ResponseURL: callback.ResponseURL, - BlockID: act.BlockID, - } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } - case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted - - // the map key is the ID of the input block, for us, it's autogenerated - for _, item := range callback.View.State.Values { - for actID, act := range item { - act.ActionID = actID // normalize event - - cmd, cmdOrigin := resolveBlockActionCommand(act) - userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) - msg := slackMessage{ - Text: cmd, - Channel: callback.View.PrivateMetadata, - UserID: callback.User.ID, - UserName: userName, - EventTimeStamp: "", // there is no timestamp for interactive callbacks - CommandOrigin: cmdOrigin, - } - - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("Message handling error: %s", err.Error()) } } - default: - b.log.Debugf("get unhandled event %s", callback.Type) } + default: + b.log.Debugf("get unhandled event %s", callback.Type) } - b.log.Debugf("received: %q\n", event) } + b.log.Debugf("received: %q\n", event) + return nil, false } func (b *CloudSlack) SendMessage(ctx context.Context, msg interactive.CoreMessage, sourceBindings []string) error { diff --git a/pkg/bot/slack_legacy.go b/pkg/bot/slack_legacy.go index d75198b82..95d2bbcda 100644 --- a/pkg/bot/slack_legacy.go +++ b/pkg/bot/slack_legacy.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/slack-go/slack" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -37,17 +38,20 @@ var _ Bot = &Slack{} // Slack listens for user's message, execute commands and sends back the response. type Slack struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter FatalErrorAnalyticsReporter - botID string - client *slack.Client - channelsMutex sync.RWMutex - channels map[string]channelConfigByName - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *SlackRenderer + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter FatalErrorAnalyticsReporter + botID string + client *slack.Client + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *SlackRenderer + messages chan slackLegacyMessage + slackMessageWorkers *pool.Pool + shutdownOnce sync.Once } // slackLegacyMessage contains message details to execute command and send back the result @@ -79,18 +83,34 @@ func NewSlack(log logrus.FieldLogger, commGroupName string, cfg config.Slack, ex } return &Slack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botID: botID, - client: client, - channels: channels, - commGroupName: commGroupName, - botMentionRegex: botMentionRegex, - renderer: NewSlackRenderer(), + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + channels: channels, + commGroupName: commGroupName, + botMentionRegex: botMentionRegex, + renderer: NewSlackRenderer(), + messages: make(chan slackLegacyMessage, 100), + slackMessageWorkers: pool.New().WithMaxGoroutines(10), }, nil } +func (b *Slack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting slack message processor...") + defer b.log.Info("Stopped slack message processor...") + + for msg := range b.messages { + b.slackMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + }) + } +} + // Start starts the Slack RTM connection and listens for messages func (b *Slack) Start(ctx context.Context) error { b.log.Info("Starting bot") @@ -101,11 +121,14 @@ func (b *Slack) Start(ctx context.Context) error { rtm.ManageConnection() }() + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") - return rtm.Disconnect() + b.shutdown(rtm) + return nil case msg, ok := <-rtm.IncomingEvents: if !ok { b.log.Info("Incoming events channel closed. Finishing...") @@ -372,6 +395,19 @@ func (b *Slack) findAndTrimBotMention(msg string) (string, bool) { return b.botMentionRegex.ReplaceAllString(msg, ""), true } +func (b *Slack) shutdown(rtm *slack.RTM) { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down slack message processor...") + close(b.messages) + b.slackMessageWorkers.Wait() + + err := rtm.Disconnect() + if err != nil { + b.log.Errorf("Error while disconnecting from Slack: %v", err) + } + }) +} + func uploadFileToSlack(ctx context.Context, channel string, resp interactive.CoreMessage, client *slack.Client, ts string) (*slack.File, error) { params := slack.FileUploadParameters{ Filename: "Response.txt", diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index fe5292c76..c3fd70a0f 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -4,16 +4,17 @@ import ( "context" "errors" "fmt" + "regexp" + "strings" + "sync" + "time" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" "github.com/sourcegraph/conc/pool" - "regexp" - "strings" - "sync" - "time" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/api" From aa70c7686a5c5519485e16900bea4a2f07336594 Mon Sep 17 00:00:00 2001 From: Huseyin BABAL Date: Wed, 9 Aug 2023 16:39:31 +0300 Subject: [PATCH 4/5] renaming --- pkg/bot/discord.go | 8 +++-- pkg/bot/mattermost.go | 76 ++++++++++++++++++++--------------------- pkg/bot/slack_cloud.go | 56 ++++++++++++++++-------------- pkg/bot/slack_socket.go | 67 +++++++++++++++++++----------------- 4 files changed, 110 insertions(+), 97 deletions(-) diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index 4edb9134c..ff1bcf366 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -36,6 +36,10 @@ const ( // discordMaxMessageSize max size before a message should be uploaded as a file. discordMaxMessageSize = 2000 + + discordMessageWorkersCount = 10 + + discordMessageChannelSize = 100 ) // Discord listens for user's message, execute commands and sends back the response. @@ -88,8 +92,8 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord channels: channelsCfg, botMentionRegex: botMentionRegex, renderer: NewDiscordRenderer(), - messages: make(chan discordMessage, 100), - discordMessageWorkers: pool.New().WithMaxGoroutines(10), + messages: make(chan discordMessage, discordMessageChannelSize), + discordMessageWorkers: pool.New().WithMaxGoroutines(discordMessageWorkersCount), }, nil } diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index 4b5311e3c..1863182f0 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -47,26 +47,26 @@ const ( // Mattermost listens for user's message, execute commands and sends back the response. type Mattermost struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter AnalyticsReporter - serverURL string - botName string - botUserID string - teamName string - webSocketURL string - wsClient *model.WebSocketClient - apiClient *model.Client4 - channelsMutex sync.RWMutex - commGroupName string - channels map[string]channelConfigByID - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - renderer *MattermostRenderer - userNamesForID map[string]string - messages chan mattermostMessage - mmMessageWorkers *pool.Pool - shutdownOnce sync.Once + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter AnalyticsReporter + serverURL string + botName string + botUserID string + teamName string + webSocketURL string + wsClient *model.WebSocketClient + apiClient *model.Client4 + channelsMutex sync.RWMutex + commGroupName string + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + renderer *MattermostRenderer + userNamesForID map[string]string + messages chan mattermostMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once } // mattermostMessage contains message details to execute command and send back the result @@ -125,22 +125,22 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st } return &Mattermost{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - serverURL: cfg.URL, - botName: cfg.BotName, - botUserID: botUserID, - teamName: cfg.Team, - apiClient: client, - webSocketURL: webSocketURL, - commGroupName: commGroupName, - channels: channelsByIDCfg, - botMentionRegex: botMentionRegex, - renderer: NewMattermostRenderer(), - userNamesForID: map[string]string{}, - messages: make(chan mattermostMessage, 100), - mmMessageWorkers: pool.New().WithMaxGoroutines(10), + log: log, + executorFactory: executorFactory, + reporter: reporter, + serverURL: cfg.URL, + botName: cfg.BotName, + botUserID: botUserID, + teamName: cfg.Team, + apiClient: client, + webSocketURL: webSocketURL, + commGroupName: commGroupName, + channels: channelsByIDCfg, + botMentionRegex: botMentionRegex, + renderer: NewMattermostRenderer(), + userNamesForID: map[string]string{}, + messages: make(chan mattermostMessage, 100), + messageWorkers: pool.New().WithMaxGoroutines(10), }, nil } @@ -149,7 +149,7 @@ func (b *Mattermost) startMessageProcessor(ctx context.Context) { defer b.log.Info("Stopped mattermost message processor...") for msg := range b.messages { - b.mmMessageWorkers.Go(func() { + b.messageWorkers.Go(func() { err := b.handleMessage(ctx, msg) if err != nil { wrappedErr := fmt.Errorf("while handling message: %w", err) @@ -505,7 +505,7 @@ func (b *Mattermost) shutdown() { b.shutdownOnce.Do(func() { b.log.Info("Shutting down mattermost message processor...") close(b.messages) - b.mmMessageWorkers.Wait() + b.messageWorkers.Wait() }) } diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index e2b908259..7e296bf41 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -36,36 +36,38 @@ import ( ) const ( - APIKeyContextKey = "X-Api-Key" // #nosec - DeploymentIDContextKey = "X-Deployment-Id" // #nosec - retryDelay = time.Second - maxRetries = 30 - successIntervalDuration = 3 * time.Minute - quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..." + APIKeyContextKey = "X-Api-Key" // #nosec + DeploymentIDContextKey = "X-Deployment-Id" // #nosec + retryDelay = time.Second + maxRetries = 30 + successIntervalDuration = 3 * time.Minute + quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..." + cloudSlackMessageWorkersCount = 10 + cloudSlackMessageChannelSize = 100 ) var _ Bot = &CloudSlack{} // CloudSlack listens for user's message, execute commands and sends back the response. type CloudSlack struct { - log logrus.FieldLogger - cfg config.CloudSlack - client *slack.Client - executorFactory ExecutorFactory - reporter cloudSlackAnalyticsReporter - commGroupName string - realNamesForID map[string]string - botMentionRegex *regexp.Regexp - botID string - channelsMutex sync.RWMutex - renderer *SlackRenderer - channels map[string]channelConfigByName - notifyMutex sync.Mutex - clusterName string - msgStatusTracker *SlackMessageStatusTracker - messages chan *pb.ConnectResponse - slackMessageWorkers *pool.Pool - shutdownOnce sync.Once + log logrus.FieldLogger + cfg config.CloudSlack + client *slack.Client + executorFactory ExecutorFactory + reporter cloudSlackAnalyticsReporter + commGroupName string + realNamesForID map[string]string + botMentionRegex *regexp.Regexp + botID string + channelsMutex sync.RWMutex + renderer *SlackRenderer + channels map[string]channelConfigByName + notifyMutex sync.Mutex + clusterName string + msgStatusTracker *SlackMessageStatusTracker + messages chan *pb.ConnectResponse + messageWorkers *pool.Pool + shutdownOnce sync.Once } // cloudSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -111,6 +113,8 @@ func NewCloudSlack(log logrus.FieldLogger, clusterName: clusterName, realNamesForID: map[string]string{}, msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan *pb.ConnectResponse, cloudSlackMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(cloudSlackMessageWorkersCount), }, nil } @@ -223,7 +227,7 @@ func (b *CloudSlack) startMessageProcessor(ctx context.Context) { defer b.log.Info("Stopped cloud slack message processor...") for msg := range b.messages { - b.slackMessageWorkers.Go(func() { + b.messageWorkers.Go(func() { err, _ := b.handleStreamMessage(ctx, msg) if err != nil { b.log.Errorf("while handling message: %s", err.Error()) @@ -236,7 +240,7 @@ func (b *CloudSlack) shutdown() { b.shutdownOnce.Do(func() { b.log.Info("Shutting down cloud slack message processor...") close(b.messages) - b.slackMessageWorkers.Wait() + b.messageWorkers.Wait() }) } diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index c3fd70a0f..dd933d25e 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -31,26 +31,31 @@ import ( // - split to multiple files in a separate package, // - review all the methods and see if they can be simplified. +const ( + socketSlackMessageWorkersCount = 10 + socketSlackMessageChannelSize = 100 +) + var _ Bot = &SocketSlack{} // SocketSlack listens for user's message, execute commands and sends back the response. type SocketSlack struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter socketSlackAnalyticsReporter - botID string - client *slack.Client - channelsMutex sync.RWMutex - channels map[string]channelConfigByName - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *SlackRenderer - realNamesForID map[string]string - msgStatusTracker *SlackMessageStatusTracker - messages chan slackMessage - slackMessageWorkers *pool.Pool - shutdownOnce sync.Once + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter socketSlackAnalyticsReporter + botID string + client *slack.Client + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *SlackRenderer + realNamesForID map[string]string + msgStatusTracker *SlackMessageStatusTracker + messages chan slackMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once } // socketSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -80,19 +85,19 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc } return &SocketSlack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botID: botID, - client: client, - channels: channels, - commGroupName: commGroupName, - renderer: NewSlackRenderer(), - botMentionRegex: botMentionRegex, - realNamesForID: map[string]string{}, - msgStatusTracker: NewSlackMessageStatusTracker(log, client), - messages: make(chan slackMessage, 100), - slackMessageWorkers: pool.New().WithMaxGoroutines(10), + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + channels: channels, + commGroupName: commGroupName, + renderer: NewSlackRenderer(), + botMentionRegex: botMentionRegex, + realNamesForID: map[string]string{}, + msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan slackMessage, socketSlackMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(socketSlackMessageWorkersCount), }, nil } @@ -318,7 +323,7 @@ func (b *SocketSlack) startMessageProcessor(ctx context.Context) { defer b.log.Info("Stopped socket slack message processor...") for msg := range b.messages { - b.slackMessageWorkers.Go(func() { + b.messageWorkers.Go(func() { err := b.handleMessage(ctx, msg) if err != nil { b.log.Errorf("while handling message: %s", err.Error()) @@ -331,7 +336,7 @@ func (b *SocketSlack) shutdown() { b.shutdownOnce.Do(func() { b.log.Info("Shutting down socket slack message processor...") close(b.messages) - b.slackMessageWorkers.Wait() + b.messageWorkers.Wait() }) } From 18a36a6e9a57ad5ffa2e98db523bd6f0a48dcefc Mon Sep 17 00:00:00 2001 From: Huseyin BABAL Date: Thu, 10 Aug 2023 15:47:46 +0300 Subject: [PATCH 5/5] addressed pr comments --- pkg/bot/bot.go | 5 +++++ pkg/bot/discord.go | 23 ++++++++--------------- pkg/bot/mattermost.go | 13 +++++++------ pkg/bot/slack_cloud.go | 20 +++++++++----------- pkg/bot/slack_legacy.go | 8 +++++--- pkg/bot/slack_socket.go | 11 +++-------- 6 files changed, 37 insertions(+), 43 deletions(-) diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 8bd70ac32..af0af5197 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -8,6 +8,11 @@ import ( "github.com/kubeshop/botkube/pkg/notifier" ) +const ( + platformMessageWorkersCount = 10 + platformMessageChannelSize = 100 +) + // Bot connects to communication channels and reads/sends messages. It is a two-way integration. type Bot interface { Start(ctx context.Context) error diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index ff1bcf366..f2135e11f 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -36,10 +36,6 @@ const ( // discordMaxMessageSize max size before a message should be uploaded as a file. discordMaxMessageSize = 2000 - - discordMessageWorkersCount = 10 - - discordMessageChannelSize = 100 ) // Discord listens for user's message, execute commands and sends back the response. @@ -92,8 +88,8 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord channels: channelsCfg, botMentionRegex: botMentionRegex, renderer: NewDiscordRenderer(), - messages: make(chan discordMessage, discordMessageChannelSize), - discordMessageWorkers: pool.New().WithMaxGoroutines(discordMessageWorkersCount), + messages: make(chan discordMessage, platformMessageChannelSize), + discordMessageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -105,7 +101,7 @@ func (b *Discord) startMessageProcessor(ctx context.Context) { b.discordMessageWorkers.Go(func() { err := b.handleMessage(ctx, msg) if err != nil { - b.log.Errorf("while handling message: %s", err.Error()) + b.log.WithError(err).Error("Failed to handle Discord message") } }) } @@ -115,12 +111,9 @@ func (b *Discord) Start(ctx context.Context) error { // Register the messageCreate func as a callback for MessageCreate events. b.api.AddHandler(func(s *discordgo.Session, m *discordgo.MessageCreate) { - msg := discordMessage{ + b.messages <- discordMessage{ Event: m, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } }) // Open a websocket connection to Discord and begin listening. @@ -366,13 +359,13 @@ func (b *Discord) formatMessage(msg interactive.CoreMessage) (*discordgo.Message func (b *Discord) shutdown() { b.shutdownOnce.Do(func() { b.log.Info("Shutting down discord message processor...") - close(b.messages) - b.discordMessageWorkers.Wait() - err := b.api.Close() if err != nil { - b.log.Errorf("While closing connection: %v", err) + b.log.WithError(err).Error("Failed to close discord connection") } + + close(b.messages) + b.discordMessageWorkers.Wait() }) } diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index 1863182f0..4176c51d3 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -37,8 +37,10 @@ const ( // mattermostMaxMessageSize max size before a message should be uploaded as a file. mattermostMaxMessageSize = 3990 - httpsScheme = "https" - mattermostBotMentionRegexFmt = "^@(?i)%s" + httpsScheme = "https" + mattermostBotMentionRegexFmt = "^@(?i)%s" + mattermostMessageChannelSize = 100 + mattermostMessageWorkersCount = 10 ) // TODO: @@ -139,8 +141,8 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st botMentionRegex: botMentionRegex, renderer: NewMattermostRenderer(), userNamesForID: map[string]string{}, - messages: make(chan mattermostMessage, 100), - messageWorkers: pool.New().WithMaxGoroutines(10), + messages: make(chan mattermostMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -152,8 +154,7 @@ func (b *Mattermost) startMessageProcessor(ctx context.Context) { b.messageWorkers.Go(func() { err := b.handleMessage(ctx, msg) if err != nil { - wrappedErr := fmt.Errorf("while handling message: %w", err) - b.log.Errorf(wrappedErr.Error()) + b.log.WithError(err).Error("Failed to handle Mattermost message") } }) } diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 7e296bf41..1f2f164e3 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -36,14 +36,12 @@ import ( ) const ( - APIKeyContextKey = "X-Api-Key" // #nosec - DeploymentIDContextKey = "X-Deployment-Id" // #nosec - retryDelay = time.Second - maxRetries = 30 - successIntervalDuration = 3 * time.Minute - quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..." - cloudSlackMessageWorkersCount = 10 - cloudSlackMessageChannelSize = 100 + APIKeyContextKey = "X-Api-Key" // #nosec + DeploymentIDContextKey = "X-Deployment-Id" // #nosec + retryDelay = time.Second + maxRetries = 30 + successIntervalDuration = 3 * time.Minute + quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..." ) var _ Bot = &CloudSlack{} @@ -113,8 +111,8 @@ func NewCloudSlack(log logrus.FieldLogger, clusterName: clusterName, realNamesForID: map[string]string{}, msgStatusTracker: NewSlackMessageStatusTracker(log, client), - messages: make(chan *pb.ConnectResponse, cloudSlackMessageChannelSize), - messageWorkers: pool.New().WithMaxGoroutines(cloudSlackMessageWorkersCount), + messages: make(chan *pb.ConnectResponse, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -230,7 +228,7 @@ func (b *CloudSlack) startMessageProcessor(ctx context.Context) { b.messageWorkers.Go(func() { err, _ := b.handleStreamMessage(ctx, msg) if err != nil { - b.log.Errorf("while handling message: %s", err.Error()) + b.log.WithError(err).Error("Failed to handle Cloud Slack message") } }) } diff --git a/pkg/bot/slack_legacy.go b/pkg/bot/slack_legacy.go index 95d2bbcda..82ea76e0e 100644 --- a/pkg/bot/slack_legacy.go +++ b/pkg/bot/slack_legacy.go @@ -32,7 +32,9 @@ import ( // Maximum length for the text in this field is 3000 characters. (..)" // // source: https://api.slack.com/reference/block-kit/blocks#section -const slackMaxMessageSize = 3001 +const ( + slackMaxMessageSize = 3001 +) var _ Bot = &Slack{} @@ -92,8 +94,8 @@ func NewSlack(log logrus.FieldLogger, commGroupName string, cfg config.Slack, ex commGroupName: commGroupName, botMentionRegex: botMentionRegex, renderer: NewSlackRenderer(), - messages: make(chan slackLegacyMessage, 100), - slackMessageWorkers: pool.New().WithMaxGoroutines(10), + messages: make(chan slackLegacyMessage, platformMessageChannelSize), + slackMessageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index dd933d25e..c9f4635a8 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -31,11 +31,6 @@ import ( // - split to multiple files in a separate package, // - review all the methods and see if they can be simplified. -const ( - socketSlackMessageWorkersCount = 10 - socketSlackMessageChannelSize = 100 -) - var _ Bot = &SocketSlack{} // SocketSlack listens for user's message, execute commands and sends back the response. @@ -96,8 +91,8 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc botMentionRegex: botMentionRegex, realNamesForID: map[string]string{}, msgStatusTracker: NewSlackMessageStatusTracker(log, client), - messages: make(chan slackMessage, socketSlackMessageChannelSize), - messageWorkers: pool.New().WithMaxGoroutines(socketSlackMessageWorkersCount), + messages: make(chan slackMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -326,7 +321,7 @@ func (b *SocketSlack) startMessageProcessor(ctx context.Context) { b.messageWorkers.Go(func() { err := b.handleMessage(ctx, msg) if err != nil { - b.log.Errorf("while handling message: %s", err.Error()) + b.log.WithError(err).Error("Failed to handle Socket Slack message") } }) }