Skip to content

Commit

Permalink
feat: add instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Jun 2, 2024
1 parent ac21942 commit 213e629
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 94 deletions.
36 changes: 28 additions & 8 deletions core/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/goto/siren/core/template"
"github.com/goto/siren/pkg/errors"
"github.com/goto/siren/pkg/structure"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type LogService interface {
Expand All @@ -22,20 +25,27 @@ type NotificationService interface {

// Service handles business logic
type Service struct {
cfg Config
logger saltlog.Logger
repository Repository
logService LogService
notificationService NotificationService
registry map[string]AlertTransformer
cfg Config
logger saltlog.Logger
repository Repository
logService LogService
notificationService NotificationService
registry map[string]AlertTransformer
metricGaugeNumAlerts metric.Int64Gauge
}

// NewService returns repository struct
func NewService(cfg Config, logger saltlog.Logger, repository Repository, logService LogService, notificationService NotificationService, registry map[string]AlertTransformer) *Service {
return &Service{cfg, logger, repository, logService, notificationService, registry}
metricGaugeNumAlerts, err := otel.Meter("github.com/goto/siren/core/alert").
Int64Gauge("siren.alert.number")
if err != nil {
otel.Handle(err)
}

return &Service{cfg, logger, repository, logService, notificationService, registry, metricGaugeNumAlerts}
}

func (s *Service) CreateAlerts(ctx context.Context, providerType string, providerID uint64, namespaceID uint64, body map[string]any) ([]Alert, error) {
func (s *Service) CreateAlerts(ctx context.Context, providerType string, providerID uint64, namespaceID uint64, body map[string]any) (alerts []Alert, err error) {
pluginService, err := s.getProviderPluginService(providerType)
if err != nil {
return nil, err
Expand All @@ -45,6 +55,10 @@ func (s *Service) CreateAlerts(ctx context.Context, providerType string, provide
return nil, err
}

defer func() {
s.instrumentNumberAlerts(ctx, len(alerts), err)
}()

for i := 0; i < len(alerts); i++ {
createdAlert, err := s.repository.Create(ctx, alerts[i])
if err != nil {
Expand Down Expand Up @@ -189,3 +203,9 @@ func BuildNotifications(

return notifications, nil
}

func (s *Service) instrumentNumberAlerts(ctx context.Context, num int, err error) {
s.metricGaugeNumAlerts.Record(ctx, int64(num), metric.WithAttributes(
attribute.Bool("success", err == nil),
))
}
42 changes: 29 additions & 13 deletions core/notification/dispatch_bulk_notification_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,36 @@ import (
"github.com/goto/siren/pkg/errors"
"github.com/goto/siren/pkg/structure"
"github.com/mitchellh/hashstructure/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/exp/maps"
)

// DispatchBulkNotificationService only supports subscriber routing and not supporting direct receiver routing
type DispatchBulkNotificationService struct {
deps Deps
notifierPlugins map[string]Notifier
routerMap map[string]Router
deps Deps
notifierPlugins map[string]Notifier
routerMap map[string]Router
metricGaugeNumBulkNotification metric.Int64Gauge
}

func NewDispatchBulkNotificationService(
deps Deps,
notifierPlugins map[string]Notifier,
routerMap map[string]Router,
) *DispatchBulkNotificationService {
metricGaugeNumBulkNotification, err := otel.Meter("github.com/goto/siren/core/notification").
Int64Gauge("siren.notification.bulk.notification_number")
if err != nil {
otel.Handle(err)
}

return &DispatchBulkNotificationService{
deps: deps,
notifierPlugins: notifierPlugins,
routerMap: routerMap,
deps: deps,
notifierPlugins: notifierPlugins,
routerMap: routerMap,
metricGaugeNumBulkNotification: metricGaugeNumBulkNotification,
}
}

Expand All @@ -38,11 +49,7 @@ func (s *DispatchBulkNotificationService) getRouter(notificationRouterKind strin
return selectedRouter, nil
}

func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Context, ns []Notification) ([]MetaMessage, []log.Notification, error) {
var (
metaMessages []MetaMessage
notificationLogs []log.Notification
)
func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Context, ns []Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error) {
for _, n := range ns {
if err := n.Validate(RouterSubscriber); err != nil {
return nil, nil, err
Expand All @@ -65,9 +72,12 @@ func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Contex
return metaMessages, notificationLogs, nil
}

func (s *DispatchBulkNotificationService) Dispatch(ctx context.Context, ns []Notification) ([]string, error) {
func (s *DispatchBulkNotificationService) Dispatch(ctx context.Context, ns []Notification) (notificationIDs []string, err error) {
defer func() {
s.instrumentNumberBulkNotification(ctx, len(ns), err)
}()

var (
notificationIDs []string
metaMessages []MetaMessage
notificationLogs []log.Notification
)
Expand Down Expand Up @@ -184,3 +194,9 @@ func MergeMetaMessage(from MetaMessage, to MetaMessage) MetaMessage {
output.SubscriptionIDs = append(output.SubscriptionIDs, from.SubscriptionIDs...)
return output
}

func (s *DispatchBulkNotificationService) instrumentNumberBulkNotification(ctx context.Context, num int, err error) {
s.metricGaugeNumBulkNotification.Record(ctx, int64(num), metric.WithAttributes(
attribute.Bool("success", err == nil),
))
}
6 changes: 6 additions & 0 deletions core/notification/dispatch_bulk_notification_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,15 @@ func TestReduceMetaMessages(t *testing.T) {
sort.Slice(got, func(i, j int) bool {
return got[i].ReceiverID < got[j].ReceiverID
})
sort.Slice(got, func(i, j int) bool {
return len(got[i].SubscriptionIDs) < len(got[j].SubscriptionIDs)
})
sort.Slice(tt.want, func(i, j int) bool {
return tt.want[i].ReceiverID < tt.want[j].ReceiverID
})
sort.Slice(tt.want, func(i, j int) bool {
return len(tt.want[i].SubscriptionIDs) < len(tt.want[j].SubscriptionIDs)
})
if diff := cmp.Diff(got, tt.want); diff != "" {
t.Errorf("ReduceMetaMessages() diff = %v", diff)
}
Expand Down
88 changes: 60 additions & 28 deletions core/notification/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"time"

"github.com/goto/salt/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/goto/siren/pkg/errors"
)
Expand All @@ -22,16 +25,23 @@ type Handler struct {
notifierRegistry map[string]Notifier
supportedReceiverTypes []string
batchSize int
metricHistMQDuration metric.Int64Histogram
}

// NewHandler creates a new handler with some supported type of Notifiers
func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler {
metricHistMQDuration, err := otel.Meter("github.com/goto/siren/core/notification").
Int64Histogram("siren.notification.queue.duration")
if err != nil {
otel.Handle(err)
}
h := &Handler{
batchSize: defaultBatchSize,

logger: logger,
notifierRegistry: registry,
q: q,
logger: logger,
notifierRegistry: registry,
q: q,
metricHistMQDuration: metricHistMQDuration,
}

if cfg.BatchSize != 0 {
Expand Down Expand Up @@ -89,42 +99,64 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error {
return nil
}

func (h *Handler) errorMessageHandler(ctx context.Context, retryable bool, herr error, msg *Message) error {
msg.MarkFailed(time.Now(), retryable, herr)
if err := h.q.ErrorCallback(ctx, *msg); err != nil {
return fmt.Errorf("failed to execute error callback with receiver type %s and error %w", msg.ReceiverType, err)
}
return herr
}

// MessageHandler is a function to handler dequeued message
func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error {
for _, message := range messages {
notifier, err := h.getNotifierPlugin(message.ReceiverType)
if err != nil {
return err
for _, msg := range messages {
if err := h.SingleMessageHandler(ctx, &msg); err != nil {
h.logger.Error(err.Error())
}
}
return nil
}

message.MarkPending(time.Now())
func (h *Handler) SingleMessageHandler(ctx context.Context, msg *Message) error {

newConfig, err := notifier.PostHookQueueTransformConfigs(ctx, message.Configs)
if err != nil {
message.MarkFailed(time.Now(), false, err)
defer func() {
h.instrumentMQDuration(ctx, msg)
}()

if cerr := h.q.ErrorCallback(ctx, message); cerr != nil {
return cerr
}
return err
}
message.Configs = newConfig
notifier, err := h.getNotifierPlugin(msg.ReceiverType)
if err != nil {
return h.errorMessageHandler(ctx, false, err, msg)
}

if retryable, err := notifier.Send(ctx, message); err != nil {
message.MarkFailed(time.Now(), retryable, err)
msg.MarkPending(time.Now())

if cerr := h.q.ErrorCallback(ctx, message); cerr != nil {
return cerr
}
return err
}
newConfig, err := notifier.PostHookQueueTransformConfigs(ctx, msg.Configs)
if err != nil {
return h.errorMessageHandler(ctx, false, err, msg)
}
msg.Configs = newConfig

message.MarkPublished(time.Now())
if retryable, err := notifier.Send(ctx, *msg); err != nil {
return h.errorMessageHandler(ctx, retryable, err, msg)
}

if err := h.q.SuccessCallback(ctx, message); err != nil {
return err
}
msg.MarkPublished(time.Now())

if err := h.q.SuccessCallback(ctx, *msg); err != nil {
return err
}

return nil
}

func (h *Handler) instrumentMQDuration(ctx context.Context, msg *Message) {
h.metricHistMQDuration.Record(
ctx, time.Since(msg.CreatedAt).Milliseconds(), metric.WithAttributes(
attribute.String("receiver_type", msg.ReceiverType),
attribute.String("status", string(msg.Status)),
attribute.Int("try_count", msg.TryCount),
attribute.Int("max_try", msg.MaxTries),
attribute.Bool("retryable", msg.Retryable),
),
)
}
31 changes: 17 additions & 14 deletions core/notification/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

const testReceiverType = "test"

func TestHandler_MessageHandler(t *testing.T) {
func TestHandler_SingleMessageHandler(t *testing.T) {
testCases := []struct {
name string
messages []notification.Message
setup func(*mocks.Queuer, *mocks.Notifier)
wantErr bool
name string
messages []notification.Message
setup func(*mocks.Queuer, *mocks.Notifier)
wantErrStr string
}{
{
name: "return error if plugin type is not supported",
Expand All @@ -29,8 +29,10 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, _ *mocks.Notifier) {
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
},
wantErr: true,
wantErrStr: "unsupported receiver type: \"random\" on handler ",
},
{
name: "return error if post hook transform config is failing and error callback success",
Expand All @@ -43,7 +45,7 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("map[string]interface {}")).Return(nil, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
},
wantErr: true,
wantErrStr: "some error",
},
{
name: "return error if post hook transform config is failing and error callback is failing",
Expand All @@ -56,7 +58,7 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("map[string]interface {}")).Return(nil, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
},
wantErr: true,
wantErrStr: "failed to execute error callback with receiver type test and error some error",
},
{
name: "return error if send message return error and error handler queue return error",
Expand All @@ -70,7 +72,7 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().Send(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(false, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
},
wantErr: true,
wantErrStr: "failed to execute error callback with receiver type test and error some error",
},
{
name: "return error if send message return error and error handler queue return no error",
Expand All @@ -84,7 +86,7 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().Send(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(false, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
},
wantErr: true,
wantErrStr: "some error",
},
{
name: "return error if send message success and success handler queue return error",
Expand All @@ -98,7 +100,7 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().Send(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(false, nil)
q.EXPECT().SuccessCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
},
wantErr: true,
wantErrStr: "some error",
},
{
name: "return no error if send message success and success handler queue return no error",
Expand All @@ -112,7 +114,6 @@ func TestHandler_MessageHandler(t *testing.T) {
n.EXPECT().Send(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(false, nil)
q.EXPECT().SuccessCallback(mock.AnythingOfType("context.todoCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
},
wantErr: false,
},
}
for _, tc := range testCases {
Expand All @@ -129,8 +130,10 @@ func TestHandler_MessageHandler(t *testing.T) {
h := notification.NewHandler(notification.HandlerConfig{}, log.NewNoop(), mockQueue, map[string]notification.Notifier{
testReceiverType: mockNotifier,
})
if err := h.MessageHandler(context.TODO(), tc.messages); (err != nil) != tc.wantErr {
t.Errorf("Handler.messageHandler() error = %v, wantErr %v", err, tc.wantErr)
if err := h.SingleMessageHandler(context.TODO(), &tc.messages[0]); err != nil {
if err.Error() != tc.wantErrStr {
t.Errorf("Handler.messageHandler() error = %s, wantErr = %s", err.Error(), tc.wantErrStr)
}
}

mockQueue.AssertExpectations(t)
Expand Down
Loading

0 comments on commit 213e629

Please sign in to comment.