diff --git a/go.mod b/go.mod index 338763588..5da926879 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,7 @@ require ( github.com/sha1sum/aws_signing_client v0.0.0-20200229211254-f7815c59d5c1 github.com/sirupsen/logrus v1.9.0 github.com/slack-go/slack v0.12.2 + github.com/sourcegraph/conc v0.3.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spiffe/spire v1.5.6 @@ -250,6 +251,8 @@ require ( go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.10.0 // indirect golang.org/x/net v0.11.0 // indirect golang.org/x/sys v0.9.0 // indirect diff --git a/go.sum b/go.sum index c89750dfb..b17c1729c 100644 --- a/go.sum +++ b/go.sum @@ -1143,6 +1143,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= @@ -1272,10 +1274,12 @@ go.szostok.io/version v1.2.0 h1:8eMMdfsonjbibwZRLJ8TnrErY8bThFTQsZYV16mcXms= go.szostok.io/version v1.2.0/go.mod h1:EiU0gPxaXb6MZ+apSN0WgDO6F4JXyC99k9PIXf2k2E8= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 8bd70ac32..af0af5197 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -8,6 +8,11 @@ import ( "github.com/kubeshop/botkube/pkg/notifier" ) +const ( + platformMessageWorkersCount = 10 + platformMessageChannelSize = 100 +) + // Bot connects to communication channels and reads/sends messages. It is a two-way integration. type Bot interface { Start(ctx context.Context) error diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index e6dfdeec5..f2135e11f 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -11,6 +11,7 @@ import ( "github.com/bwmarrin/discordgo" "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -39,17 +40,20 @@ const ( // Discord listens for user's message, execute commands and sends back the response. type Discord struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter AnalyticsReporter - api *discordgo.Session - botID string - channelsMutex sync.RWMutex - channels map[string]channelConfigByID - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *DiscordRenderer + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter AnalyticsReporter + api *discordgo.Session + botID string + channelsMutex sync.RWMutex + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *DiscordRenderer + messages chan discordMessage + discordMessageWorkers *pool.Pool + shutdownOnce sync.Once } // discordMessage contains message details to execute command and send back the result. @@ -75,30 +79,41 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord } return &Discord{ - log: log, - reporter: reporter, - executorFactory: executorFactory, - api: api, - botID: cfg.BotID, - commGroupName: commGroupName, - channels: channelsCfg, - botMentionRegex: botMentionRegex, - renderer: NewDiscordRenderer(), + log: log, + reporter: reporter, + executorFactory: executorFactory, + api: api, + botID: cfg.BotID, + commGroupName: commGroupName, + channels: channelsCfg, + botMentionRegex: botMentionRegex, + renderer: NewDiscordRenderer(), + messages: make(chan discordMessage, platformMessageChannelSize), + discordMessageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } -// Start starts the Discord websocket connection and listens for messages. +func (b *Discord) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting discord message processor...") + defer b.log.Info("Stopped discord message processor...") + + for msg := range b.messages { + b.discordMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.WithError(err).Error("Failed to handle Discord message") + } + }) + } +} // Start starts the Discord websocket connection and listens for messages. func (b *Discord) Start(ctx context.Context) error { b.log.Info("Starting bot") // Register the messageCreate func as a callback for MessageCreate events. b.api.AddHandler(func(s *discordgo.Session, m *discordgo.MessageCreate) { - msg := discordMessage{ + b.messages <- discordMessage{ Event: m, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } }) // Open a websocket connection to Discord and begin listening. @@ -113,14 +128,10 @@ func (b *Discord) Start(ctx context.Context) error { } b.log.Info("Botkube connected to Discord!") - + go b.startMessageProcessor(ctx) <-ctx.Done() b.log.Info("Shutdown requested. Finishing...") - err = b.api.Close() - if err != nil { - return fmt.Errorf("while closing connection: %w", err) - } - + b.shutdown() return nil } @@ -345,6 +356,19 @@ func (b *Discord) formatMessage(msg interactive.CoreMessage) (*discordgo.Message }, nil } +func (b *Discord) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down discord message processor...") + err := b.api.Close() + if err != nil { + b.log.WithError(err).Error("Failed to close discord connection") + } + + close(b.messages) + b.discordMessageWorkers.Wait() + }) +} + func discordChannelsConfigFrom(log logrus.FieldLogger, api *discordgo.Session, channelsCfg config.IdentifiableMap[config.ChannelBindingsByID]) (map[string]channelConfigByID, error) { res := make(map[string]channelConfigByID) for channAlias, channCfg := range channelsCfg { diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index 5c1886edb..4176c51d3 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -12,6 +12,7 @@ import ( "github.com/mattermost/mattermost/server/public/model" "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -36,8 +37,10 @@ const ( // mattermostMaxMessageSize max size before a message should be uploaded as a file. mattermostMaxMessageSize = 3990 - httpsScheme = "https" - mattermostBotMentionRegexFmt = "^@(?i)%s" + httpsScheme = "https" + mattermostBotMentionRegexFmt = "^@(?i)%s" + mattermostMessageChannelSize = 100 + mattermostMessageWorkersCount = 10 ) // TODO: @@ -63,6 +66,9 @@ type Mattermost struct { botMentionRegex *regexp.Regexp renderer *MattermostRenderer userNamesForID map[string]string + messages chan mattermostMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once } // mattermostMessage contains message details to execute command and send back the result @@ -135,10 +141,24 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st botMentionRegex: botMentionRegex, renderer: NewMattermostRenderer(), userNamesForID: map[string]string{}, + messages: make(chan mattermostMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } -// Start establishes mattermost connection and listens for messages +func (b *Mattermost) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting mattermost message processor...") + defer b.log.Info("Stopped mattermost message processor...") + + for msg := range b.messages { + b.messageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.WithError(err).Error("Failed to handle Mattermost message") + } + }) + } +} // Start establishes mattermost connection and listens for messages func (b *Mattermost) Start(ctx context.Context) error { b.log.Info("Starting bot") @@ -157,10 +177,14 @@ func (b *Mattermost) Start(ctx context.Context) error { // For now, we are adding retry logic to reconnect to the server // https://github.com/kubeshop/botkube/issues/201 b.log.Info("Botkube connected to Mattermost!") + + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") + b.shutdown() return nil default: var appErr error @@ -213,7 +237,7 @@ func (b *Mattermost) SetNotificationsEnabled(channelID string, enabled bool) err } // Check incoming message and take action -func (b *Mattermost) handleMessage(ctx context.Context, mm *mattermostMessage) error { +func (b *Mattermost) handleMessage(ctx context.Context, mm mattermostMessage) error { post, err := postFromEvent(mm.Event) if err != nil { return fmt.Errorf("while getting post from event: %w", err) @@ -387,14 +411,10 @@ func (b *Mattermost) listen(ctx context.Context) { continue } - mm := &mattermostMessage{ + mm := mattermostMessage{ Event: event, } - err := b.handleMessage(ctx, mm) - if err != nil { - wrappedErr := fmt.Errorf("while handling message: %w", err) - b.log.Errorf(wrappedErr.Error()) - } + b.messages <- mm } } } @@ -482,6 +502,14 @@ func (b *Mattermost) getUserName(ctx context.Context, userID string) (string, er return user.Username, nil } +func (b *Mattermost) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down mattermost message processor...") + close(b.messages) + b.messageWorkers.Wait() + }) +} + func getBotUserID(ctx context.Context, client *model.Client4, teamID, botName string) (string, error) { users, _, err := client.AutocompleteUsersInTeam(ctx, teamID, botName, 1, "") if err != nil { diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 30e4f6022..1f2f164e3 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -16,6 +16,7 @@ import ( "github.com/sirupsen/logrus" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" + "github.com/sourcegraph/conc/pool" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -62,6 +63,9 @@ type CloudSlack struct { notifyMutex sync.Mutex clusterName string msgStatusTracker *SlackMessageStatusTracker + messages chan *pb.ConnectResponse + messageWorkers *pool.Pool + shutdownOnce sync.Once } // cloudSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -107,6 +111,8 @@ func NewCloudSlack(log logrus.FieldLogger, clusterName: clusterName, realNamesForID: map[string]string{}, msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan *pb.ConnectResponse, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -192,157 +198,188 @@ func (b *CloudSlack) start(ctx context.Context) error { return fmt.Errorf("while sending gRPC connection request. %w", err) } + defer b.shutdown() + + go b.startMessageProcessor(ctx) + for { data, err := c.Recv() if err != nil { if err == io.EOF { b.log.Warn("gRPC connection was closed by server") - return nil + return errors.New("gRPC connection closed") } errStatus, ok := status.FromError(err) if ok && errStatus.Code() == codes.Canceled && errStatus.Message() == context.Canceled.Error() { b.log.Debugf("Context was cancelled. Skipping returning error...") - return nil + return fmt.Errorf("while resolving error from gRPC response %s", errStatus.Err()) } return fmt.Errorf("while receiving cloud slack events: %w", err) } - if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { - b.log.Warn(quotaExceededMsg) - return nil - } - if len(data.Event) == 0 { - continue + b.messages <- data + } +} + +func (b *CloudSlack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting cloud slack message processor...") + defer b.log.Info("Stopped cloud slack message processor...") + + for msg := range b.messages { + b.messageWorkers.Go(func() { + err, _ := b.handleStreamMessage(ctx, msg) + if err != nil { + b.log.WithError(err).Error("Failed to handle Cloud Slack message") + } + }) + } +} + +func (b *CloudSlack) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down cloud slack message processor...") + close(b.messages) + b.messageWorkers.Wait() + }) +} + +func (b *CloudSlack) handleStreamMessage(ctx context.Context, data *pb.ConnectResponse) (error, bool) { + if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { + b.log.Warn(quotaExceededMsg) + return nil, true + } + if len(data.Event) == 0 { + return nil, false + } + event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken()) + if err != nil { + return fmt.Errorf("while parsing event: %w", err), true + } + switch event.Type { + case slackevents.CallbackEvent: + b.log.Debugf("Got callback event %s", formatx.StructDumper().Sdump(event)) + innerEvent := event.InnerEvent + switch ev := innerEvent.Data.(type) { + case *slackevents.AppMentionEvent: + b.log.Debugf("Got app mention %s", formatx.StructDumper().Sdump(innerEvent)) + userName := b.getRealNameWithFallbackToUserID(ctx, ev.User) + msg := slackMessage{ + Text: ev.Text, + Channel: ev.Channel, + ThreadTimeStamp: ev.ThreadTimeStamp, + UserID: ev.User, + EventTimeStamp: ev.EventTimeStamp, + UserName: userName, + CommandOrigin: command.TypedOrigin, + } + + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + case *slackevents.MessageEvent: + b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent)) + msg := slackMessage{ + Text: ev.Text, + Channel: ev.Channel, + UserID: ev.User, + EventTimeStamp: ev.EventTimeStamp, + } + response := quotaExceeded() + + if err := b.send(ctx, msg, response); err != nil { + return fmt.Errorf("while sending message: %w", err), true + } } - event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken()) + case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission): + var callback slack.InteractionCallback + err = json.Unmarshal(data.Event, &callback) if err != nil { - return fmt.Errorf("while parsing event: %w", err) + b.log.Errorf("Invalid event %+v\n", data.Event) + return fmt.Errorf("Invalid event %+v\n", data.Event), false } - switch event.Type { - case slackevents.CallbackEvent: - b.log.Debugf("Got callback event %s", formatx.StructDumper().Sdump(event)) - innerEvent := event.InnerEvent - switch ev := innerEvent.Data.(type) { - case *slackevents.AppMentionEvent: - b.log.Debugf("Got app mention %s", formatx.StructDumper().Sdump(innerEvent)) - userName := b.getRealNameWithFallbackToUserID(ctx, ev.User) - msg := slackMessage{ - Text: ev.Text, - Channel: ev.Channel, - ThreadTimeStamp: ev.ThreadTimeStamp, - UserID: ev.User, - EventTimeStamp: ev.EventTimeStamp, - UserName: userName, - CommandOrigin: command.TypedOrigin, - } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("while handling message: %s", err.Error()) - } - case *slackevents.MessageEvent: - b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent)) - msg := slackMessage{ - Text: ev.Text, - Channel: ev.Channel, - UserID: ev.User, - EventTimeStamp: ev.EventTimeStamp, - } - response := quotaExceeded() + switch callback.Type { + case slack.InteractionTypeBlockActions: + b.log.Debugf("Got block action %s", formatx.StructDumper().Sdump(callback)) - if err := b.send(ctx, msg, response); err != nil { - return fmt.Errorf("while sending message: %w", err) - } - } - case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission): - var callback slack.InteractionCallback - err = json.Unmarshal(data.Event, &callback) - if err != nil { - b.log.Errorf("Invalid event %+v\n", data.Event) - continue + if len(callback.ActionCallback.BlockActions) != 1 { + b.log.Debug("Ignoring callback as the number of actions is different from 1") + return nil, false } - switch callback.Type { - case slack.InteractionTypeBlockActions: - b.log.Debugf("Got block action %s", formatx.StructDumper().Sdump(callback)) - - if len(callback.ActionCallback.BlockActions) != 1 { - b.log.Debug("Ignoring callback as the number of actions is different from 1") - continue + act := callback.ActionCallback.BlockActions[0] + if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { + reportErr := b.reporter.ReportCommand(b.IntegrationName(), act.ActionID, command.ButtonClickOrigin, false) + if reportErr != nil { + b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) } + return nil, false // skip the url actions + } - act := callback.ActionCallback.BlockActions[0] - if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { - reportErr := b.reporter.ReportCommand(b.IntegrationName(), act.ActionID, command.ButtonClickOrigin, false) - if reportErr != nil { - b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) - } - continue // skip the url actions - } + channelID := callback.Channel.ID + if channelID == "" && callback.View.ID != "" { + // TODO: add support when we will need to handle button clicks from active modal. + // + // The request is coming from active modal, currently we don't support that. + // We process that only when the modal is submitted (see slack.InteractionTypeViewSubmission action type). + b.log.Debug("Ignoring callback as its source is an active modal") + return nil, false + } - channelID := callback.Channel.ID - if channelID == "" && callback.View.ID != "" { - // TODO: add support when we will need to handle button clicks from active modal. - // - // The request is coming from active modal, currently we don't support that. - // We process that only when the modal is submitted (see slack.InteractionTypeViewSubmission action type). - b.log.Debug("Ignoring callback as its source is an active modal") - continue - } + cmd, cmdOrigin := resolveBlockActionCommand(*act) + // Use thread's TS if interactive call triggered within thread. + threadTs := callback.MessageTs + if callback.Message.Msg.ThreadTimestamp != "" { + threadTs = callback.Message.Msg.ThreadTimestamp + } - cmd, cmdOrigin := resolveBlockActionCommand(*act) - // Use thread's TS if interactive call triggered within thread. - threadTs := callback.MessageTs - if callback.Message.Msg.ThreadTimestamp != "" { - threadTs = callback.Message.Msg.ThreadTimestamp - } + state := removeBotNameFromIDs(b.BotName(), callback.BlockActionState) + + userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) + msg := slackMessage{ + Text: cmd, + Channel: channelID, + ThreadTimeStamp: threadTs, + TriggerID: callback.TriggerID, + UserID: callback.User.ID, + UserName: userName, + CommandOrigin: cmdOrigin, + State: state, + EventTimeStamp: callback.Message.Timestamp, + ResponseURL: callback.ResponseURL, + BlockID: act.BlockID, + } + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("Message handling error: %s", err.Error()) + } + case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted + + // the map key is the ID of the input block, for us, it's autogenerated + for _, item := range callback.View.State.Values { + for actID, act := range item { + act.ActionID = actID // normalize event + + cmd, cmdOrigin := resolveBlockActionCommand(act) + userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) + msg := slackMessage{ + Text: cmd, + Channel: callback.View.PrivateMetadata, + UserID: callback.User.ID, + UserName: userName, + EventTimeStamp: "", // there is no timestamp for interactive callbacks + CommandOrigin: cmdOrigin, + } - state := removeBotNameFromIDs(b.BotName(), callback.BlockActionState) - - userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) - msg := slackMessage{ - Text: cmd, - Channel: channelID, - ThreadTimeStamp: threadTs, - TriggerID: callback.TriggerID, - UserID: callback.User.ID, - UserName: userName, - CommandOrigin: cmdOrigin, - State: state, - EventTimeStamp: callback.Message.Timestamp, - ResponseURL: callback.ResponseURL, - BlockID: act.BlockID, - } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } - case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted - - // the map key is the ID of the input block, for us, it's autogenerated - for _, item := range callback.View.State.Values { - for actID, act := range item { - act.ActionID = actID // normalize event - - cmd, cmdOrigin := resolveBlockActionCommand(act) - userName := b.getRealNameWithFallbackToUserID(ctx, callback.User.ID) - msg := slackMessage{ - Text: cmd, - Channel: callback.View.PrivateMetadata, - UserID: callback.User.ID, - UserName: userName, - EventTimeStamp: "", // there is no timestamp for interactive callbacks - CommandOrigin: cmdOrigin, - } - - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + if err := b.handleMessage(ctx, msg); err != nil { + b.log.Errorf("Message handling error: %s", err.Error()) } } - default: - b.log.Debugf("get unhandled event %s", callback.Type) } + default: + b.log.Debugf("get unhandled event %s", callback.Type) } - b.log.Debugf("received: %q\n", event) } + b.log.Debugf("received: %q\n", event) + return nil, false } func (b *CloudSlack) SendMessage(ctx context.Context, msg interactive.CoreMessage, sourceBindings []string) error { diff --git a/pkg/bot/slack_legacy.go b/pkg/bot/slack_legacy.go index d75198b82..82ea76e0e 100644 --- a/pkg/bot/slack_legacy.go +++ b/pkg/bot/slack_legacy.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/slack-go/slack" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/bot/interactive" @@ -31,23 +32,28 @@ import ( // Maximum length for the text in this field is 3000 characters. (..)" // // source: https://api.slack.com/reference/block-kit/blocks#section -const slackMaxMessageSize = 3001 +const ( + slackMaxMessageSize = 3001 +) var _ Bot = &Slack{} // Slack listens for user's message, execute commands and sends back the response. type Slack struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter FatalErrorAnalyticsReporter - botID string - client *slack.Client - channelsMutex sync.RWMutex - channels map[string]channelConfigByName - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *SlackRenderer + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter FatalErrorAnalyticsReporter + botID string + client *slack.Client + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupName string + renderer *SlackRenderer + messages chan slackLegacyMessage + slackMessageWorkers *pool.Pool + shutdownOnce sync.Once } // slackLegacyMessage contains message details to execute command and send back the result @@ -79,18 +85,34 @@ func NewSlack(log logrus.FieldLogger, commGroupName string, cfg config.Slack, ex } return &Slack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botID: botID, - client: client, - channels: channels, - commGroupName: commGroupName, - botMentionRegex: botMentionRegex, - renderer: NewSlackRenderer(), + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + channels: channels, + commGroupName: commGroupName, + botMentionRegex: botMentionRegex, + renderer: NewSlackRenderer(), + messages: make(chan slackLegacyMessage, platformMessageChannelSize), + slackMessageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } +func (b *Slack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting slack message processor...") + defer b.log.Info("Stopped slack message processor...") + + for msg := range b.messages { + b.slackMessageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.Errorf("while handling message: %s", err.Error()) + } + }) + } +} + // Start starts the Slack RTM connection and listens for messages func (b *Slack) Start(ctx context.Context) error { b.log.Info("Starting bot") @@ -101,11 +123,14 @@ func (b *Slack) Start(ctx context.Context) error { rtm.ManageConnection() }() + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") - return rtm.Disconnect() + b.shutdown(rtm) + return nil case msg, ok := <-rtm.IncomingEvents: if !ok { b.log.Info("Incoming events channel closed. Finishing...") @@ -372,6 +397,19 @@ func (b *Slack) findAndTrimBotMention(msg string) (string, bool) { return b.botMentionRegex.ReplaceAllString(msg, ""), true } +func (b *Slack) shutdown(rtm *slack.RTM) { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down slack message processor...") + close(b.messages) + b.slackMessageWorkers.Wait() + + err := rtm.Disconnect() + if err != nil { + b.log.Errorf("Error while disconnecting from Slack: %v", err) + } + }) +} + func uploadFileToSlack(ctx context.Context, channel string, resp interactive.CoreMessage, client *slack.Client, ts string) (*slack.File, error) { params := slack.FileUploadParameters{ Filename: "Response.txt", diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index 8355139de..c9f4635a8 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -7,12 +7,14 @@ import ( "regexp" "strings" "sync" + "time" "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" + "github.com/sourcegraph/conc/pool" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/api" @@ -46,6 +48,9 @@ type SocketSlack struct { renderer *SlackRenderer realNamesForID map[string]string msgStatusTracker *SlackMessageStatusTracker + messages chan slackMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once } // socketSlackAnalyticsReporter defines a reporter that collects analytics data. @@ -86,6 +91,8 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc botMentionRegex: botMentionRegex, realNamesForID: map[string]string{}, msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan slackMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), }, nil } @@ -106,10 +113,13 @@ func (b *SocketSlack) Start(ctx context.Context) error { } }() + go b.startMessageProcessor(ctx) + for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") + b.shutdown() return nil case event := <-websocketClient.Events: switch event.Type { @@ -144,9 +154,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { CommandOrigin: command.TypedOrigin, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("while handling message: %s", err.Error()) - } + b.messages <- msg } } case socketmode.EventTypeInteractive: @@ -209,9 +217,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { ResponseURL: callback.ResponseURL, BlockID: act.BlockID, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + b.messages <- msg case slack.InteractionTypeViewSubmission: // this event is received when modal is submitted // the map key is the ID of the input block, for us, it's autogenerated @@ -230,9 +236,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { CommandOrigin: cmdOrigin, } - if err := b.handleMessage(ctx, msg); err != nil { - b.log.Errorf("Message handling error: %s", err.Error()) - } + b.messages <- msg } } default: @@ -309,6 +313,28 @@ func (b *SocketSlack) SetNotificationsEnabled(channelName string, enabled bool) return nil } +func (b *SocketSlack) startMessageProcessor(ctx context.Context) { + b.log.Info("Starting socket slack message processor...") + defer b.log.Info("Stopped socket slack message processor...") + + for msg := range b.messages { + b.messageWorkers.Go(func() { + err := b.handleMessage(ctx, msg) + if err != nil { + b.log.WithError(err).Error("Failed to handle Socket Slack message") + } + }) + } +} + +func (b *SocketSlack) shutdown() { + b.shutdownOnce.Do(func() { + b.log.Info("Shutting down socket slack message processor...") + close(b.messages) + b.messageWorkers.Wait() + }) +} + func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) error { // Handle message only if starts with mention request, found := b.findAndTrimBotMention(event.Text) @@ -318,6 +344,7 @@ func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) err } b.log.Debugf("Slack incoming Request: %s", request) + time.Sleep(time.Second * 15) // Unfortunately we need to do a call for channel name based on ID every time a message arrives. // I wanted to query for channel IDs based on names and prepare a map in the `slackChannelsConfigFrom`,