Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discord integration #1507

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions models/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type SlackIntegration struct {
Client *slack.Client `json:"client"`
}

type DiscordIntegration struct {
Name string `json:"name"`
Keys map[string]string `json:"keys"`
Properties map[string]bool `json:"properties"`
}

type CreateIntegrationSchema struct {
Name string `json:"name"`
Keys map[string]interface{} `json:"keys"`
Expand Down
51 changes: 38 additions & 13 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (s *Server) ListenForIntegrationsUpdateEvents() error {
EditClusterCompHost("ui_host", integrationUpdate.UIUrl)
}
CacheDetails("slack", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName)
case "discord":
if s.opts.UiHost == "" {
EditClusterCompHost("ui_host", integrationUpdate.UIUrl)
}
CacheDetails("discord", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName)
case "s3":
CacheDetails("s3", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName)
case "github":
Expand Down Expand Up @@ -355,7 +360,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.ScaleFunctionWorkers()
go s.ConnectorsDeadPodsRescheduler()

return nil
return nil
}

func (s *Server) uploadMsgsToTier2Storage() {
Expand Down Expand Up @@ -690,6 +695,28 @@ func (s *Server) CheckBrokenConnectedIntegrations() error {
serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error())
}
}
case "discord":
key := getAESKey()
if _, ok := integration.Keys["webhook_url"].(string); !ok {
integration.Keys["webhook_url"] = ""
}
webhookUrl, err := DecryptAES(key, integration.Keys["webhook_url"].(string))
if err != nil {
serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at DecryptAES: %v", integration.TenantName, err.Error())
}
err = testDiscordIntegration(webhookUrl)
if err != nil {
serv.Warnf("[tenant: %s]CheckBrokenConnectedIntegrations at testDiscordIntegration: %v", integration.TenantName, err.Error())
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false)
if err != nil {
serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error())
}
} else {
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true)
if err != nil {
serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error())
}
}
case "s3":
key := getAESKey()
if _, ok := integration.Keys["access_key"].(string); !ok {
Expand Down Expand Up @@ -771,45 +798,43 @@ func (s *Server) ConsumeNotificationsBufferMessages() error {
continue
}

msgs, err := fetchMessages[slackMsg](s,
msgs, err := fetchMessages(s,
NOTIFICATIONS_BUFFER_CONSUMER,
notificationsStreamName,
mAmount,
3*time.Second,
createSlackMsg)
3*time.Second)

if err != nil {
s.Errorf("Failed to fetch notifications: %v", err.Error())
continue
}

sendSlackNotifications(s, msgs)
sendNotifications(s, msgs)
}
}

func createSlackMsg(msg []byte, reply string) slackMsg {
return slackMsg{
func createNotificationBufferMsg(msg []byte, reply string) notificationBufferMsg {
return notificationBufferMsg{
Msg: msg,
ReplySubject: reply,
}
}

func fetchMessages[T any](s *Server,
func fetchMessages(s *Server,
consumer,
streamName string,
mAmount int,
timeToWait time.Duration,
create func(msg []byte, reply string) T) ([]T, error) {
timeToWait time.Duration) ([]notificationBufferMsg, error) {

req := []byte(strconv.FormatUint(uint64(mAmount), 10))
resp := make(chan T)
resp := make(chan notificationBufferMsg)
replySubject := consumer + "_reply_" + s.memphis.nuid.Next()

timeout := time.NewTimer(timeToWait)
sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), replySubject, replySubject+"_sid", func(_ *client, subject string, reply string, msg []byte) {
go func(subject, reply string, msg []byte) {
if reply != "" {
m := create(msg, reply)
m := createNotificationBufferMsg(msg, reply)
resp <- m
}
}(subject, reply, copyBytes(msg))
Expand All @@ -821,7 +846,7 @@ func fetchMessages[T any](s *Server,
subject := fmt.Sprintf(JSApiRequestNextT, streamName, consumer)
s.sendInternalAccountMsgWithReply(s.MemphisGlobalAccount(), subject, replySubject, nil, req, true)

msgs := make([]T, 0)
var msgs []notificationBufferMsg
stop := false
for {
if stop {
Expand Down
19 changes: 19 additions & 0 deletions server/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func InitializeIntegrations() error {
StorageFunctionsMap = make(map[string]interface{})
SourceCodeManagementFunctionsMap = make(map[string]map[string]interface{})
NotificationFunctionsMap["slack"] = sendMessageToSlackChannel
NotificationFunctionsMap["discord"] = sendMessagesToDiscordChannel
StorageFunctionsMap["s3"] = serv.uploadToS3Storage
SourceCodeManagementFunctionsMap["github"] = make(map[string]interface{})
SourceCodeManagementFunctionsMap["github"]["get_all_repos"] = serv.getGithubRepositories
Expand Down Expand Up @@ -67,6 +68,12 @@ func InitializeConnections() error {
return err
}
integration.Keys["auth_token"] = decryptedValue
} else if value, ok := integration.Keys["webhook_url"]; ok {
decryptedValue, err := DecryptAES(key, value.(string))
if err != nil {
return err
}
integration.Keys["webhook_url"] = decryptedValue
}
CacheDetails(integration.Name, integration.Keys, integration.Properties, integration.TenantName)
}
Expand All @@ -77,6 +84,8 @@ func CacheDetails(integrationType string, keys map[string]interface{}, propertie
switch integrationType {
case "slack":
cacheDetailsSlack(keys, properties, tenantName)
case "discord":
cacheDetailsDiscord(keys, properties, tenantName)
case "s3":
cacheDetailsS3(keys, properties, tenantName)
case "github":
Expand All @@ -99,6 +108,11 @@ func EncryptOldUnencryptedValues() error {
if err != nil {
return err
}

err = encryptUnencryptedKeysByIntegrationType("discord", "webhook_url", tenant.Name)
if err != nil {
return err
}
}

err = encryptUnencryptedAppUsersPasswords()
Expand Down Expand Up @@ -126,6 +140,11 @@ func encryptUnencryptedKeysByIntegrationType(integrationType, keyTitle string, t
if err != nil {
needToEncrypt = true
}
} else if value, ok := integration.Keys["webhook_url"]; ok {
_, err := DecryptAES(key, value.(string))
if err != nil {
needToEncrypt = true
}
}
if needToEncrypt {
encryptedValue, err := EncryptAES([]byte(integration.Keys[keyTitle].(string)))
Expand Down
7 changes: 6 additions & 1 deletion server/memphis_handlers_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ func updateNewClientWithConfig(c *client, connId string) {
c.Errorf("updateNewClientWithConfig: %v", err.Error())
}

discordEnabled, err := IsDiscordEnabled(c.acc.GetName())
if err != nil {
c.Errorf("updateNewClientWithConfig: %v", err.Error())
}

config := models.GlobalConfigurationsUpdate{
Notifications: slackEnabled,
Notifications: slackEnabled || discordEnabled,
}

sendConnectUpdate(c, config, connId)
Expand Down
60 changes: 55 additions & 5 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ const INTEGRATIONS_AUDIT_LOGS_CONSUMER = "$memphis_integrations_audit_logs_consu
type IntegrationsHandler struct{ S *Server }

var integrationsAuditLogLabelToSubjectMap = map[string]string{
"slack": integrationsAuditLogsStream + ".%s.slack",
"s3": integrationsAuditLogsStream + ".%s.s3",
"github": integrationsAuditLogsStream + ".%s.github",
"slack": integrationsAuditLogsStream + ".%s.slack",
"discord": integrationsAuditLogsStream + ".%s.discord",
"s3": integrationsAuditLogsStream + ".%s.s3",
"github": integrationsAuditLogsStream + ".%s.github",
}

func (it IntegrationsHandler) CreateIntegration(c *gin.Context) {
Expand Down Expand Up @@ -88,6 +89,27 @@ func (it IntegrationsHandler) CreateIntegration(c *gin.Context) {
return
}
integration = slackIntegration
case "discord":
if !ValidataAccessToFeature(user.TenantName, "feature-integration-discord") {
serv.Warnf("[tenant: %v][user: %v]CreateIntegration at ValidataAccessToFeature: %v", user.TenantName, user.Username, "feature-notifications")
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "This feature is not available on your current pricing plan, in order to enjoy it you will have to upgrade your plan"})
return
}
_, _, discordIntegration, errorCode, err := it.handleCreateDiscordIntegration(user.TenantName, body)
if err != nil {
if errorCode == 500 {
serv.Errorf("[tenant: %v][user: %v]CreateIntegration at handleCreateDiscordIntegration: %v", user.TenantName, user.Username, err.Error())
message = "Server error"
} else {
message = err.Error()
serv.Warnf("[tenant: %v][user: %v]CreateIntegration at handleCreateDiscordIntegration: %v", user.TenantName, user.Username, message)
auditLog := fmt.Sprintf("Error while trying to connect with Discord: %v", message)
it.Errorf(integrationType, user.TenantName, auditLog)
}
c.AbortWithStatusJSON(errorCode, gin.H{"message": message})
return
}
integration = discordIntegration
case "s3":
if !ValidataAccessToFeature(user.TenantName, "feature-storage-tiering") {
serv.Warnf("[tenant: %v][user: %v]CreateIntegration at ValidataAccessToFeature: %v", user.TenantName, user.Username, "feature-storage-tiering")
Expand Down Expand Up @@ -185,6 +207,22 @@ func (it IntegrationsHandler) UpdateIntegration(c *gin.Context) {
return
}
integration = slackIntegration
case "discord":
discordIntegration, errorCode, err := it.handleUpdateDiscordIntegration(user.TenantName, "discord", body)
if err != nil {
if errorCode == 500 {
serv.Errorf("[tenant:%v][user: %v]UpdateIntegration at handleUpdateDiscordIntegration: %v", user.TenantName, user.Username, err.Error())
message = "Server error"
} else {
message = err.Error()
serv.Warnf("[tenant:%v][user: %v]UpdateIntegration at handleUpdateDiscordIntegration: %v", user.TenantName, user.Username, message)
auditLog := fmt.Sprintf("Error while trying to connect with Discord: %v", message)
it.Errorf(integrationType, user.TenantName, auditLog)
}
c.AbortWithStatusJSON(errorCode, gin.H{"message": message})
return
}
integration = discordIntegration
case "s3":
s3Integration, errorCode, err := it.handleUpdateS3Integration(user.TenantName, body)
if err != nil {
Expand Down Expand Up @@ -375,7 +413,7 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {
}

switch integrationType {
case "slack":
case "slack", "discord":
update := models.SdkClientsUpdates{
Type: sendNotificationType,
Update: false,
Expand All @@ -396,7 +434,7 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {
c.IndentedJSON(200, gin.H{})
}

func createIntegrationsKeysAndProperties(integrationType, authToken string, channelID string, pmAlert bool, svfAlert bool, disconnectAlert bool, accessKey, secretKey, bucketName, region, url, forceS3PathStyle string, githubIntegrationDetails map[string]interface{}, repo, branch, repoType, repoOwner string) (map[string]interface{}, map[string]bool) {
func createIntegrationsKeysAndProperties(integrationType, authToken string, channelID string, webhookUrl string, pmAlert bool, svfAlert bool, disconnectAlert bool, accessKey, secretKey, bucketName, region, url, forceS3PathStyle string, githubIntegrationDetails map[string]interface{}, repo, branch, repoType, repoOwner string) (map[string]interface{}, map[string]bool) {
keys := make(map[string]interface{})
properties := make(map[string]bool)
switch integrationType {
Expand All @@ -406,6 +444,11 @@ func createIntegrationsKeysAndProperties(integrationType, authToken string, chan
properties[PoisonMAlert] = pmAlert
properties[SchemaVAlert] = svfAlert
properties[DisconEAlert] = disconnectAlert
case "discord":
keys["webhook_url"] = webhookUrl
properties[PoisonMAlert] = pmAlert
properties[SchemaVAlert] = svfAlert
properties[DisconEAlert] = disconnectAlert
case "s3":
keys["access_key"] = accessKey
keys["secret_key"] = secretKey
Expand Down Expand Up @@ -464,6 +507,10 @@ func (it IntegrationsHandler) GetIntegrationDetails(c *gin.Context) {
integration.Keys["auth_token"] = "xoxb-****"
}

if integration.Name == "discord" && integration.Keys["webhook_url"] != _EMPTY_ {
integration.Keys["webhook_url"] = hideDiscordWebhookUrl(integration.Keys["webhook_url"].(string))
}

if integration.Name == "s3" && integration.Keys["secret_key"] != _EMPTY_ {
integration.Keys["secret_key"] = hideIntegrationSecretKey(integration.Keys["secret_key"].(string))
}
Expand Down Expand Up @@ -523,6 +570,9 @@ func (it IntegrationsHandler) GetAllIntegrations(c *gin.Context) {
if integrations[i].Name == "slack" && integrations[i].Keys["auth_token"] != _EMPTY_ {
integrations[i].Keys["auth_token"] = "xoxb-****"
}
if integrations[i].Name == "discord" && integrations[i].Keys["webhook_url"] != _EMPTY_ {
integrations[i].Keys["webhook_url"] = hideDiscordWebhookUrl(integrations[i].Keys["webhook_url"].(string))
}
if integrations[i].Name == "s3" && integrations[i].Keys["secret_key"] != _EMPTY_ {
integrations[i].Keys["secret_key"] = hideIntegrationSecretKey(integrations[i].Keys["secret_key"].(string))
}
Expand Down
Loading