Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
huseyinbabal committed Aug 9, 2023
1 parent c623bb6 commit aa70c76
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 97 deletions.
8 changes: 6 additions & 2 deletions pkg/bot/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (

// discordMaxMessageSize max size before a message should be uploaded as a file.
discordMaxMessageSize = 2000

discordMessageWorkersCount = 10

discordMessageChannelSize = 100
)

// Discord listens for user's message, execute commands and sends back the response.
Expand Down Expand Up @@ -88,8 +92,8 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord
channels: channelsCfg,
botMentionRegex: botMentionRegex,
renderer: NewDiscordRenderer(),
messages: make(chan discordMessage, 100),
discordMessageWorkers: pool.New().WithMaxGoroutines(10),
messages: make(chan discordMessage, discordMessageChannelSize),
discordMessageWorkers: pool.New().WithMaxGoroutines(discordMessageWorkersCount),
}, nil
}

Expand Down
76 changes: 38 additions & 38 deletions pkg/bot/mattermost.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,26 @@ const (

// Mattermost listens for user's message, execute commands and sends back the response.
type Mattermost struct {
log logrus.FieldLogger
executorFactory ExecutorFactory
reporter AnalyticsReporter
serverURL string
botName string
botUserID string
teamName string
webSocketURL string
wsClient *model.WebSocketClient
apiClient *model.Client4
channelsMutex sync.RWMutex
commGroupName string
channels map[string]channelConfigByID
notifyMutex sync.Mutex
botMentionRegex *regexp.Regexp
renderer *MattermostRenderer
userNamesForID map[string]string
messages chan mattermostMessage
mmMessageWorkers *pool.Pool
shutdownOnce sync.Once
log logrus.FieldLogger
executorFactory ExecutorFactory
reporter AnalyticsReporter
serverURL string
botName string
botUserID string
teamName string
webSocketURL string
wsClient *model.WebSocketClient
apiClient *model.Client4
channelsMutex sync.RWMutex
commGroupName string
channels map[string]channelConfigByID
notifyMutex sync.Mutex
botMentionRegex *regexp.Regexp
renderer *MattermostRenderer
userNamesForID map[string]string
messages chan mattermostMessage
messageWorkers *pool.Pool
shutdownOnce sync.Once
}

// mattermostMessage contains message details to execute command and send back the result
Expand Down Expand Up @@ -125,22 +125,22 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st
}

return &Mattermost{
log: log,
executorFactory: executorFactory,
reporter: reporter,
serverURL: cfg.URL,
botName: cfg.BotName,
botUserID: botUserID,
teamName: cfg.Team,
apiClient: client,
webSocketURL: webSocketURL,
commGroupName: commGroupName,
channels: channelsByIDCfg,
botMentionRegex: botMentionRegex,
renderer: NewMattermostRenderer(),
userNamesForID: map[string]string{},
messages: make(chan mattermostMessage, 100),
mmMessageWorkers: pool.New().WithMaxGoroutines(10),
log: log,
executorFactory: executorFactory,
reporter: reporter,
serverURL: cfg.URL,
botName: cfg.BotName,
botUserID: botUserID,
teamName: cfg.Team,
apiClient: client,
webSocketURL: webSocketURL,
commGroupName: commGroupName,
channels: channelsByIDCfg,
botMentionRegex: botMentionRegex,
renderer: NewMattermostRenderer(),
userNamesForID: map[string]string{},
messages: make(chan mattermostMessage, 100),
messageWorkers: pool.New().WithMaxGoroutines(10),
}, nil
}

Expand All @@ -149,7 +149,7 @@ func (b *Mattermost) startMessageProcessor(ctx context.Context) {
defer b.log.Info("Stopped mattermost message processor...")

for msg := range b.messages {
b.mmMessageWorkers.Go(func() {
b.messageWorkers.Go(func() {
err := b.handleMessage(ctx, msg)
if err != nil {
wrappedErr := fmt.Errorf("while handling message: %w", err)
Expand Down Expand Up @@ -505,7 +505,7 @@ func (b *Mattermost) shutdown() {
b.shutdownOnce.Do(func() {
b.log.Info("Shutting down mattermost message processor...")
close(b.messages)
b.mmMessageWorkers.Wait()
b.messageWorkers.Wait()
})
}

Expand Down
56 changes: 30 additions & 26 deletions pkg/bot/slack_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,38 @@ import (
)

const (
APIKeyContextKey = "X-Api-Key" // #nosec
DeploymentIDContextKey = "X-Deployment-Id" // #nosec
retryDelay = time.Second
maxRetries = 30
successIntervalDuration = 3 * time.Minute
quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..."
APIKeyContextKey = "X-Api-Key" // #nosec
DeploymentIDContextKey = "X-Deployment-Id" // #nosec
retryDelay = time.Second
maxRetries = 30
successIntervalDuration = 3 * time.Minute
quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..."
cloudSlackMessageWorkersCount = 10
cloudSlackMessageChannelSize = 100
)

var _ Bot = &CloudSlack{}

// CloudSlack listens for user's message, execute commands and sends back the response.
type CloudSlack struct {
log logrus.FieldLogger
cfg config.CloudSlack
client *slack.Client
executorFactory ExecutorFactory
reporter cloudSlackAnalyticsReporter
commGroupName string
realNamesForID map[string]string
botMentionRegex *regexp.Regexp
botID string
channelsMutex sync.RWMutex
renderer *SlackRenderer
channels map[string]channelConfigByName
notifyMutex sync.Mutex
clusterName string
msgStatusTracker *SlackMessageStatusTracker
messages chan *pb.ConnectResponse
slackMessageWorkers *pool.Pool
shutdownOnce sync.Once
log logrus.FieldLogger
cfg config.CloudSlack
client *slack.Client
executorFactory ExecutorFactory
reporter cloudSlackAnalyticsReporter
commGroupName string
realNamesForID map[string]string
botMentionRegex *regexp.Regexp
botID string
channelsMutex sync.RWMutex
renderer *SlackRenderer
channels map[string]channelConfigByName
notifyMutex sync.Mutex
clusterName string
msgStatusTracker *SlackMessageStatusTracker
messages chan *pb.ConnectResponse
messageWorkers *pool.Pool
shutdownOnce sync.Once
}

// cloudSlackAnalyticsReporter defines a reporter that collects analytics data.
Expand Down Expand Up @@ -111,6 +113,8 @@ func NewCloudSlack(log logrus.FieldLogger,
clusterName: clusterName,
realNamesForID: map[string]string{},
msgStatusTracker: NewSlackMessageStatusTracker(log, client),
messages: make(chan *pb.ConnectResponse, cloudSlackMessageChannelSize),
messageWorkers: pool.New().WithMaxGoroutines(cloudSlackMessageWorkersCount),
}, nil
}

Expand Down Expand Up @@ -223,7 +227,7 @@ func (b *CloudSlack) startMessageProcessor(ctx context.Context) {
defer b.log.Info("Stopped cloud slack message processor...")

for msg := range b.messages {
b.slackMessageWorkers.Go(func() {
b.messageWorkers.Go(func() {
err, _ := b.handleStreamMessage(ctx, msg)
if err != nil {
b.log.Errorf("while handling message: %s", err.Error())
Expand All @@ -236,7 +240,7 @@ func (b *CloudSlack) shutdown() {
b.shutdownOnce.Do(func() {
b.log.Info("Shutting down cloud slack message processor...")
close(b.messages)
b.slackMessageWorkers.Wait()
b.messageWorkers.Wait()
})
}

Expand Down
67 changes: 36 additions & 31 deletions pkg/bot/slack_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,31 @@ import (
// - split to multiple files in a separate package,
// - review all the methods and see if they can be simplified.

const (
socketSlackMessageWorkersCount = 10
socketSlackMessageChannelSize = 100
)

var _ Bot = &SocketSlack{}

// SocketSlack listens for user's message, execute commands and sends back the response.
type SocketSlack struct {
log logrus.FieldLogger
executorFactory ExecutorFactory
reporter socketSlackAnalyticsReporter
botID string
client *slack.Client
channelsMutex sync.RWMutex
channels map[string]channelConfigByName
notifyMutex sync.Mutex
botMentionRegex *regexp.Regexp
commGroupName string
renderer *SlackRenderer
realNamesForID map[string]string
msgStatusTracker *SlackMessageStatusTracker
messages chan slackMessage
slackMessageWorkers *pool.Pool
shutdownOnce sync.Once
log logrus.FieldLogger
executorFactory ExecutorFactory
reporter socketSlackAnalyticsReporter
botID string
client *slack.Client
channelsMutex sync.RWMutex
channels map[string]channelConfigByName
notifyMutex sync.Mutex
botMentionRegex *regexp.Regexp
commGroupName string
renderer *SlackRenderer
realNamesForID map[string]string
msgStatusTracker *SlackMessageStatusTracker
messages chan slackMessage
messageWorkers *pool.Pool
shutdownOnce sync.Once
}

// socketSlackAnalyticsReporter defines a reporter that collects analytics data.
Expand Down Expand Up @@ -80,19 +85,19 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc
}

return &SocketSlack{
log: log,
executorFactory: executorFactory,
reporter: reporter,
botID: botID,
client: client,
channels: channels,
commGroupName: commGroupName,
renderer: NewSlackRenderer(),
botMentionRegex: botMentionRegex,
realNamesForID: map[string]string{},
msgStatusTracker: NewSlackMessageStatusTracker(log, client),
messages: make(chan slackMessage, 100),
slackMessageWorkers: pool.New().WithMaxGoroutines(10),
log: log,
executorFactory: executorFactory,
reporter: reporter,
botID: botID,
client: client,
channels: channels,
commGroupName: commGroupName,
renderer: NewSlackRenderer(),
botMentionRegex: botMentionRegex,
realNamesForID: map[string]string{},
msgStatusTracker: NewSlackMessageStatusTracker(log, client),
messages: make(chan slackMessage, socketSlackMessageChannelSize),
messageWorkers: pool.New().WithMaxGoroutines(socketSlackMessageWorkersCount),
}, nil
}

Expand Down Expand Up @@ -318,7 +323,7 @@ func (b *SocketSlack) startMessageProcessor(ctx context.Context) {
defer b.log.Info("Stopped socket slack message processor...")

for msg := range b.messages {
b.slackMessageWorkers.Go(func() {
b.messageWorkers.Go(func() {
err := b.handleMessage(ctx, msg)
if err != nil {
b.log.Errorf("while handling message: %s", err.Error())
Expand All @@ -331,7 +336,7 @@ func (b *SocketSlack) shutdown() {
b.shutdownOnce.Do(func() {
b.log.Info("Shutting down socket slack message processor...")
close(b.messages)
b.slackMessageWorkers.Wait()
b.messageWorkers.Wait()
})
}

Expand Down

0 comments on commit aa70c76

Please sign in to comment.