From 6861b8ff1a00dc8143f230157c1c9b1190772bf7 Mon Sep 17 00:00:00 2001 From: Alibi Shalgymbay Date: Tue, 12 Dec 2023 08:17:58 +0600 Subject: [PATCH 1/6] Add main functionality --- models/integrations.go | 6 + server/background_tasks.go | 27 ++ server/integrations.go | 19 + server/memphis_handlers_connections.go | 7 +- server/memphis_handlers_integrations.go | 60 +++- server/notifications.go | 17 + server/notifications_discord.go | 332 +++++++++++++++++ server/notifications_slack.go | 4 +- server/storage_s3.go | 4 +- .../assets/images/discordIntegrationIcon.svg | 9 + ui_src/src/components/upgradePlans/index.js | 11 + ui_src/src/const/integrationList.js | 35 ++ .../components/discordIntegration/index.js | 333 ++++++++++++++++++ .../components/discordIntegration/style.scss | 0 .../components/integrationItem/index.js | 11 + .../src/domain/overview/integrations/index.js | 13 + 16 files changed, 878 insertions(+), 10 deletions(-) create mode 100644 server/notifications_discord.go create mode 100644 ui_src/src/assets/images/discordIntegrationIcon.svg create mode 100644 ui_src/src/domain/administration/integrations/components/discordIntegration/index.js create mode 100644 ui_src/src/domain/administration/integrations/components/discordIntegration/style.scss diff --git a/models/integrations.go b/models/integrations.go index bf61d0dce..41f8402fd 100644 --- a/models/integrations.go +++ b/models/integrations.go @@ -33,6 +33,12 @@ type SlackIntegration struct { Client *slack.Client `json:"client"` } +type DiscordIntegration struct { + Name string `json:"name"` + Keys map[string]string `json:"keys"` + Properties map[string]bool `json:"properties"` +} + type CreateIntegrationSchema struct { Name string `json:"name"` Keys map[string]interface{} `json:"keys"` diff --git a/server/background_tasks.go b/server/background_tasks.go index f339dbeeb..f13033c71 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -129,6 +129,11 @@ func (s *Server) ListenForIntegrationsUpdateEvents() error { EditClusterCompHost("ui_host", integrationUpdate.UIUrl) } CacheDetails("slack", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName) + case "discord": + if s.opts.UiHost == "" { + EditClusterCompHost("ui_host", integrationUpdate.UIUrl) + } + CacheDetails("discord", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName) case "s3": CacheDetails("s3", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName) case "github": @@ -714,6 +719,28 @@ func (s *Server) CheckBrokenConnectedIntegrations() error { serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error()) } } + case "discord": + key := getAESKey() + if _, ok := integration.Keys["webhook_url"].(string); !ok { + integration.Keys["webhook_url"] = "" + } + webhookUrl, err := DecryptAES(key, integration.Keys["webhook_url"].(string)) + if err != nil { + serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at DecryptAES: %v", integration.TenantName, err.Error()) + } + err = testDiscordIntegration(webhookUrl) + if err != nil { + serv.Warnf("[tenant: %s]CheckBrokenConnectedIntegrations at testDiscordIntegration: %v", integration.TenantName, err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("[tenant: %s]CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", integration.TenantName, err.Error()) + } + } case "s3": key := getAESKey() if _, ok := integration.Keys["access_key"].(string); !ok { diff --git a/server/integrations.go b/server/integrations.go index aa43e3b86..f5784f999 100644 --- a/server/integrations.go +++ b/server/integrations.go @@ -33,6 +33,7 @@ func InitializeIntegrations() error { StorageFunctionsMap = make(map[string]interface{}) SourceCodeManagementFunctionsMap = make(map[string]map[string]interface{}) NotificationFunctionsMap["slack"] = sendMessageToSlackChannel + NotificationFunctionsMap["discord"] = sendMessageToDiscordChannel StorageFunctionsMap["s3"] = serv.uploadToS3Storage SourceCodeManagementFunctionsMap["github"] = make(map[string]interface{}) SourceCodeManagementFunctionsMap["github"]["get_all_repos"] = serv.getGithubRepositories @@ -67,6 +68,12 @@ func InitializeConnections() error { return err } integration.Keys["auth_token"] = decryptedValue + } else if value, ok := integration.Keys["webhook_url"]; ok { + decryptedValue, err := DecryptAES(key, value.(string)) + if err != nil { + return err + } + integration.Keys["webhook_url"] = decryptedValue } CacheDetails(integration.Name, integration.Keys, integration.Properties, integration.TenantName) } @@ -77,6 +84,8 @@ func CacheDetails(integrationType string, keys map[string]interface{}, propertie switch integrationType { case "slack": cacheDetailsSlack(keys, properties, tenantName) + case "discord": + cacheDetailsDiscord(keys, properties, tenantName) case "s3": cacheDetailsS3(keys, properties, tenantName) case "github": @@ -99,6 +108,11 @@ func EncryptOldUnencryptedValues() error { if err != nil { return err } + + err = encryptUnencryptedKeysByIntegrationType("discord", "webhook_url", tenant.Name) + if err != nil { + return err + } } err = encryptUnencryptedAppUsersPasswords() @@ -126,6 +140,11 @@ func encryptUnencryptedKeysByIntegrationType(integrationType, keyTitle string, t if err != nil { needToEncrypt = true } + } else if value, ok := integration.Keys["webhook_url"]; ok { + _, err := DecryptAES(key, value.(string)) + if err != nil { + needToEncrypt = true + } } if needToEncrypt { encryptedValue, err := EncryptAES([]byte(integration.Keys[keyTitle].(string))) diff --git a/server/memphis_handlers_connections.go b/server/memphis_handlers_connections.go index 5b5421f28..58948cf89 100644 --- a/server/memphis_handlers_connections.go +++ b/server/memphis_handlers_connections.go @@ -41,8 +41,13 @@ func updateNewClientWithConfig(c *client, connId string) { c.Errorf("updateNewClientWithConfig: %v", err.Error()) } + discordEnabled, err := IsDiscordEnabled(c.acc.GetName()) + if err != nil { + c.Errorf("updateNewClientWithConfig: %v", err.Error()) + } + config := models.GlobalConfigurationsUpdate{ - Notifications: slackEnabled, + Notifications: slackEnabled || discordEnabled, } sendConnectUpdate(c, config, connId) diff --git a/server/memphis_handlers_integrations.go b/server/memphis_handlers_integrations.go index bce8d439d..f860c12af 100644 --- a/server/memphis_handlers_integrations.go +++ b/server/memphis_handlers_integrations.go @@ -33,9 +33,10 @@ const INTEGRATIONS_AUDIT_LOGS_CONSUMER = "$memphis_integrations_audit_logs_consu type IntegrationsHandler struct{ S *Server } var integrationsAuditLogLabelToSubjectMap = map[string]string{ - "slack": integrationsAuditLogsStream + ".%s.slack", - "s3": integrationsAuditLogsStream + ".%s.s3", - "github": integrationsAuditLogsStream + ".%s.github", + "slack": integrationsAuditLogsStream + ".%s.slack", + "discord": integrationsAuditLogsStream + ".%s.discord", + "s3": integrationsAuditLogsStream + ".%s.s3", + "github": integrationsAuditLogsStream + ".%s.github", } func (it IntegrationsHandler) CreateIntegration(c *gin.Context) { @@ -88,6 +89,27 @@ func (it IntegrationsHandler) CreateIntegration(c *gin.Context) { return } integration = slackIntegration + case "discord": + if !ValidataAccessToFeature(user.TenantName, "feature-integration-discord") { + serv.Warnf("[tenant: %v][user: %v]CreateIntegration at ValidataAccessToFeature: %v", user.TenantName, user.Username, "feature-notifications") + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "This feature is not available on your current pricing plan, in order to enjoy it you will have to upgrade your plan"}) + return + } + _, _, discordIntegration, errorCode, err := it.handleCreateDiscordIntegration(user.TenantName, body) + if err != nil { + if errorCode == 500 { + serv.Errorf("[tenant: %v][user: %v]CreateIntegration at handleCreateDiscordIntegration: %v", user.TenantName, user.Username, err.Error()) + message = "Server error" + } else { + message = err.Error() + serv.Warnf("[tenant: %v][user: %v]CreateIntegration at handleCreateDiscordIntegration: %v", user.TenantName, user.Username, message) + auditLog := fmt.Sprintf("Error while trying to connect with Discord: %v", message) + it.Errorf(integrationType, user.TenantName, auditLog) + } + c.AbortWithStatusJSON(errorCode, gin.H{"message": message}) + return + } + integration = discordIntegration case "s3": if !ValidataAccessToFeature(user.TenantName, "feature-storage-tiering") { serv.Warnf("[tenant: %v][user: %v]CreateIntegration at ValidataAccessToFeature: %v", user.TenantName, user.Username, "feature-storage-tiering") @@ -185,6 +207,22 @@ func (it IntegrationsHandler) UpdateIntegration(c *gin.Context) { return } integration = slackIntegration + case "discord": + discordIntegration, errorCode, err := it.handleUpdateDiscordIntegration(user.TenantName, "discord", body) + if err != nil { + if errorCode == 500 { + serv.Errorf("[tenant:%v][user: %v]UpdateIntegration at handleUpdateDiscordIntegration: %v", user.TenantName, user.Username, err.Error()) + message = "Server error" + } else { + message = err.Error() + serv.Warnf("[tenant:%v][user: %v]UpdateIntegration at handleUpdateDiscordIntegration: %v", user.TenantName, user.Username, message) + auditLog := fmt.Sprintf("Error while trying to connect with Discord: %v", message) + it.Errorf(integrationType, user.TenantName, auditLog) + } + c.AbortWithStatusJSON(errorCode, gin.H{"message": message}) + return + } + integration = discordIntegration case "s3": s3Integration, errorCode, err := it.handleUpdateS3Integration(user.TenantName, body) if err != nil { @@ -375,7 +413,7 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) { } switch integrationType { - case "slack": + case "slack", "discord": update := models.SdkClientsUpdates{ Type: sendNotificationType, Update: false, @@ -396,7 +434,7 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) { c.IndentedJSON(200, gin.H{}) } -func createIntegrationsKeysAndProperties(integrationType, authToken string, channelID string, pmAlert bool, svfAlert bool, disconnectAlert bool, accessKey, secretKey, bucketName, region, url, forceS3PathStyle string, githubIntegrationDetails map[string]interface{}, repo, branch, repoType, repoOwner string) (map[string]interface{}, map[string]bool) { +func createIntegrationsKeysAndProperties(integrationType, authToken string, channelID string, webhookUrl string, pmAlert bool, svfAlert bool, disconnectAlert bool, accessKey, secretKey, bucketName, region, url, forceS3PathStyle string, githubIntegrationDetails map[string]interface{}, repo, branch, repoType, repoOwner string) (map[string]interface{}, map[string]bool) { keys := make(map[string]interface{}) properties := make(map[string]bool) switch integrationType { @@ -406,6 +444,11 @@ func createIntegrationsKeysAndProperties(integrationType, authToken string, chan properties[PoisonMAlert] = pmAlert properties[SchemaVAlert] = svfAlert properties[DisconEAlert] = disconnectAlert + case "discord": + keys["webhook_url"] = webhookUrl + properties[PoisonMAlert] = pmAlert + properties[SchemaVAlert] = svfAlert + properties[DisconEAlert] = disconnectAlert case "s3": keys["access_key"] = accessKey keys["secret_key"] = secretKey @@ -464,6 +507,10 @@ func (it IntegrationsHandler) GetIntegrationDetails(c *gin.Context) { integration.Keys["auth_token"] = "xoxb-****" } + if integration.Name == "discord" && integration.Keys["webhook_url"] != "" { + integration.Keys["webhook_url"] = hideDiscordWebhookUrl(integration.Keys["webhook_url"].(string)) + } + if integration.Name == "s3" && integration.Keys["secret_key"] != "" { integration.Keys["secret_key"] = hideIntegrationSecretKey(integration.Keys["secret_key"].(string)) } @@ -523,6 +570,9 @@ func (it IntegrationsHandler) GetAllIntegrations(c *gin.Context) { if integrations[i].Name == "slack" && integrations[i].Keys["auth_token"] != "" { integrations[i].Keys["auth_token"] = "xoxb-****" } + if integrations[i].Name == "discord" && integrations[i].Keys["webhook_url"] != "" { + integrations[i].Keys["webhook_url"] = hideDiscordWebhookUrl(integrations[i].Keys["webhook_url"].(string)) + } if integrations[i].Name == "s3" && integrations[i].Keys["secret_key"] != "" { integrations[i].Keys["secret_key"] = hideIntegrationSecretKey(integrations[i].Keys["secret_key"].(string)) } diff --git a/server/notifications.go b/server/notifications.go index 58eefeadf..e2a4e0fe1 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -31,6 +31,17 @@ func SendNotification(tenantName string, title string, message string, msgType s } } } + case "discord": + if tenantInetgrations, ok := IntegrationsConcurrentCache.Load(tenantName); ok { + if discordIntegration, ok := tenantInetgrations["discord"].(models.DiscordIntegration); ok { + if discordIntegration.Properties[msgType] { + err := f.(func(models.DiscordIntegration, string, string) error)(discordIntegration, title, message) + if err != nil { + return err + } + } + } + } default: return errors.New("failed sending notification: unsupported integration") } @@ -46,6 +57,12 @@ func shouldSendNotification(tenantName string, alertType string) bool { return true } } + + if discordIntegration, ok := tenantInetgrations["discord"].(models.DiscordIntegration); ok { + if discordIntegration.Properties[alertType] { + return true + } + } } return false } diff --git a/server/notifications_discord.go b/server/notifications_discord.go new file mode 100644 index 000000000..82696eeec --- /dev/null +++ b/server/notifications_discord.go @@ -0,0 +1,332 @@ +// Copyright 2022-2023 The Memphis.dev Authors +// Licensed under the Memphis Business Source License 1.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation. +// +// https://github.com/memphisdev/memphis/blob/master/LICENSE +// +// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service. +// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. +package server + +import ( + "bytes" + "encoding/json" + "errors" + "net/http" + "strings" + + "github.com/memphisdev/memphis/db" + "github.com/memphisdev/memphis/models" +) + +type Author struct { + Name string `json:"name"` +} + +type Embed struct { + Color string `json:"color"` + Author Author `json:"author"` + Title string `json:"title"` + Description string `json:"description"` +} + +type DiscordMessage struct { + Embeds []Embed `json:"embeds"` +} + +func IsDiscordEnabled(tenantName string) (bool, error) { + exist, _, err := db.GetIntegration("discord", tenantName) + if err != nil { + return false, err + } + + if !exist { + return false, nil + } + + return true, nil +} + +func sendMessageToDiscordChannel(integration models.DiscordIntegration, title string, message string) error { + payload := DiscordMessage{ + Embeds: []Embed{ + { + Color: "6641663", + Author: Author{ + Name: "Memphis", + }, + Title: title, + Description: message, + }, + }, + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + return err + } + + resp, err := http.Post( + integration.Keys["webhook_url"], + "application/json", + bytes.NewBuffer(payloadBytes), + ) + if err != nil { + return err + } + defer resp.Body.Close() + return nil +} + +func cacheDetailsDiscord(keys map[string]interface{}, properties map[string]bool, tenantName string) { + var webhookUrl string + var poisonMessageAlert, schemaValidationFailAlert, disconnectionEventsAlert bool + discordIntegration := models.DiscordIntegration{} + discordIntegration.Keys = make(map[string]string) + discordIntegration.Properties = make(map[string]bool) + if keys == nil { + deleteIntegrationFromTenant(tenantName, "discord", IntegrationsConcurrentCache) + return + } + if properties == nil { + poisonMessageAlert = false + schemaValidationFailAlert = false + disconnectionEventsAlert = false + } + webhookUrl, ok := keys["webhook_url"].(string) + if !ok { + deleteIntegrationFromTenant(tenantName, "discord", IntegrationsConcurrentCache) + return + } + poisonMessageAlert, ok = properties[PoisonMAlert] + if !ok { + poisonMessageAlert = false + } + schemaValidationFailAlert, ok = properties[SchemaVAlert] + if !ok { + schemaValidationFailAlert = false + } + disconnectionEventsAlert, ok = properties[DisconEAlert] + if !ok { + disconnectionEventsAlert = false + } + if discordIntegration.Keys["webhook_url"] != webhookUrl { + discordIntegration.Keys["webhook_url"] = webhookUrl + } + + discordIntegration.Properties[PoisonMAlert] = poisonMessageAlert + discordIntegration.Properties[SchemaVAlert] = schemaValidationFailAlert + discordIntegration.Properties[DisconEAlert] = disconnectionEventsAlert + discordIntegration.Name = "discord" + if _, ok := IntegrationsConcurrentCache.Load(tenantName); !ok { + IntegrationsConcurrentCache.Add(tenantName, map[string]interface{}{"discord": discordIntegration}) + } else { + err := addIntegrationToTenant(tenantName, "discord", IntegrationsConcurrentCache, discordIntegration) + if err != nil { + serv.Errorf("cacheDetailsDiscord: " + err.Error()) + return + } + } +} + +func (it IntegrationsHandler) getDiscordIntegrationDetails(body models.CreateIntegrationSchema) (map[string]interface{}, map[string]bool, int, error) { + var webhookUrl, uiUrl string + var pmAlert, svfAlert, disconnectAlert bool + webhookUrl, ok := body.Keys["webhook_url"].(string) + if !ok { + return map[string]interface{}{}, map[string]bool{}, SHOWABLE_ERROR_STATUS_CODE, errors.New("must provide webhook url for discord integration") + } + uiUrl = body.UIUrl + if uiUrl == "" { + return map[string]interface{}{}, map[string]bool{}, 500, errors.New("must provide UI url for discord integration") + } + + pmAlert, ok = body.Properties[PoisonMAlert] + if !ok { + pmAlert = false + } + svfAlert, ok = body.Properties[SchemaVAlert] + if !ok { + svfAlert = false + } + disconnectAlert, ok = body.Properties[DisconEAlert] + if !ok { + disconnectAlert = false + } + + keys, properties := createIntegrationsKeysAndProperties("discord", "", "", webhookUrl, pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") + return keys, properties, 0, nil +} + +func (it IntegrationsHandler) handleCreateDiscordIntegration(tenantName string, body models.CreateIntegrationSchema) (map[string]interface{}, map[string]bool, models.Integration, int, error) { + keys, properties, errorCode, err := it.getDiscordIntegrationDetails(body) + if err != nil { + return keys, properties, models.Integration{}, errorCode, err + } + if it.S.opts.UiHost == "" { + EditClusterCompHost("ui_host", body.UIUrl) + } + discordIntegration, err := createDiscordIntegration(tenantName, keys, properties, body.UIUrl) + if err != nil { + errMsg := strings.ToLower(err.Error()) + if strings.Contains(errMsg, "invalid webhook url") { + return map[string]interface{}{}, map[string]bool{}, models.Integration{}, SHOWABLE_ERROR_STATUS_CODE, err + } else { + return map[string]interface{}{}, map[string]bool{}, models.Integration{}, 500, err + } + } + return keys, properties, discordIntegration, 0, nil +} + +func (it IntegrationsHandler) handleUpdateDiscordIntegration(tenantName, integrationType string, body models.CreateIntegrationSchema) (models.Integration, int, error) { + keys, properties, errorCode, err := it.getDiscordIntegrationDetails(body) + if err != nil { + return models.Integration{}, errorCode, err + } + discordIntegration, err := updateDiscordIntegration(tenantName, keys["webhook_url"].(string), properties[PoisonMAlert], properties[SchemaVAlert], properties[DisconEAlert], body.UIUrl) + if err != nil { + errMsg := strings.ToLower(err.Error()) + if strings.Contains(errMsg, "invalid webhook url") { + return models.Integration{}, SHOWABLE_ERROR_STATUS_CODE, err + } else { + return models.Integration{}, 500, err + } + } + return discordIntegration, 0, nil +} + +func createDiscordIntegration(tenantName string, keys map[string]interface{}, properties map[string]bool, uiUrl string) (models.Integration, error) { + var discordIntegration models.Integration + exist, discordIntegration, err := db.GetIntegration("discord", tenantName) + if err != nil { + return discordIntegration, err + } else if !exist { + err := testDiscordIntegration(keys["webhook_url"].(string)) + if err != nil { + return discordIntegration, err + } + stringMapKeys := GetKeysAsStringMap(keys) + cloneKeys := copyMaps(stringMapKeys) + encryptedValue, err := EncryptAES([]byte(keys["webhook_url"].(string))) + if err != nil { + return models.Integration{}, err + } + cloneKeys["webhook_url"] = encryptedValue + interfaceMapKeys := copyStringMapToInterfaceMap(cloneKeys) + integrationRes, insertErr := db.InsertNewIntegration(tenantName, "discord", interfaceMapKeys, properties) + if insertErr != nil { + return discordIntegration, insertErr + } + discordIntegration = integrationRes + integrationToUpdate := models.CreateIntegration{ + Name: "discord", + Keys: keys, + Properties: properties, + UIUrl: uiUrl, + TenantName: tenantName, + IsValid: integrationRes.IsValid, + } + msg, err := json.Marshal(integrationToUpdate) + if err != nil { + return discordIntegration, err + } + err = serv.sendInternalAccountMsgWithReply(serv.MemphisGlobalAccount(), INTEGRATIONS_UPDATES_SUBJ, _EMPTY_, nil, msg, true) + if err != nil { + return discordIntegration, err + } + update := models.SdkClientsUpdates{ + Type: sendNotificationType, + Update: properties[SchemaVAlert], + } + serv.SendUpdateToClients(update) + discordIntegration.Keys["webhook_url"] = hideDiscordWebhookUrl(keys["webhook_url"].(string)) + return discordIntegration, nil + } + return discordIntegration, errors.New("discord integration already exists") +} + +func updateDiscordIntegration(tenantName string, webhookUrl string, pmAlert bool, svfAlert bool, disconnectAlert bool, uiUrl string) (models.Integration, error) { + var discordIntegration models.Integration + if webhookUrl == "" { + exist, integrationFromDb, err := db.GetIntegration("discord", tenantName) + if err != nil { + return models.Integration{}, err + } + if !exist { + return models.Integration{}, errors.New("no webhook url was provided") + } + key := getAESKey() + url, err := DecryptAES(key, integrationFromDb.Keys["webhook_url"].(string)) + if err != nil { + return models.Integration{}, err + } + webhookUrl = url + } + err := testDiscordIntegration(webhookUrl) + if err != nil { + return discordIntegration, err + } + keys, properties := createIntegrationsKeysAndProperties("discord", "", "", webhookUrl, pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") + stringMapKeys := GetKeysAsStringMap(keys) + cloneKeys := copyMaps(stringMapKeys) + encryptedValue, err := EncryptAES([]byte(webhookUrl)) + if err != nil { + return models.Integration{}, err + } + cloneKeys["webhook_url"] = encryptedValue + interfaceMapKeys := copyStringMapToInterfaceMap(cloneKeys) + discordIntegration, err = db.UpdateIntegration(tenantName, "discord", interfaceMapKeys, properties) + if err != nil { + return models.Integration{}, err + } + + integrationToUpdate := models.CreateIntegration{ + Name: "discord", + Keys: keys, + Properties: properties, + UIUrl: uiUrl, + TenantName: tenantName, + IsValid: discordIntegration.IsValid, + } + msg, err := json.Marshal(integrationToUpdate) + if err != nil { + return models.Integration{}, err + } + err = serv.sendInternalAccountMsgWithReply(serv.MemphisGlobalAccount(), INTEGRATIONS_UPDATES_SUBJ, _EMPTY_, nil, msg, true) + if err != nil { + return models.Integration{}, err + } + update := models.SdkClientsUpdates{ + Type: sendNotificationType, + Update: svfAlert, + } + serv.SendUpdateToClients(update) + keys["webhook_url"] = hideDiscordWebhookUrl(cloneKeys["webhook_url"]) + discordIntegration.Keys = keys + discordIntegration.Properties = properties + return discordIntegration, nil +} + +func testDiscordIntegration(webhookUrl string) error { + resp, err := http.Head(webhookUrl) + if err != nil { + return errors.New("invalid webhook url") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return errors.New("invalid webhook url") + } + return nil +} + +func hideDiscordWebhookUrl(webhookUrl string) string { + if webhookUrl != "" { + webhookUrl = "https://discord.com/api/webhooks/****" + return webhookUrl + } + return webhookUrl +} diff --git a/server/notifications_slack.go b/server/notifications_slack.go index acf2a50f0..fcfdd28d3 100644 --- a/server/notifications_slack.go +++ b/server/notifications_slack.go @@ -144,7 +144,7 @@ func (it IntegrationsHandler) getSlackIntegrationDetails(body models.CreateInteg disconnectAlert = false } - keys, properties := createIntegrationsKeysAndProperties("slack", authToken, channelID, pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") + keys, properties := createIntegrationsKeysAndProperties("slack", authToken, channelID, "", pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") return keys, properties, 0, nil } @@ -256,7 +256,7 @@ func updateSlackIntegration(tenantName string, authToken string, channelID strin if err != nil { return slackIntegration, err } - keys, properties := createIntegrationsKeysAndProperties("slack", authToken, channelID, pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") + keys, properties := createIntegrationsKeysAndProperties("slack", authToken, channelID, "", pmAlert, svfAlert, disconnectAlert, "", "", "", "", "", "", map[string]interface{}{}, "", "", "", "") stringMapKeys := GetKeysAsStringMap(keys) cloneKeys := copyMaps(stringMapKeys) encryptedValue, err := EncryptAES([]byte(authToken)) diff --git a/server/storage_s3.go b/server/storage_s3.go index 30bcc2b6b..81542ac2e 100644 --- a/server/storage_s3.go +++ b/server/storage_s3.go @@ -81,7 +81,7 @@ func (it IntegrationsHandler) handleCreateS3Integration(tenantName string, keys return models.Integration{}, statusCode, err } - keysMap, properties := createIntegrationsKeysAndProperties("s3", "", "", false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, "", "", "", "") + keysMap, properties := createIntegrationsKeysAndProperties("s3", "", "", "", false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, "", "", "", "") s3Integration, err := createS3Integration(tenantName, keysMap, properties) if err != nil { if strings.Contains(err.Error(), "already exists") { @@ -99,7 +99,7 @@ func (it IntegrationsHandler) handleUpdateS3Integration(tenantName string, body return models.Integration{}, statusCode, err } integrationType := strings.ToLower(body.Name) - keysMap, properties := createIntegrationsKeysAndProperties(integrationType, "", "", false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, "", "", "", "") + keysMap, properties := createIntegrationsKeysAndProperties(integrationType, "", "", "", false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, "", "", "", "") s3Integration, err := updateS3Integration(tenantName, keysMap, properties) if err != nil { return s3Integration, 500, err diff --git a/ui_src/src/assets/images/discordIntegrationIcon.svg b/ui_src/src/assets/images/discordIntegrationIcon.svg new file mode 100644 index 000000000..ae68e661c --- /dev/null +++ b/ui_src/src/assets/images/discordIntegrationIcon.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/ui_src/src/components/upgradePlans/index.js b/ui_src/src/components/upgradePlans/index.js index a5e8c39b9..b2e3e6015 100644 --- a/ui_src/src/components/upgradePlans/index.js +++ b/ui_src/src/components/upgradePlans/index.js @@ -204,6 +204,17 @@ const UpgradePlans = ({ open, onClose, content, isExternal = true }) => { )} + {downgradeInstructions['feature-integration-discord'] && ( +
+
+

Using Discord integration is violating the new plan

+
+ + Please fix the following issues before performing a downgrade +
+
+
+ )} {downgradeInstructions['feature-management-users'] && (

Too many management users ({downgradeInstructions['feature-management-users']['usage']})

diff --git a/ui_src/src/const/integrationList.js b/ui_src/src/const/integrationList.js index 946c94939..ad3575032 100644 --- a/ui_src/src/const/integrationList.js +++ b/ui_src/src/const/integrationList.js @@ -38,6 +38,7 @@ import { ReactComponent as DebeziumIcon } from '../assets/images/debeziumIcon.sv import { ReactComponent as ElasticIcon } from '../assets/images/elasticIcon.svg'; import { ReactComponent as ZapierIcon } from '../assets/images/zapierIcon.svg'; import { ReactComponent as SlackLogo } from '../assets/images/slackLogo.svg'; +import { ReactComponent as DiscordLogo } from '../assets/images/discordIntegrationIcon.svg'; import { ReactComponent as MemphisVerifiedIcon } from '../assets/images/memphisFunctionIcon.svg'; import s3Banner from '../assets/images/s3Banner.webp'; import { ReactComponent as S3Logo } from '../assets/images/s3Logo.svg'; @@ -275,6 +276,40 @@ export const INTEGRATION_LIST = {
) }, + Discord: { + name: 'Discord', + by: 'Memphis.dev', + banner: discordBanner, + insideBanner: discordBannerPopup, + icon: , + description: 'Receive alerts and notifications directly to your chosen discord channel for faster response and better real-time observability', + category: CATEGORY_LIST['Notifications'], + hasLogs: true, + comingSoon: false, + header: ( +
+ +
+

Discord

+ + + + + + + +
+
+ ), + integrateDesc: ( +
+

Description

+ + Receive alerts and notifications directly to your chosen discord channel for faster response and better real-time observability + +
+ ) + }, S3: { name: 'S3', by: 'Memphis.dev', diff --git a/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js b/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js new file mode 100644 index 000000000..d21264d51 --- /dev/null +++ b/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js @@ -0,0 +1,333 @@ +// Copyright 2022-2023 The Memphis.dev Authors +// Licensed under the Memphis Business Source License 1.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation. +// +// https://github.com/memphisdev/memphis/blob/master/LICENSE +// +// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service. +// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. + +import './style.scss'; + +import React, { useState, useContext, useEffect } from 'react'; +import { Form } from 'antd'; + +import { ReactComponent as PoisionAlertIcon } from '../../../../../assets/images/poisionAlertIcon.svg'; +import { ReactComponent as DisconAlertIcon } from '../../../../../assets/images/disconAlertIcon.svg'; +import { ReactComponent as SchemaAlertIcon } from '../../../../../assets/images/schemaAlertIcon.svg'; +import { ReactComponent as PurpleQuestionMark } from '../../../../../assets/images/purpleQuestionMark.svg'; +import { INTEGRATION_LIST, getTabList } from '../../../../../const/integrationList'; +import { ApiEndpoints } from '../../../../../const/apiEndpoints'; +import { httpRequest } from '../../../../../services/http'; +import Switcher from '../../../../../components/switcher'; +import Button from '../../../../../components/button'; +import { Context } from '../../../../../hooks/store'; +import Input from '../../../../../components/Input'; +import CustomTabs from '../../../../../components/Tabs'; +import { URL } from '../../../../../config'; +import Loader from '../../../../../components/loader'; +import CloudMoadl from '../../../../../components/cloudModal'; +import { showMessages } from '../../../../../services/genericServices'; +import { isCloud } from '../../../../../services/valueConvertor'; +import IntegrationDetails from '../integrationItem/integrationDetails'; +import IntegrationLogs from '../integrationItem/integrationLogs'; +import { FaArrowCircleUp } from 'react-icons/fa'; + +const urlSplit = URL.split('/', 3); + +const DiscordIntegration = ({ close, value }) => { + const isValue = value && Object.keys(value)?.length !== 0; + const discordConfiguration = INTEGRATION_LIST['Discord']; + const [creationForm] = Form.useForm(); + const [state, dispatch] = useContext(Context); + const [formFields, setFormFields] = useState({ + name: 'discord', + ui_url: `${urlSplit[0]}//${urlSplit[2]}`, + keys: { + webhook_url: value?.keys?.webhook_url || '', + }, + properties: { + poison_message_alert: value?.properties ? (value?.properties?.poison_message_alert ? true : false) : true, + schema_validation_fail_alert: value?.properties ? (value?.properties?.schema_validation_fail_alert ? true : false) : true, + disconnection_events_alert: value?.properties ? (value?.properties?.disconnection_events_alert ? true : false) : true + } + }); + const [loadingSubmit, setLoadingSubmit] = useState(false); + const [loadingDisconnect, setLoadingDisconnect] = useState(false); + const [imagesLoaded, setImagesLoaded] = useState(false); + const [tabValue, setTabValue] = useState('Configuration'); + const [cloudModalOpen, setCloudModalOpen] = useState(false); + const tabs = getTabList('Discord'); + + useEffect(() => { + const images = []; + images.push(INTEGRATION_LIST['Discord'].banner.props.src); + images.push(INTEGRATION_LIST['Discord'].insideBanner.props.src); + const promises = []; + + images.forEach((imageUrl) => { + const image = new Image(); + promises.push( + new Promise((resolve) => { + image.onload = resolve; + }) + ); + image.src = imageUrl; + }); + + Promise.all(promises).then(() => { + setImagesLoaded(true); + }); + }, []); + + const updateKeysState = (field, value) => { + let updatedValue = { ...formFields.keys }; + updatedValue[field] = value; + setFormFields((formFields) => ({ ...formFields, keys: updatedValue })); + }; + const updatePropertiesState = (field, value) => { + let updatedValue = { ...formFields.properties }; + updatedValue[field] = value; + setFormFields((formFields) => ({ ...formFields, properties: updatedValue })); + }; + + const handleSubmit = async () => { + const values = await creationForm.validateFields(); + if (values?.errorFields) { + return; + } else { + setLoadingSubmit(true); + if (isValue) { + if (values.webhook_url === 'https://discord.com/api/webhooks/****') { + updateIntegration(false); + } else { + updateIntegration(); + } + } else { + createIntegration(); + } + } + }; + + const closeModal = (data, disconnect = false) => { + setTimeout(() => { + disconnect ? setLoadingDisconnect(false) : setLoadingSubmit(false); + }, 1000); + close(data); + showMessages('success', disconnect ? 'The integration was successfully disconnected' : 'The integration connected successfully'); + }; + + const updateIntegration = async (withWebhook = true) => { + let newFormFields = { ...formFields }; + if (!withWebhook) { + let updatedKeys = { ...formFields.keys }; + updatedKeys['webhook_url'] = ''; + newFormFields = { ...newFormFields, keys: updatedKeys }; + } + try { + const data = await httpRequest('POST', ApiEndpoints.UPDATE_INTEGRATION, { ...newFormFields }); + dispatch({ type: 'UPDATE_INTEGRATION', payload: data }); + closeModal(data); + } catch (err) { + setLoadingSubmit(false); + } + }; + + const createIntegration = async () => { + try { + const data = await httpRequest('POST', ApiEndpoints.CREATE_INTEGRATION, { ...formFields }); + dispatch({ type: 'ADD_INTEGRATION', payload: data }); + closeModal(data); + } catch (err) { + setLoadingSubmit(false); + } + }; + const disconnect = async () => { + setLoadingDisconnect(true); + try { + await httpRequest('DELETE', ApiEndpoints.DISCONNECT_INTEGRATION, { + name: formFields.name + }); + dispatch({ type: 'REMOVE_INTEGRATION', payload: formFields.name }); + + closeModal({}, true); + } catch (err) { + setLoadingDisconnect(false); + } + }; + + return ( + + {!imagesLoaded && ( +
+ +
+ )} + {imagesLoaded && ( + <> + {discordConfiguration?.insideBanner} +
+ {discordConfiguration.header} +
+ {isValue && ( +
+
+ setTabValue(tabValue)} tabs={tabs} /> + +
+ {tabValue === 'Details' && } + {tabValue === 'Logs' && } + {tabValue === 'Configuration' && ( +
+ + +
+

API details

+
+

Webhook URL

+ Copy and paste your discord 'Webhook URL' here + + updateKeysState('webhook_url', e.target.value)} + onChange={(e) => updateKeysState('webhook_url', e.target.value)} + value={formFields?.keys?.webhook_url} + /> + +
+ +
+

Notify me when:

+ Memphis will send only the selected triggers + <> +
+
+ +
+

New unacked message

+ + Messages that cause a consumer group to repeatedly require a delivery (possibly due to a consumer failure) + such that the message is never processed completely and acknowledged + +
+
+ + updatePropertiesState('poison_message_alert', !formFields.properties.poison_message_alert)} + checked={formFields.properties?.poison_message_alert} + /> + +
+
+
+ +
+

Schema validation failure

+ Triggered once a client fails in schema validation +
+
+ + + updatePropertiesState('schema_validation_fail_alert', !formFields.properties.schema_validation_fail_alert) + } + checked={formFields.properties?.schema_validation_fail_alert} + /> + +
+
+
+ +
+

Disconnected clients

+ Triggered once a producer/consumer get disconnected +
+
+ + + updatePropertiesState('disconnection_events_alert', !formFields.properties.disconnection_events_alert) + } + checked={formFields.properties?.disconnection_events_alert} + /> + +
+ +
+
+
+ )} + +
+
+
+ + setCloudModalOpen(false)} /> + + )} +
+ ); +}; + +export default DiscordIntegration; diff --git a/ui_src/src/domain/administration/integrations/components/discordIntegration/style.scss b/ui_src/src/domain/administration/integrations/components/discordIntegration/style.scss new file mode 100644 index 000000000..e69de29bb diff --git a/ui_src/src/domain/administration/integrations/components/integrationItem/index.js b/ui_src/src/domain/administration/integrations/components/integrationItem/index.js index df05b3523..4097a511a 100644 --- a/ui_src/src/domain/administration/integrations/components/integrationItem/index.js +++ b/ui_src/src/domain/administration/integrations/components/integrationItem/index.js @@ -20,6 +20,7 @@ import { ReactComponent as MemphisVerifiedIcon } from '../../../../../assets/ima import { capitalizeFirst } from '../../../../../services/valueConvertor'; import { Context } from '../../../../../hooks/store'; import SlackIntegration from '../slackIntegration'; +import DiscordIntegration from '../discordIntegration'; import S3Integration from '../s3Integration'; import Tag from '../../../../../components/tag'; import DataDogIntegration from '../dataDogIntegration'; @@ -66,6 +67,16 @@ const IntegrationItem = ({ value, lockFeature, isOpen }) => { value={ref.current} /> ); + case 'Discord': + return ( + { + modalFlip(false); + setIntegrateValue(data); + }} + value={ref.current} + /> + ); case 'Github': return ( { const [modalIsOpen, modalFlip] = useState(false); const [integrations, setIntegrations] = useState([ { name: 'Slack', logo: slackLogo, value: {} }, + { name: 'Discord', logo: discordIntegrationIcon, value: {} }, { name: 'S3', logo: s3Logo, value: {} }, { name: 'Debezium', logo: debeziumIcon, value: {} } ]); @@ -94,6 +97,16 @@ const Integrations = () => { value={integrations[0]?.value} /> ); + case 'Discord': + return ( + { + modalFlip(false); + updateIntegrationValue(value, 0); + }} + value={integrations[0]?.value} + /> + ); case 'S3': return ( Date: Tue, 12 Dec 2023 08:23:21 +0600 Subject: [PATCH 2/6] format --- models/integrations.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/integrations.go b/models/integrations.go index 41f8402fd..7fa7b7670 100644 --- a/models/integrations.go +++ b/models/integrations.go @@ -34,9 +34,9 @@ type SlackIntegration struct { } type DiscordIntegration struct { - Name string `json:"name"` - Keys map[string]string `json:"keys"` - Properties map[string]bool `json:"properties"` + Name string `json:"name"` + Keys map[string]string `json:"keys"` + Properties map[string]bool `json:"properties"` } type CreateIntegrationSchema struct { From 0a7ccbd068935bde2289bd0bdb9342423d4b4aed Mon Sep 17 00:00:00 2001 From: Alibi Shalgymbay Date: Tue, 9 Jan 2024 23:13:34 +0600 Subject: [PATCH 3/6] update: merge with new changes --- ui_src/src/const/integrationList.js | 2 +- .../components/discordIntegration/index.js | 34 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/ui_src/src/const/integrationList.js b/ui_src/src/const/integrationList.js index 264ac82fa..4ef6a9bec 100644 --- a/ui_src/src/const/integrationList.js +++ b/ui_src/src/const/integrationList.js @@ -38,7 +38,7 @@ import { ReactComponent as DebeziumIcon } from 'assets/images/debeziumIcon.svg'; import { ReactComponent as ElasticIcon } from 'assets/images/elasticIcon.svg'; import { ReactComponent as ZapierIcon } from 'assets/images/zapierIcon.svg'; import { ReactComponent as SlackLogo } from 'assets/images/slackLogo.svg'; -import { ReactComponent as DiscordLogo } from '/assets/images/discordIntegrationIcon.svg'; +import { ReactComponent as DiscordLogo } from 'assets/images/discordIntegrationIcon.svg'; import { ReactComponent as MemphisVerifiedIcon } from 'assets/images/memphisFunctionIcon.svg'; import s3Banner from 'assets/images/s3Banner.webp'; import { ReactComponent as S3Logo } from 'assets/images/s3Logo.svg'; diff --git a/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js b/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js index d21264d51..3839d058a 100644 --- a/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js +++ b/ui_src/src/domain/administration/integrations/components/discordIntegration/index.js @@ -15,23 +15,23 @@ import './style.scss'; import React, { useState, useContext, useEffect } from 'react'; import { Form } from 'antd'; -import { ReactComponent as PoisionAlertIcon } from '../../../../../assets/images/poisionAlertIcon.svg'; -import { ReactComponent as DisconAlertIcon } from '../../../../../assets/images/disconAlertIcon.svg'; -import { ReactComponent as SchemaAlertIcon } from '../../../../../assets/images/schemaAlertIcon.svg'; -import { ReactComponent as PurpleQuestionMark } from '../../../../../assets/images/purpleQuestionMark.svg'; -import { INTEGRATION_LIST, getTabList } from '../../../../../const/integrationList'; -import { ApiEndpoints } from '../../../../../const/apiEndpoints'; -import { httpRequest } from '../../../../../services/http'; -import Switcher from '../../../../../components/switcher'; -import Button from '../../../../../components/button'; -import { Context } from '../../../../../hooks/store'; -import Input from '../../../../../components/Input'; -import CustomTabs from '../../../../../components/Tabs'; -import { URL } from '../../../../../config'; -import Loader from '../../../../../components/loader'; -import CloudMoadl from '../../../../../components/cloudModal'; -import { showMessages } from '../../../../../services/genericServices'; -import { isCloud } from '../../../../../services/valueConvertor'; +import { ReactComponent as PoisionAlertIcon } from 'assets/images/poisionAlertIcon.svg'; +import { ReactComponent as DisconAlertIcon } from 'assets/images/disconAlertIcon.svg'; +import { ReactComponent as SchemaAlertIcon } from 'assets/images/schemaAlertIcon.svg'; +import { ReactComponent as PurpleQuestionMark } from 'assets/images/purpleQuestionMark.svg'; +import { INTEGRATION_LIST, getTabList } from 'const/integrationList'; +import { ApiEndpoints } from 'const/apiEndpoints'; +import { httpRequest } from 'services/http'; +import Switcher from 'components/switcher'; +import Button from 'components/button'; +import { Context } from 'hooks/store'; +import Input from 'components/Input'; +import CustomTabs from 'components/Tabs'; +import { URL } from 'config'; +import Loader from 'components/loader'; +import CloudMoadl from 'components/cloudModal'; +import { showMessages } from 'services/genericServices'; +import { isCloud } from 'services/valueConvertor'; import IntegrationDetails from '../integrationItem/integrationDetails'; import IntegrationLogs from '../integrationItem/integrationLogs'; import { FaArrowCircleUp } from 'react-icons/fa'; From 79e9dc3723859b2603f252155614b8b354376c02 Mon Sep 17 00:00:00 2001 From: Alibi Shalgymbay Date: Wed, 10 Jan 2024 03:24:58 +0600 Subject: [PATCH 4/6] feat: add ratelimit handling --- server/background_tasks.go | 24 +++---- server/notifications.go | 123 ++++++++++++++++++++++++++------ server/notifications_discord.go | 94 +++++++++++++++++++++++- server/notifications_slack.go | 69 ++---------------- server/storage_s3.go | 6 +- 5 files changed, 210 insertions(+), 106 deletions(-) diff --git a/server/background_tasks.go b/server/background_tasks.go index 2db77d318..40070a11e 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -360,7 +360,7 @@ func (s *Server) StartBackgroundTasks() error { go s.ScaleFunctionWorkers() go s.ConnectorsDeadPodsRescheduler() - return nil + return nil } func (s *Server) uploadMsgsToTier2Storage() { @@ -798,45 +798,43 @@ func (s *Server) ConsumeNotificationsBufferMessages() error { continue } - msgs, err := fetchMessages[slackMsg](s, + msgs, err := fetchMessages(s, NOTIFICATIONS_BUFFER_CONSUMER, notificationsStreamName, mAmount, - 3*time.Second, - createSlackMsg) + 3*time.Second) if err != nil { s.Errorf("Failed to fetch notifications: %v", err.Error()) continue } - sendSlackNotifications(s, msgs) + sendNotifications(s, msgs) } } -func createSlackMsg(msg []byte, reply string) slackMsg { - return slackMsg{ +func createNotificationBufferMsg(msg []byte, reply string) notificationBufferMsg { + return notificationBufferMsg{ Msg: msg, ReplySubject: reply, } } -func fetchMessages[T any](s *Server, +func fetchMessages(s *Server, consumer, streamName string, mAmount int, - timeToWait time.Duration, - create func(msg []byte, reply string) T) ([]T, error) { + timeToWait time.Duration) ([]notificationBufferMsg, error) { req := []byte(strconv.FormatUint(uint64(mAmount), 10)) - resp := make(chan T) + resp := make(chan notificationBufferMsg) replySubject := consumer + "_reply_" + s.memphis.nuid.Next() timeout := time.NewTimer(timeToWait) sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), replySubject, replySubject+"_sid", func(_ *client, subject string, reply string, msg []byte) { go func(subject, reply string, msg []byte) { if reply != "" { - m := create(msg, reply) + m := createNotificationBufferMsg(msg, reply) resp <- m } }(subject, reply, copyBytes(msg)) @@ -848,7 +846,7 @@ func fetchMessages[T any](s *Server, subject := fmt.Sprintf(JSApiRequestNextT, streamName, consumer) s.sendInternalAccountMsgWithReply(s.MemphisGlobalAccount(), subject, replySubject, nil, req, true) - msgs := make([]T, 0) + var msgs []notificationBufferMsg stop := false for { if stop { diff --git a/server/notifications.go b/server/notifications.go index be40022bf..8e5c00410 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -19,16 +19,13 @@ import ( "time" ) -const ( - slackIntegrationName = "slack" -) - type NotificationMsg struct { - TenantName string `json:"tenantName"` - Title string `json:"title"` - Message string `json:"message"` - MsgType string `json:"msgType"` - Time time.Time `json:"time"` + TenantName string `json:"tenantName"` + Title string `json:"title"` + Message string `json:"message"` + MsgType string `json:"msgType"` + IntegrationName string `json:"integrationName"` + Time time.Time `json:"time"` } type NotificationMsgWithReply struct { @@ -36,12 +33,17 @@ type NotificationMsgWithReply struct { ReplySubject string } +type notificationBufferMsg struct { + Msg []byte + ReplySubject string +} + func (s *Server) SendNotification(tenantName string, title string, message string, msgType string) error { for k := range NotificationFunctionsMap { switch k { - case slackIntegrationName: + case "slack": if tenantInetgrations, ok := IntegrationsConcurrentCache.Load(tenantName); ok { - if slackIntegration, ok := tenantInetgrations[slackIntegrationName].(models.SlackIntegration); ok { + if slackIntegration, ok := tenantInetgrations["slack"].(models.SlackIntegration); ok { if slackIntegration.Properties[msgType] { // TODO: if the stream doesn't exist save the messages in buffer if !NOTIFICATIONS_BUFFER_STREAM_CREATED { @@ -53,14 +55,15 @@ func (s *Server) SendNotification(tenantName string, title string, message strin tenantName = serv.MemphisGlobalAccountString() } notificationMsg := NotificationMsg{ - TenantName: tenantName, - Title: title, - Message: message, - MsgType: msgType, - Time: time.Now(), + TenantName: tenantName, + Title: title, + Message: message, + MsgType: msgType, + IntegrationName: "slack", + Time: time.Now(), } - err := saveSlackNotificationToQueue(s, notificationsStreamName+".user_notifications", tenantName, ¬ificationMsg) + err := saveNotificationToQueue(s, notificationsStreamName+".user_notifications", tenantName, ¬ificationMsg) if err != nil { return err } @@ -69,11 +72,30 @@ func (s *Server) SendNotification(tenantName string, title string, message strin } case "discord": if tenantInetgrations, ok := IntegrationsConcurrentCache.Load(tenantName); ok { - if discordIntegration, ok := tenantInetgrations["discord"].(models.DiscordIntegration); ok { - if discordIntegration.Properties[msgType] { + if slackIntegration, ok := tenantInetgrations["slack"].(models.SlackIntegration); ok { + if slackIntegration.Properties[msgType] { + // TODO: if the stream doesn't exist save the messages in buffer + if !NOTIFICATIONS_BUFFER_STREAM_CREATED { + return nil + } + + // TODO: do we need msg-id here? if yes - what's the best way to generate it? hash title? if tenantName == "" { tenantName = serv.MemphisGlobalAccountString() } + notificationMsg := NotificationMsg{ + TenantName: tenantName, + Title: title, + Message: message, + MsgType: msgType, + IntegrationName: "discord", + Time: time.Now(), + } + + err := saveNotificationToQueue(s, notificationsStreamName+".user_notifications", tenantName, ¬ificationMsg) + if err != nil { + return err + } } } } @@ -84,7 +106,7 @@ func (s *Server) SendNotification(tenantName string, title string, message strin return nil } -func saveSlackNotificationToQueue(s *Server, subject, tenantName string, notificationMsg *NotificationMsg) error { +func saveNotificationToQueue(s *Server, subject, tenantName string, notificationMsg *NotificationMsg) error { msg, err := json.Marshal(notificationMsg) if err != nil { return err @@ -113,3 +135,64 @@ func shouldSendNotification(tenantName string, alertType string) bool { } return false } + +func sendNotifications(s *Server, msgs []notificationBufferMsg) { + groupedMsgs := groupMessagesByTenantAndIntegration(msgs, s) + for integrationName, tenantMsgs := range groupedMsgs { + for tenantName, tMsgs := range tenantMsgs { + switch integrationName { + case "slack": + sendSlackTenantNotifications(s, tenantName, tMsgs) + case "discord": + sendDiscordTenantNotifications(s, tenantName, tMsgs) + } + } + + } +} + +func nackMsgs(s *Server, msgs []NotificationMsgWithReply, nackDuration time.Duration) error { + nakPayload := []byte(fmt.Sprintf("%s {\"delay\": %d}", AckNak, nackDuration.Nanoseconds())) + for i := 0; i < len(msgs); i++ { + m := msgs[i] + err := s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, nakPayload) + if err != nil { + return err + } + } + + return nil +} + +func ackMsgs(s *Server, msgs []NotificationMsgWithReply) { + for i := 0; i < len(msgs); i++ { + m := msgs[i] + s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, []byte(_EMPTY_)) + } +} + +func groupMessagesByTenantAndIntegration(msgs []notificationBufferMsg, l Logger) map[string]map[string][]NotificationMsgWithReply { + groupMsgs := make(map[string]map[string][]NotificationMsgWithReply) + for _, message := range msgs { + msg := message.Msg + reply := message.ReplySubject + var nm NotificationMsg + err := json.Unmarshal(msg, &nm) + if err != nil { + // TODO: does it make sense to send ack for this message? + // TODO: it's malformed and won't be unmarshalled next time as well + l.Errorf("failed to unmarshal notification message: %v", err) + continue + } + nmr := NotificationMsgWithReply{ + NotificationMsg: &nm, + ReplySubject: reply, + } + if _, ok := groupMsgs[nm.IntegrationName][nm.TenantName]; !ok { + groupMsgs[nm.IntegrationName][nm.TenantName] = []NotificationMsgWithReply{} + } + groupMsgs[nm.IntegrationName][nm.TenantName] = append(groupMsgs[nm.IntegrationName][nm.TenantName], nmr) + } + + return groupMsgs +} diff --git a/server/notifications_discord.go b/server/notifications_discord.go index 82696eeec..604a18055 100644 --- a/server/notifications_discord.go +++ b/server/notifications_discord.go @@ -15,11 +15,13 @@ import ( "bytes" "encoding/json" "errors" - "net/http" - "strings" - + "fmt" "github.com/memphisdev/memphis/db" "github.com/memphisdev/memphis/models" + "net/http" + "strconv" + "strings" + "time" ) type Author struct { @@ -37,6 +39,23 @@ type DiscordMessage struct { Embeds []Embed `json:"embeds"` } +type DiscordRateLimitedError struct { + RetryAfter time.Duration +} + +func (e *DiscordRateLimitedError) Error() string { + return fmt.Sprintf("discord rate limit exceeded, retry after %s", e.RetryAfter) +} + +type DiscordStatusCodeError struct { + Code int + Status string +} + +func (e *DiscordStatusCodeError) Error() string { + return fmt.Sprintf("discord server error: %s", e.Status) +} + func IsDiscordEnabled(tenantName string) (bool, error) { exist, _, err := db.GetIntegration("discord", tenantName) if err != nil { @@ -50,6 +69,22 @@ func IsDiscordEnabled(tenantName string) (bool, error) { return true, nil } +func checkDiscordStatusCode(resp *http.Response) error { + if resp.StatusCode == http.StatusTooManyRequests { + retry, err := strconv.ParseInt(resp.Header.Get("X-RateLimit-Reset-After"), 10, 64) + if err != nil { + return err + } + return &DiscordRateLimitedError{time.Duration(retry) * time.Second} + } + + if resp.StatusCode != http.StatusNoContent { + return &DiscordStatusCodeError{Code: resp.StatusCode, Status: resp.Status} + } + + return nil +} + func sendMessageToDiscordChannel(integration models.DiscordIntegration, title string, message string) error { payload := DiscordMessage{ Embeds: []Embed{ @@ -78,6 +113,12 @@ func sendMessageToDiscordChannel(integration models.DiscordIntegration, title st return err } defer resp.Body.Close() + + err = checkDiscordStatusCode(resp) + if err != nil { + return err + } + return nil } @@ -330,3 +371,50 @@ func hideDiscordWebhookUrl(webhookUrl string) string { } return webhookUrl } + +func sendDiscordTenantNotifications(s *Server, tenantName string, msgs []NotificationMsgWithReply) { + var ok bool + if _, ok := NotificationFunctionsMap["discord"]; !ok { + s.Errorf("[tenant: %v]discord integration doesn't exist", tenantName) + return + } + + var tenantIntegrations map[string]any + if tenantIntegrations, ok = IntegrationsConcurrentCache.Load(tenantName); !ok { + // discord is either not enabled or have been disabled - just ack these messages + ackMsgs(s, msgs) + return + } + + var discordIntegration models.DiscordIntegration + if discordIntegration, ok = tenantIntegrations["discord"].(models.DiscordIntegration); !ok { + // discord is either not enabled or have been disabled - just ack these messages + ackMsgs(s, msgs) + return + } + + for i := 0; i < len(msgs); i++ { + m := msgs[i] + err := sendMessageToDiscordChannel(discordIntegration, m.NotificationMsg.Title, m.NotificationMsg.Message) + if err != nil { + var rateLimit *DiscordRateLimitedError + if errors.As(err, &rateLimit) { + s.Warnf("[tenant: %v]failed to send discord notification: %v", tenantName, err.Error()) + err := nackMsgs(s, msgs[i:], rateLimit.RetryAfter) + if err != nil { + s.Errorf("[tenant: %v]failed to send NACK for discord notification: %v", tenantName, err.Error()) + } + + return + } + + s.Errorf("[tenant: %v]failed to send discord notification: %v", tenantName, err.Error()) + + } + + err = s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, []byte(_EMPTY_)) + if err != nil { + s.Errorf("[tenant: %v]failed to send ACK for discord notification: %v", tenantName, err.Error()) + } + } +} diff --git a/server/notifications_slack.go b/server/notifications_slack.go index daad35149..9fbfab0b0 100644 --- a/server/notifications_slack.go +++ b/server/notifications_slack.go @@ -14,12 +14,9 @@ package server import ( "encoding/json" "errors" - "fmt" - "strings" - "time" - "github.com/memphisdev/memphis/db" "github.com/memphisdev/memphis/models" + "strings" "github.com/slack-go/slack" ) @@ -317,21 +314,9 @@ func hideSlackAuthToken(authToken string) string { return authToken } -type slackMsg struct { - Msg []byte - ReplySubject string -} - -func sendSlackNotifications(s *Server, msgs []slackMsg) { - tenantMsgs := groupMessagesByTenant(msgs, s) - for tenantName, tMsgs := range tenantMsgs { - sendTenantSlackNotifications(s, tenantName, tMsgs) - } -} - -func sendTenantSlackNotifications(s *Server, tenantName string, msgs []NotificationMsgWithReply) { +func sendSlackTenantNotifications(s *Server, tenantName string, msgs []NotificationMsgWithReply) { var ok bool - if _, ok := NotificationFunctionsMap[slackIntegrationName]; !ok { + if _, ok := NotificationFunctionsMap["slack"]; !ok { s.Errorf("[tenant: %v]slack integration doesn't exist", tenantName) return } @@ -344,7 +329,7 @@ func sendTenantSlackNotifications(s *Server, tenantName string, msgs []Notificat } var slackIntegration models.SlackIntegration - if slackIntegration, ok = tenantIntegrations[slackIntegrationName].(models.SlackIntegration); !ok { + if slackIntegration, ok = tenantIntegrations["slack"].(models.SlackIntegration); !ok { // slack is either not enabled or have been disabled - just ack these messages ackMsgs(s, msgs) return @@ -379,49 +364,3 @@ func sendTenantSlackNotifications(s *Server, tenantName string, msgs []Notificat } } } - -func nackMsgs(s *Server, msgs []NotificationMsgWithReply, nackDuration time.Duration) error { - nakPayload := []byte(fmt.Sprintf("%s {\"delay\": %d}", AckNak, nackDuration.Nanoseconds())) - for i := 0; i < len(msgs); i++ { - m := msgs[i] - err := s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, nakPayload) - if err != nil { - return err - } - } - - return nil -} - -func ackMsgs(s *Server, msgs []NotificationMsgWithReply) { - for i := 0; i < len(msgs); i++ { - m := msgs[i] - s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, []byte(_EMPTY_)) - } -} - -func groupMessagesByTenant(msgs []slackMsg, l Logger) map[string][]NotificationMsgWithReply { - tenantMsgs := make(map[string][]NotificationMsgWithReply) - for _, message := range msgs { - msg := message.Msg - reply := message.ReplySubject - var nm NotificationMsg - err := json.Unmarshal(msg, &nm) - if err != nil { - // TODO: does it make sense to send ack for this message? - // TODO: it's malformed and won't be unmarshalled next time as well - l.Errorf("failed to unmarshal slack message: %v", err) - continue - } - nmr := NotificationMsgWithReply{ - NotificationMsg: &nm, - ReplySubject: reply, - } - if _, ok := tenantMsgs[nm.TenantName]; !ok { - tenantMsgs[nm.TenantName] = []NotificationMsgWithReply{} - } - tenantMsgs[nm.TenantName] = append(tenantMsgs[nm.TenantName], nmr) - } - - return tenantMsgs -} diff --git a/server/storage_s3.go b/server/storage_s3.go index 6999ca0bd..3e45e25ef 100644 --- a/server/storage_s3.go +++ b/server/storage_s3.go @@ -99,11 +99,7 @@ func (it IntegrationsHandler) handleUpdateS3Integration(tenantName string, body return models.Integration{}, statusCode, err } integrationType := strings.ToLower(body.Name) -<<<<<<< HEAD - keysMap, properties := createIntegrationsKeysAndProperties(integrationType, "", "", "", false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, "", "", "", "") -======= - keysMap, properties := createIntegrationsKeysAndProperties(integrationType, _EMPTY_, _EMPTY_, false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, _EMPTY_, _EMPTY_, _EMPTY_, _EMPTY_) ->>>>>>> upstream/master + keysMap, properties := createIntegrationsKeysAndProperties(integrationType, _EMPTY_, _EMPTY_, _EMPTY_, false, false, false, keys["access_key"].(string), keys["secret_key"].(string), keys["bucket_name"].(string), keys["region"].(string), keys["url"].(string), keys["s3_path_style"].(string), map[string]interface{}{}, _EMPTY_, _EMPTY_, _EMPTY_, _EMPTY_) s3Integration, err := updateS3Integration(tenantName, keysMap, properties) if err != nil { return s3Integration, 500, err From db6d08f14a149ef154e253eb44f46e758c8742d6 Mon Sep 17 00:00:00 2001 From: Alibi Shalgymbay Date: Wed, 10 Jan 2024 03:33:54 +0600 Subject: [PATCH 5/6] fix: init map in map --- server/notifications.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/notifications.go b/server/notifications.go index 8e5c00410..b92fe873f 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -188,6 +188,9 @@ func groupMessagesByTenantAndIntegration(msgs []notificationBufferMsg, l Logger) NotificationMsg: &nm, ReplySubject: reply, } + if _, ok := groupMsgs[nm.IntegrationName]; !ok { + groupMsgs[nm.IntegrationName] = make(map[string][]NotificationMsgWithReply) + } if _, ok := groupMsgs[nm.IntegrationName][nm.TenantName]; !ok { groupMsgs[nm.IntegrationName][nm.TenantName] = []NotificationMsgWithReply{} } From 288711b2f496796788792649e5cbcc380784e84c Mon Sep 17 00:00:00 2001 From: Alibi Shalgymbay Date: Wed, 10 Jan 2024 21:31:23 +0600 Subject: [PATCH 6/6] feat: add batched messaging --- server/integrations.go | 2 +- server/notifications.go | 1 - server/notifications_discord.go | 44 +++++++++++++++++++-------------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/server/integrations.go b/server/integrations.go index a95b80df5..27507ff3a 100644 --- a/server/integrations.go +++ b/server/integrations.go @@ -33,7 +33,7 @@ func InitializeIntegrations() error { StorageFunctionsMap = make(map[string]interface{}) SourceCodeManagementFunctionsMap = make(map[string]map[string]interface{}) NotificationFunctionsMap["slack"] = sendMessageToSlackChannel - NotificationFunctionsMap["discord"] = sendMessageToDiscordChannel + NotificationFunctionsMap["discord"] = sendMessagesToDiscordChannel StorageFunctionsMap["s3"] = serv.uploadToS3Storage SourceCodeManagementFunctionsMap["github"] = make(map[string]interface{}) SourceCodeManagementFunctionsMap["github"]["get_all_repos"] = serv.getGithubRepositories diff --git a/server/notifications.go b/server/notifications.go index b92fe873f..8a6db06db 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -147,7 +147,6 @@ func sendNotifications(s *Server, msgs []notificationBufferMsg) { sendDiscordTenantNotifications(s, tenantName, tMsgs) } } - } } diff --git a/server/notifications_discord.go b/server/notifications_discord.go index 604a18055..1e22be8eb 100644 --- a/server/notifications_discord.go +++ b/server/notifications_discord.go @@ -85,18 +85,23 @@ func checkDiscordStatusCode(resp *http.Response) error { return nil } -func sendMessageToDiscordChannel(integration models.DiscordIntegration, title string, message string) error { - payload := DiscordMessage{ - Embeds: []Embed{ - { - Color: "6641663", - Author: Author{ - Name: "Memphis", - }, - Title: title, - Description: message, +func sendMessagesToDiscordChannel(integration models.DiscordIntegration, msgs []NotificationMsgWithReply) error { + var embeds []Embed + for i := 0; i < len(msgs); i++ { + m := msgs[i] + embed := Embed{ + Color: "6641663", + Author: Author{ + Name: "Memphis", }, - }, + Title: m.NotificationMsg.Title, + Description: m.NotificationMsg.Message, + } + embeds = append(embeds, embed) + } + + payload := DiscordMessage{ + Embeds: embeds, } payloadBytes, err := json.Marshal(payload) @@ -393,9 +398,15 @@ func sendDiscordTenantNotifications(s *Server, tenantName string, msgs []Notific return } - for i := 0; i < len(msgs); i++ { - m := msgs[i] - err := sendMessageToDiscordChannel(discordIntegration, m.NotificationMsg.Title, m.NotificationMsg.Message) + const batchedAmount = 10 + + for i := 0; i < len(msgs); i = i + batchedAmount { + right := i + batchedAmount + if right > len(msgs) { + right = len(msgs) + } + ms := msgs[i:right] + err := sendMessagesToDiscordChannel(discordIntegration, ms) if err != nil { var rateLimit *DiscordRateLimitedError if errors.As(err, &rateLimit) { @@ -412,9 +423,6 @@ func sendDiscordTenantNotifications(s *Server, tenantName string, msgs []Notific } - err = s.sendInternalAccountMsg(s.MemphisGlobalAccount(), m.ReplySubject, []byte(_EMPTY_)) - if err != nil { - s.Errorf("[tenant: %v]failed to send ACK for discord notification: %v", tenantName, err.Error()) - } + ackMsgs(s, ms) } }