Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support slack_channel as receiver #85

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions core/notification/direct_channel_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package notification

import (
"context"

"github.com/goto/siren/core/log"
"github.com/goto/siren/core/subscription"
"github.com/goto/siren/pkg/errors"
)

const ReceiverTypeDirect = "direct"

type ReceiverView struct {
ID int
Type string
Channel string
}

type DirectChannelRouter struct {
deps Deps
}

func NewDirectChannelRouter(deps Deps) *DirectChannelRouter {
return &DirectChannelRouter{deps: deps}
}

func (r *DirectChannelRouter) PrepareMetaMessages(ctx context.Context, n Notification) ([]MetaMessage, []log.Notification, error) {
if len(n.ReceiverSelectors) > r.deps.Cfg.MaxNumReceiverSelectors {
return nil, nil, errors.ErrInvalid.WithMsgf("number of receiver selectors should be less than or equal threshold %d", r.deps.Cfg.MaxNumReceiverSelectors)
}

var metaMessages []MetaMessage
var notificationLogs []log.Notification

for _, selector := range n.ReceiverSelectors {
var channel string
var receiverType string

if slackChannel, ok := selector["channel_name"].(string); ok {
channel = slackChannel
receiverType = "slack_channel"
} else if directChannel, ok := selector["channel"].(string); ok {
channel = directChannel
receiverType = ReceiverTypeDirect
} else {
return nil, nil, errors.ErrInvalid.WithMsgf("missing or invalid channel in receiver selector")
}

rcvView := ReceiverView{
ID: 0, // Using 0 as a placeholder for direct channels
Type: receiverType,
Channel: channel,
}

metaMessages = append(metaMessages, n.MetaMessage(subscription.ReceiverView{
ID: uint64(rcvView.ID),
Type: rcvView.Type,
}))

notificationLogs = append(notificationLogs, log.Notification{
NamespaceID: n.NamespaceID,
NotificationID: n.ID,
ReceiverID: 0,
AlertIDs: n.AlertIDs,
})
}

if len(metaMessages) == 0 {
return nil, nil, errors.ErrNotFound
}

if len(metaMessages) > r.deps.Cfg.MaxMessagesReceiverFlow {
return nil, nil, errors.ErrInvalid.WithMsgf("sending %d messages exceed max messages direct channel flow threshold %d. this will spam and broadcast to %d channels. found %d receiver selectors passed, you might want to check your receiver selectors configuration", len(metaMessages), r.deps.Cfg.MaxMessagesReceiverFlow, len(metaMessages), len(n.ReceiverSelectors))
}

return metaMessages, notificationLogs, nil
}
53 changes: 31 additions & 22 deletions core/notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
const (
ValidDurationRequestKey string = "valid_duration"

RouterReceiver string = "receiver"
RouterSubscriber string = "subscriber"
RouterReceiver string = "receiver"
RouterSubscriber string = "subscriber"
RouterDirectChannel string = "direct_channel"

TypeAlert string = "alert"
TypeEvent string = "event"
Expand All @@ -36,16 +37,16 @@ type Transactor interface {

// Notification is a model of notification
type Notification struct {
ID string `json:"id"`
NamespaceID uint64 `json:"namespace_id"`
Type string `json:"type"`
Data map[string]any `json:"data"`
Labels map[string]string `json:"labels"`
ValidDuration time.Duration `json:"valid_duration"`
Template string `json:"template"`
UniqueKey string `json:"unique_key"`
ReceiverSelectors []map[string]string `json:"receiver_selectors"`
CreatedAt time.Time `json:"created_at"`
ID string `json:"id"`
NamespaceID uint64 `json:"namespace_id"`
Type string `json:"type"`
Data map[string]any `json:"data"`
Labels map[string]string `json:"labels"`
ValidDuration time.Duration `json:"valid_duration"`
Template string `json:"template"`
UniqueKey string `json:"unique_key"`
ReceiverSelectors []map[string]interface{} `json:"receiver_selectors"`
CreatedAt time.Time `json:"created_at"`

// won't be stored in notification table, only to propagate this to notification_subscriber
AlertIDs []int64
Expand All @@ -65,19 +66,27 @@ func (n *Notification) EnrichID(id string) {
}

func (n Notification) Validate(routerKind string) error {
if routerKind == RouterReceiver {
if len(n.ReceiverSelectors) != 0 {
return nil
switch routerKind {
case RouterReceiver:
if len(n.ReceiverSelectors) == 0 {
return errors.ErrInvalid.WithMsgf("notification type receiver should have receiver_selectors: %v", n)
}
return errors.ErrInvalid.WithMsgf("notification type receiver should have receiver_selectors: %v", n)
} else if routerKind == RouterSubscriber {
if len(n.Labels) != 0 {
return nil
case RouterSubscriber:
if len(n.Labels) == 0 {
return errors.ErrInvalid.WithMsgf("notification type subscriber should have labels: %v", n)
}
return errors.ErrInvalid.WithMsgf("notification type subscriber should have labels: %v", n)
case RouterDirectChannel:
if len(n.ReceiverSelectors) == 0 {
return errors.ErrInvalid.WithMsgf("notification type direct channel should have receiver_selectors: %v", n)
}
if n.Type != "direct_channel" {
return errors.ErrInvalid.WithMsgf("invalid notification type for direct channel: %v", n)
}
// Additional checks for direct channel can be added here if needed
default:
return errors.ErrInvalid.WithMsgf("invalid router kind: %v", routerKind)
}

return errors.ErrInvalid.WithMsgf("invalid notification type: %v", n)
return nil
}

func (n Notification) MetaMessage(receiverView subscription.ReceiverView) MetaMessage {
Expand Down
38 changes: 36 additions & 2 deletions core/notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNotification_Validate(t *testing.T) {
Labels: map[string]string{
"receiver_id": "2",
},
ReceiverSelectors: []map[string]string{
ReceiverSelectors: []map[string]interface{}{
{
"varkey1": "value1",
},
Expand Down Expand Up @@ -70,12 +70,46 @@ func TestNotification_Validate(t *testing.T) {
},
},
},
{
name: "should return error if flow direct channel but has no receiver_selectors",
Flow: notification.RouterDirectChannel,
n: notification.Notification{
Labels: map[string]string{
"labelkey1": "value1",
},
Data: map[string]any{
"varkey1": "value1",
},
},
wantErr: true,
},
{
name: "should return nil error if flow direct channel and receiver_selectors exist with config",
Flow: notification.RouterDirectChannel,
n: notification.Notification{
Type: "direct_channel",
Labels: map[string]string{
"receiver_type": "channel",
},
Data: map[string]any{
"channel": "test-channel",
},
ReceiverSelectors: []map[string]interface{}{
{
"type": "channel",
"config": map[string]interface{}{
"channel": "test-channel",
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.n.Validate(tc.Flow)
if (err != nil) != tc.wantErr {
t.Errorf("Notification.ToMessage() error = %v, wantErr %v", err, tc.wantErr)
t.Errorf("Notification.Validate() error = %v, wantErr %v", err, tc.wantErr)
return
}
})
Expand Down
13 changes: 12 additions & 1 deletion core/notification/router_receiver_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,19 @@ func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notif
return nil, nil, errors.ErrInvalid.WithMsgf("number of receiver selectors should be less than or equal threshold %d", s.deps.Cfg.MaxNumReceiverSelectors)
}

convertedSelectors := make([]map[string]string, len(n.ReceiverSelectors))
for i, selector := range n.ReceiverSelectors {
convertedSelectors[i] = make(map[string]string)
for k, v := range selector {
if strVal, ok := v.(string); ok {
convertedSelectors[i][k] = strVal
} else {
return nil, nil, errors.ErrInvalid.WithMsgf("receiver selector value for key %s is not a string", k)
}
}
}
rcvs, err := s.deps.ReceiverService.List(ctx, receiver.Filter{
MultipleLabels: n.ReceiverSelectors,
MultipleLabels: convertedSelectors,
Expanded: true,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/notification/router_receiver_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRouterReceiverService_PrepareMetaMessage(t *testing.T) {
{
name: "should return error if number of receiver selector is more than threshold",
n: notification.Notification{
ReceiverSelectors: []map[string]string{
ReceiverSelectors: []map[string]interface{}{
{
"k1": "v1",
},
Expand Down
72 changes: 38 additions & 34 deletions core/notification/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package notification

import (
"context"
"encoding/json"
"fmt"
"time"

Expand All @@ -20,7 +19,7 @@ import (
)

type Router interface {
PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)
PrepareMetaMessages(ctx context.Context, n Notification) ([]MetaMessage, []log.Notification, error)
}

type Dispatcher interface {
Expand Down Expand Up @@ -78,13 +77,31 @@ func NewService(
routerMap map[string]Router,
notifierPlugins map[string]Notifier,
) *Service {
if routerMap == nil {
routerMap = map[string]Router{
RouterReceiver: NewRouterReceiverService(deps),
RouterSubscriber: NewRouterSubscriberService(deps),
RouterDirectChannel: NewDirectChannelRouter(deps),
}
}
return &Service{
deps: deps,
routerMap: routerMap,
notifierPlugins: notifierPlugins,
}
}

// determineFlow determines the flow of the notification
func (s *Service) determineFlow(receiverSelectors []map[string]interface{}) string {
for _, selector := range receiverSelectors {
if config, ok := selector["config"].(map[string]interface{}); ok {
if _, hasChannel := config["channel"]; hasChannel {
return RouterDirectChannel
}
}
}
return RouterReceiver
}
func (s *Service) Dispatch(ctx context.Context, ns []Notification) ([]string, error) {
ctx = s.deps.Repository.WithTransaction(ctx)

Expand Down Expand Up @@ -169,13 +186,13 @@ func (s *Service) List(ctx context.Context, flt Filter) ([]Notification, error)
return notifications, err
}

func (s *Service) getRouter(notificationRouterKind string) (Router, error) {
selectedRouter, exist := s.routerMap[notificationRouterKind]
if !exist {
return nil, errors.ErrInvalid.WithMsgf("unsupported notification router kind: %q", notificationRouterKind)
}
return selectedRouter, nil
}
// func (s *Service) getRouter(notificationRouterKind string) (Router, error) {
// selectedRouter, exist := s.routerMap[notificationRouterKind]
// if !exist {
// return nil, errors.ErrInvalid.WithMsgf("unsupported notification router kind: %q", notificationRouterKind)
// }
// return selectedRouter, nil
// }

func (s *Service) dispatchInternal(ctx context.Context, ns []Notification) (notificationIDs []string, err error) {
var (
Expand All @@ -186,52 +203,39 @@ func (s *Service) dispatchInternal(ctx context.Context, ns []Notification) (noti
for _, n := range ns {
var flow string
if len(n.ReceiverSelectors) != 0 {
flow = RouterReceiver
flow = s.determineFlow(n.ReceiverSelectors)
} else if len(n.Labels) != 0 {
flow = RouterSubscriber
} else {
return nil, errors.ErrInvalid.WithMsgf("no receivers or labels found, unknown flow")
}

// NOTE: never invalid cause we have checked above
if err := n.Validate(flow); err != nil {
return nil, err
}

// TODO: test if flow is not recognized
router, err := s.getRouter(flow)
if err != nil {
return nil, err
router, ok := s.routerMap[flow]
if !ok {
return nil, errors.ErrInvalid.WithMsgf("unknown flow %s", flow)
}

generatedMetaMessages, generatedNotificationLogs, err := router.PrepareMetaMessages(ctx, n)
mms, nlogs, err := router.PrepareMetaMessages(ctx, n)
if err != nil {
if errors.Is(err, ErrRouteSubscriberNoMatchFound) {
errMessage := fmt.Sprintf("not matching any subscription for notification: %v", n)
nJson, err := json.MarshalIndent(n, "", " ")
if err == nil {
errMessage = fmt.Sprintf("not matching any subscription for notification: %s", string(nJson))
}
s.deps.Logger.Warn(errMessage)
continue
continue // Skip this notification and continue with the next one
}
return nil, err
}

metaMessages = append(metaMessages, generatedMetaMessages...)
notificationLogs = append(notificationLogs, generatedNotificationLogs...)
metaMessages = append(metaMessages, mms...)
notificationLogs = append(notificationLogs, nlogs...)
}

if len(metaMessages) == 0 {
return nil, ErrNoMessage
}

messages, err := s.PrepareMessages(ctx, metaMessages)
if err != nil {
return nil, err
}

if len(messages) == 0 {
s.deps.Logger.Info("no messages to process")
return nil, ErrNoMessage
}

if err := s.deps.LogService.LogNotifications(ctx, notificationLogs...); err != nil {
return nil, fmt.Errorf("failed logging notifications: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions core/notification/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ func TestService_DispatchFailure(t *testing.T) {
)
if _, err := s.Dispatch(context.TODO(), tt.n); err != nil {
if err.Error() != tt.wantErrStr {
t.Errorf("Service.DispatchFailure() error = %v, wantErr %v", err, tt.wantErrStr)
t.Errorf("Service.DispatchSuccess() error = %v, wantErr %v", err, tt.wantErrStr)
}
} else {
t.Errorf("Service.DispatchFailure() error = %v, wantErr %v", err, tt.wantErrStr)
} else if tt.wantErrStr != "" {
t.Errorf("Service.DispatchSuccess() expected error %v, got nil", tt.wantErrStr)
}
})
}
Expand Down
Loading
Loading