Skip to content

Commit

Permalink
feat: Support slack channel passed as config in notification API
Browse files Browse the repository at this point in the history
  • Loading branch information
Manish Dangi committed Oct 15, 2024
1 parent 4b0f987 commit ab41b38
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 100 deletions.
76 changes: 75 additions & 1 deletion core/notification/mocks/receiver_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions core/notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ type Transactor interface {

// Notification is a model of notification
type Notification struct {
ID string `json:"id"`
NamespaceID uint64 `json:"namespace_id"`
Type string `json:"type"`
Data map[string]any `json:"data"`
Labels map[string]string `json:"labels"`
ValidDuration time.Duration `json:"valid_duration"`
Template string `json:"template"`
UniqueKey string `json:"unique_key"`
ReceiverSelectors []map[string]string `json:"receiver_selectors"`
CreatedAt time.Time `json:"created_at"`
ID string `json:"id"`
NamespaceID uint64 `json:"namespace_id"`
Type string `json:"type"`
Data map[string]any `json:"data"`
Labels map[string]string `json:"labels"`
ValidDuration time.Duration `json:"valid_duration"`
Template string `json:"template"`
UniqueKey string `json:"unique_key"`
ReceiverSelectors []map[string]interface{} `json:"receiver_selectors"`
CreatedAt time.Time `json:"created_at"`

// won't be stored in notification table, only to propagate this to notification_subscriber
AlertIDs []int64
Expand Down
2 changes: 1 addition & 1 deletion core/notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNotification_Validate(t *testing.T) {
Labels: map[string]string{
"receiver_id": "2",
},
ReceiverSelectors: []map[string]string{
ReceiverSelectors: []map[string]interface{}{
{
"varkey1": "value1",
},
Expand Down
113 changes: 103 additions & 10 deletions core/notification/router_receiver_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package notification

import (
"context"
"fmt"
"strconv"

"github.com/goto/siren/core/log"
"github.com/goto/siren/core/receiver"
Expand All @@ -26,10 +28,26 @@ func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notif
return nil, nil, errors.ErrInvalid.WithMsgf("number of receiver selectors should be less than or equal threshold %d", s.deps.Cfg.MaxNumReceiverSelectors)
}

rcvs, err := s.deps.ReceiverService.List(ctx, receiver.Filter{
MultipleLabels: n.ReceiverSelectors,
Expanded: true,
})
var rcvs []receiver.Receiver
var userConfigs map[uint64]map[string]interface{}

// Check if any selector contains a config
hasConfig := false
for _, selector := range n.ReceiverSelectors {
if _, ok := selector["config"]; ok {
hasConfig = true
break
}
}

if hasConfig {
// Handle case when config is provided
rcvs, userConfigs, err = s.handleConfigCase(ctx, n.ReceiverSelectors)
} else {
// Handle case when only receiver IDs are provided
rcvs, err = s.handleIDOnlyCase(ctx, n.ReceiverSelectors)
}

if err != nil {
return nil, nil, err
}
Expand All @@ -38,11 +56,31 @@ func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notif
return nil, nil, errors.ErrNotFound
}

// Check if the number of receivers exceeds the max messages receiver flow
if len(rcvs) > s.deps.Cfg.MaxMessagesReceiverFlow {
return nil, nil, errors.ErrInvalid.WithMsgf("sending %d messages exceed max messages receiver flow threshold %d. this will spam and broadcast to %d channel. found %d receiver selectors passed, you might want to check your receiver selectors configuration", len(rcvs), s.deps.Cfg.MaxMessagesReceiverFlow, len(rcvs), len(n.ReceiverSelectors))
}

for _, rcv := range rcvs {
var rcvView = &subscription.ReceiverView{}
rcvView := &subscription.ReceiverView{}
rcvView.FromReceiver(rcv)
metaMessages = append(metaMessages, n.MetaMessage(*rcvView))

if config, ok := userConfigs[rcv.ID]; ok {
// Merge user-provided config with receiver config
for k, v := range config {
rcvView.Configurations[k] = v
}
}

// Ensure required fields are set
if err := s.validateConfigurations(rcvView.Configurations); err != nil {
return nil, nil, err
}

metaMessage := n.MetaMessage(*rcvView)
metaMessage.NotificationIDs = []string{n.ID}

metaMessages = append(metaMessages, metaMessage)
notificationLogs = append(notificationLogs, log.Notification{
NamespaceID: n.NamespaceID,
NotificationID: n.ID,
Expand All @@ -51,10 +89,65 @@ func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notif
})
}

var metaMessagesNum = len(metaMessages)
if metaMessagesNum > s.deps.Cfg.MaxMessagesReceiverFlow {
return nil, nil, errors.ErrInvalid.WithMsgf("sending %d messages exceed max messages receiver flow threshold %d. this will spam and broadcast to %d channel. found %d receiver selectors passed, you might want to check your receiver selectors configuration", metaMessagesNum, s.deps.Cfg.MaxMessagesReceiverFlow, metaMessagesNum, len(n.ReceiverSelectors))
return metaMessages, notificationLogs, nil
}

func (s *RouterReceiverService) handleConfigCase(ctx context.Context, selectors []map[string]interface{}) ([]receiver.Receiver, map[uint64]map[string]interface{}, error) {
var receiverIDs []uint64
userConfigs := make(map[uint64]map[string]interface{})

for _, selector := range selectors {
if idStr, ok := selector["id"].(string); ok {
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
return nil, nil, errors.ErrInvalid.WithMsgf("invalid receiver id: %s", idStr)
}
receiverIDs = append(receiverIDs, id)
}
if config, ok := selector["config"].(map[string]interface{}); ok {
for _, id := range receiverIDs {
if userConfigs[id] == nil {
userConfigs[id] = make(map[string]interface{})
}
for k, v := range config {
userConfigs[id][k] = v
}
}
}
}

return metaMessages, notificationLogs, nil
rcvs := make([]receiver.Receiver, 0, len(receiverIDs))
for _, id := range receiverIDs {
rcv, err := s.deps.ReceiverService.Get(ctx, id)
if err != nil {
return nil, nil, err
}
rcvs = append(rcvs, *rcv)
}

return rcvs, userConfigs, nil
}

func (s *RouterReceiverService) handleIDOnlyCase(ctx context.Context, selectors []map[string]interface{}) ([]receiver.Receiver, error) {
convertedSelectors := make([]map[string]string, len(selectors))
for i, selector := range selectors {
convertedSelectors[i] = make(map[string]string)
for k, v := range selector {
convertedSelectors[i][k] = fmt.Sprint(v)
}
}
return s.deps.ReceiverService.List(ctx, receiver.Filter{
MultipleLabels: convertedSelectors,
Expanded: true,
})
}

func (s *RouterReceiverService) validateConfigurations(configs map[string]interface{}) error {
requiredFields := []string{"token", "workspace", "channel_name"}
for _, field := range requiredFields {
if _, ok := configs[field]; !ok {
return errors.ErrInvalid.WithMsgf("%s is required in the config", field)
}
}
return nil
}
45 changes: 39 additions & 6 deletions core/notification/router_receiver_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRouterReceiverService_PrepareMetaMessage(t *testing.T) {
{
name: "should return error if number of receiver selector is more than threshold",
n: notification.Notification{
ReceiverSelectors: []map[string]string{
ReceiverSelectors: []map[string]interface{}{
{
"k1": "v1",
},
Expand Down Expand Up @@ -73,28 +73,61 @@ func TestRouterReceiverService_PrepareMetaMessage(t *testing.T) {
},
{
name: "should return no error if succeed",
n: notification.Notification{},
n: notification.Notification{
ID: "test-notification-id",
NamespaceID: 123,
},
setup: func(rs *mocks.ReceiverService, n *mocks.Notifier) {
rs.EXPECT().List(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("receiver.Filter")).Return([]receiver.Receiver{
{
ID: 1,
Configurations: map[string]interface{}{
"token": "token1",
"workspace": "workspace1",
"channel_name": "channel1",
},
},
{
ID: 2,
Configurations: map[string]interface{}{
"token": "token2",
"workspace": "workspace2",
"channel_name": "channel2",
},
},
}, nil)
},
want: []notification.MetaMessage{
{
ReceiverID: 1,
ReceiverID: 1,
NotificationIDs: []string{"test-notification-id"},
ReceiverConfigs: map[string]interface{}{
"token": "token1",
"workspace": "workspace1",
"channel_name": "channel1",
},
},
{
ReceiverID: 2,
ReceiverID: 2,
NotificationIDs: []string{"test-notification-id"},
ReceiverConfigs: map[string]interface{}{
"token": "token2",
"workspace": "workspace2",
"channel_name": "channel2",
},
},
},
want1: []log.Notification{
{ReceiverID: 1},
{ReceiverID: 2},
{
ReceiverID: 1,
NotificationID: "test-notification-id",
NamespaceID: 123,
},
{
ReceiverID: 2,
NotificationID: "test-notification-id",
NamespaceID: 123,
},
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions core/notification/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type SubscriptionService interface {

type ReceiverService interface {
List(ctx context.Context, flt receiver.Filter) ([]receiver.Receiver, error)
Get(ctx context.Context, id uint64, opts ...receiver.GetOption) (*receiver.Receiver, error)
}

type SilenceService interface {
Expand Down Expand Up @@ -237,6 +238,7 @@ func (s *Service) dispatchInternal(ctx context.Context, ns []Notification) (noti
}

if err := s.deps.Q.Enqueue(ctx, messages...); err != nil {
fmt.Printf("Context: %+v\n", ctx)
return nil, fmt.Errorf("failed enqueuing messages: %w", err)
}

Expand Down
Loading

0 comments on commit ab41b38

Please sign in to comment.