diff --git a/cli/deps.go b/cli/deps.go index ff66a420..a64911f6 100644 --- a/cli/deps.go +++ b/cli/deps.go @@ -41,8 +41,15 @@ func InitDeps( telemetry.Init(ctx, cfg.Telemetry, logger) nrApp, err := newrelic.NewApplication( - newrelic.ConfigAppName(cfg.Telemetry.NewRelicAppName), + newrelic.ConfigAppName(cfg.Telemetry.ServiceName), newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey), + func(c *newrelic.Config) { + c.DistributedTracer.Enabled = true + c.DatastoreTracer.DatabaseNameReporting.Enabled = true + c.DatastoreTracer.InstanceReporting.Enabled = true + c.DatastoreTracer.QueryParameters.Enabled = true + c.DatastoreTracer.SlowQuery.Enabled = true + }, ) if err != nil { logger.Warn("failed to init newrelic", "err", err) diff --git a/cli/server.go b/cli/server.go index b42c3e01..f8a4dd91 100644 --- a/cli/server.go +++ b/cli/server.go @@ -154,7 +154,7 @@ func StartServer(ctx context.Context, cfg config.Config) error { if cfg.Notification.MessageHandler.Enabled { workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.MessageHandler.PollDuration), worker.WithID("message-handler")) - notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, queue, notifierRegistry, + notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, nrApp, queue, notifierRegistry, notification.HandlerWithIdentifier(workerTicker.GetID())) wg.Add(1) go func() { @@ -166,7 +166,7 @@ func StartServer(ctx context.Context, cfg config.Config) error { } if cfg.Notification.DLQHandler.Enabled { workerDLQTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.DLQHandler.PollDuration), worker.WithID("dlq-handler")) - notificationDLQHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, dlq, notifierRegistry, + notificationDLQHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, nrApp, dlq, notifierRegistry, notification.HandlerWithIdentifier(workerDLQTicker.GetID())) wg.Add(1) go func() { @@ -196,16 +196,23 @@ func StartServer(ctx context.Context, cfg config.Config) error { timeoutCtx, cancel := context.WithTimeout(context.Background(), gracefulStopQueueWaitPeriod) defer cancel() + logger.Warn("stopping queue...") if err := queue.Stop(timeoutCtx); err != nil { logger.Error("error stopping queue", "error", err) } + logger.Warn("queue stopped...") + + logger.Warn("stopping dlq...") if err := dlq.Stop(timeoutCtx); err != nil { logger.Error("error stopping dlq", "error", err) } + logger.Warn("dlq stopped...") + logger.Warn("closing db...") if err := pgClient.Close(); err != nil { - return err + logger.Error("error when closing db", "err", err) } + logger.Warn("db closed...") return err } diff --git a/cli/worker.go b/cli/worker.go index bc82c9e6..6ad73daf 100644 --- a/cli/worker.go +++ b/cli/worker.go @@ -130,7 +130,7 @@ func workerStartNotificationDLQHandlerCommand() *cobra.Command { func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error { logger := initLogger(cfg.Log) - _, _, pgClient, notifierRegistry, err := InitDeps(ctx, logger, cfg, nil) + _, nrApp, pgClient, notifierRegistry, err := InitDeps(ctx, logger, cfg, nil) if err != nil { return err } @@ -150,7 +150,7 @@ func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, canc `, cfg.Notification.Queue.Kind.String())) } workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.MessageHandler.PollDuration), worker.WithID("message-worker")) - notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, queue, notifierRegistry, + notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, nrApp, queue, notifierRegistry, notification.HandlerWithIdentifier(workerTicker.GetID())) go func() { @@ -170,7 +170,7 @@ func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, canc func StartNotificationDLQHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error { logger := initLogger(cfg.Log) - _, _, pgClient, notifierRegistry, err := InitDeps(ctx, logger, cfg, nil) + _, nrApp, pgClient, notifierRegistry, err := InitDeps(ctx, logger, cfg, nil) if err != nil { return err } @@ -191,7 +191,7 @@ func StartNotificationDLQHandlerWorker(ctx context.Context, cfg config.Config, c } workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.DLQHandler.PollDuration), worker.WithID("dlq-worker")) - notificationHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, queue, notifierRegistry, + notificationHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, nrApp, queue, notifierRegistry, notification.HandlerWithIdentifier("dlq-"+workerTicker.GetID())) go func() { workerTicker.Run(ctx, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error { diff --git a/core/notification/handler.go b/core/notification/handler.go index 43bb3dd0..d23053ba 100644 --- a/core/notification/handler.go +++ b/core/notification/handler.go @@ -6,6 +6,7 @@ import ( "time" "github.com/goto/salt/log" + "github.com/newrelic/go-agent/v3/newrelic" "go.opencensus.io/tag" "go.opencensus.io/trace" @@ -25,16 +26,18 @@ type Handler struct { notifierRegistry map[string]Notifier supportedReceiverTypes []string messagingTracer *telemetry.MessagingTracer + nrApp *newrelic.Application batchSize int } // NewHandler creates a new handler with some supported type of Notifiers -func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler { +func NewHandler(cfg HandlerConfig, logger log.Logger, nrApp *newrelic.Application, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler { h := &Handler{ batchSize: defaultBatchSize, logger: logger, + nrApp: nrApp, notifierRegistry: registry, q: q, } @@ -83,12 +86,18 @@ func (h *Handler) getNotifierPlugin(receiverType string) (Notifier, error) { } func (h *Handler) Process(ctx context.Context, runAt time.Time) error { + txn := h.nrApp.StartTransaction(h.identifier) + defer txn.End() + nrCtx := newrelic.NewContext(ctx, txn) + receiverTypes := h.supportedReceiverTypes if len(receiverTypes) == 0 { - return errors.New("no receiver type plugin registered, skipping dequeue") + err := errors.New("no receiver type plugin registered, skipping dequeue") + txn.NoticeError(err) + return err } else { - traceCtx, span := h.messagingTracer.StartSpan(ctx, "batch_dequeue", trace.StringAttribute("messaging.handler_id", h.identifier)) - defer span.End() + traceCtx, span := h.messagingTracer.StartSpan(nrCtx, "batch_dequeue", trace.StringAttribute("messaging.handler_id", h.identifier)) + defer h.messagingTracer.StopSpan() if err := h.q.Dequeue(traceCtx, receiverTypes, h.batchSize, h.MessageHandler); err != nil { if !errors.Is(err, ErrNoMessage) { @@ -96,7 +105,12 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error { Code: trace.StatusCodeUnknown, Message: err.Error(), }) - return fmt.Errorf("dequeue failed on handler with id %s: %w", h.identifier, err) + err = fmt.Errorf("dequeue failed on handler with id %s: %w", h.identifier, err) + txn.NoticeError(err) + return err + } else { + // no messages found + txn.Ignore() } } } diff --git a/core/notification/handler_test.go b/core/notification/handler_test.go index 70379f73..65237c7c 100644 --- a/core/notification/handler_test.go +++ b/core/notification/handler_test.go @@ -133,7 +133,7 @@ func TestHandler_MessageHandler(t *testing.T) { tc.setup(mockQueue, mockNotifier) } - h := notification.NewHandler(notification.HandlerConfig{}, log.NewNoop(), mockQueue, map[string]notification.Notifier{ + h := notification.NewHandler(notification.HandlerConfig{}, log.NewNoop(), nil, mockQueue, map[string]notification.Notifier{ testReceiverType: mockNotifier, }) if err := h.MessageHandler(context.TODO(), tc.messages); (err != nil) != tc.wantErr { diff --git a/go.mod b/go.mod index 86b8060e..99f2a9a9 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/mcuadros/go-defaults v1.2.0 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/mapstructure v1.5.0 - github.com/newrelic/go-agent/v3 v3.20.2 + github.com/newrelic/go-agent/v3 v3.24.0 github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.2 github.com/newrelic/newrelic-opencensus-exporter-go v0.4.0 github.com/ory/dockertest/v3 v3.9.1 @@ -35,9 +35,9 @@ require ( github.com/xo/dburl v0.12.4 go.opencensus.io v0.24.0 go.uber.org/zap v1.24.0 - golang.org/x/text v0.7.0 + golang.org/x/text v0.8.0 google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 - google.golang.org/grpc v1.53.0 + google.golang.org/grpc v1.54.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -103,7 +103,7 @@ require ( github.com/golang-migrate/migrate/v4 v4.15.2 // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/gorilla/css v1.0.0 // indirect @@ -175,11 +175,11 @@ require ( go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.3.0 // indirect - golang.org/x/net v0.7.0 // indirect + golang.org/x/net v0.8.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/term v0.5.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/term v0.6.0 // indirect google.golang.org/api v0.110.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 9b7c9273..e8ace0ed 100644 --- a/go.sum +++ b/go.sum @@ -1057,8 +1057,9 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -1714,8 +1715,8 @@ github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ github.com/ncw/swift v1.0.50/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI= -github.com/newrelic/go-agent/v3 v3.20.2 h1:EqFMriW3Bv3on4tqKzI+fJmNYOEG55yw54v6yv8L+x8= -github.com/newrelic/go-agent/v3 v3.20.2/go.mod h1:rT6ZUxJc5rQbWLyCtjqQCOcfb01lKRFbc1yMQkcboWM= +github.com/newrelic/go-agent/v3 v3.24.0 h1:DPfbd+p0akRjv6UpWzWJl+pfOMSs+QkAeNRUp0fPLZI= +github.com/newrelic/go-agent/v3 v3.24.0/go.mod h1:7GnP0o5ZwEsnC001iDSoZRJ63jS6AtoAOggpg5XVJh8= github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.2 h1:SBtZAkWapxfAxYVZHagHxsrbRt+ULkgxi0mbn6TMRM8= github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.2/go.mod h1:h5GGcju9OuzmRdKa5glKh/SrMs1HhZyvXrf0mEPv70k= github.com/newrelic/newrelic-opencensus-exporter-go v0.4.0 h1:BjzhyzSrzc8/WtyZDWBF8XATW4M92EoZiy38kgL3gfo= @@ -2555,8 +2556,8 @@ golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.0.0-20220919232410-f2f64ebce3c1/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= -golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2780,8 +2781,8 @@ golang.org/x/sys v0.0.0-20220818161305-2296e01440c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2791,8 +2792,8 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2806,8 +2807,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2922,7 +2923,7 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -3158,8 +3159,8 @@ google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACu google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= -google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= -google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0/go.mod h1:DNq5QpG7LJqD2AamLZ7zvKE0DEpVl2BSEVjFycAAjRY= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/internal/server/server.go b/internal/server/server.go index 852b9e9c..9383bf33 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -58,11 +58,11 @@ func RunServer( } loggerOpts := []grpc_zap.Option{ - grpc_zap.WithLevels(zaputil.GRPCCodeToLevel), + grpc_zap.WithLevels(grpc_zap.DefaultCodeToLevel), grpc_zap.WithTimestampFormat(time.RFC3339Nano), grpc_zap.WithDecider(func(fullMethodName string, err error) bool { // will not log gRPC calls if it was a call to healthcheck and no error was raised - if err == nil && fullMethodName == "grpc.health.v1.Health.Check" { + if err == nil && fullMethodName == grpc_health_v1.Health_Check_FullMethodName { return false } // by default everything will be logged diff --git a/internal/store/postgres/namespace.go b/internal/store/postgres/namespace.go index 3566a5c0..cb8b872c 100644 --- a/internal/store/postgres/namespace.go +++ b/internal/store/postgres/namespace.go @@ -61,6 +61,7 @@ func NewNamespaceRepository(client *pgc.Client) *NamespaceRepository { } func (r NamespaceRepository) List(ctx context.Context) ([]namespace.EncryptedNamespace, error) { + query, args, err := namespaceListQueryBuilder.PlaceholderFormat(sq.Dollar).ToSql() if err != nil { return nil, err diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 325ebcde..62cbc509 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -5,6 +5,8 @@ import ( "time" "github.com/goto/siren/pkg/telemetry" + "github.com/newrelic/go-agent/v3/newrelic" + "go.opencensus.io/plugin/ochttp" ) type ClientOpt func(*Client) @@ -43,8 +45,11 @@ func New(cfg Config, opts ...ClientOpt) *Client { } c.httpClient = &http.Client{ - Transport: &telemetry.Transport{ - Base: transport, + Transport: &ochttp.Transport{ + Base: newrelic.NewRoundTripper(&telemetry.Transport{ + Origin: transport, + }), + NewClientTrace: ochttp.NewSpanAnnotatingClientTrace, }, } diff --git a/pkg/pgc/client.go b/pkg/pgc/client.go index 555defc3..4b5c0e26 100644 --- a/pkg/pgc/client.go +++ b/pkg/pgc/client.go @@ -80,10 +80,10 @@ func CheckError(err error) error { } func (c *Client) QueryRowxContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) *sqlx.Row { - ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, - trace.StringAttribute("db.statement", query), - ) - defer span.End() + + ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, query) + defer c.postgresTracer.StopSpan() + sqlxRow := c.GetDB(ctx).QueryRowxContext(ctx, query, args...) if sqlxRow.Err() != nil { span.SetStatus(trace.Status{ @@ -95,10 +95,9 @@ func (c *Client) QueryRowxContext(ctx context.Context, op string, tableName stri } func (c *Client) QueryxContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) (*sqlx.Rows, error) { - ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, - trace.StringAttribute("db.statement", query), - ) - defer span.End() + ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, query) + defer c.postgresTracer.StopSpan() + sqlxRows, err := c.GetDB(ctx).QueryxContext(ctx, query, args...) if err != nil { span.SetStatus(trace.Status{ @@ -110,10 +109,8 @@ func (c *Client) QueryxContext(ctx context.Context, op string, tableName string, } func (c *Client) GetContext(ctx context.Context, op string, tableName string, dest interface{}, query string, args ...interface{}) error { - ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, - trace.StringAttribute("db.statement", query), - ) - defer span.End() + ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, query) + defer c.postgresTracer.StopSpan() if err := c.GetDB(ctx).QueryRowxContext(ctx, query, args...).StructScan(dest); err != nil { span.SetStatus(trace.Status{ @@ -127,10 +124,8 @@ func (c *Client) GetContext(ctx context.Context, op string, tableName string, de } func (c *Client) ExecContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) (sql.Result, error) { - ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, - trace.StringAttribute("db.statement", query), - ) - defer span.End() + ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, query) + defer c.postgresTracer.StopSpan() res, err := c.db.ExecContext(ctx, query, args...) if err != nil { @@ -145,10 +140,8 @@ func (c *Client) ExecContext(ctx context.Context, op string, tableName string, q } func (c *Client) NamedExecContext(ctx context.Context, op string, tableName string, query string, arg interface{}) (sql.Result, error) { - ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, - trace.StringAttribute("db.statement", query), - ) - defer span.End() + ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, query) + defer c.postgresTracer.StopSpan() res, err := c.db.NamedExecContext(ctx, query, arg) if err != nil { diff --git a/pkg/telemetry/config.go b/pkg/telemetry/config.go index 00686fb4..52805992 100644 --- a/pkg/telemetry/config.go +++ b/pkg/telemetry/config.go @@ -13,9 +13,8 @@ type Config struct { ServiceName string `mapstructure:"service_name" yaml:"service_name" default:"siren"` // NewRelic exporter. - EnableNewrelic bool `mapstructure:"enable_newrelic" yaml:"enable_newrelic" default:"false"` - NewRelicAppName string `mapstructure:"newrelic_app_name" yaml:"newrelic_app_name"` - NewRelicAPIKey string `mapstructure:"newrelic_api_key" yaml:"newrelic_api_key" default:"____LICENSE_STRING_OF_40_CHARACTERS_____"` + EnableNewrelic bool `mapstructure:"enable_newrelic" yaml:"enable_newrelic" default:"false"` + NewRelicAPIKey string `mapstructure:"newrelic_api_key" yaml:"newrelic_api_key" default:"____LICENSE_STRING_OF_40_CHARACTERS_____"` // OpenTelemetry Agent exporter. EnableOtelAgent bool `mapstructure:"enable_otel_agent" yaml:"enable_otel_agent" default:"false"` diff --git a/pkg/telemetry/httpclient.go b/pkg/telemetry/httpclient.go index d03dae1a..f31a66d8 100644 --- a/pkg/telemetry/httpclient.go +++ b/pkg/telemetry/httpclient.go @@ -3,16 +3,14 @@ package telemetry import ( "net/http" - "go.opencensus.io/plugin/ochttp" "go.opencensus.io/trace" ) type Transport struct { - Base http.RoundTripper + Origin http.RoundTripper } func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { - rt := t.base() span := trace.FromContext(req.Context()) span.AddAttributes([]trace.Attribute{ @@ -21,18 +19,5 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { ctx := trace.NewContext(req.Context(), span) - return rt.RoundTrip(req.WithContext(ctx)) -} - -func (t *Transport) base() http.RoundTripper { - if t.Base != nil { - return &ochttp.Transport{ - Base: t.Base, - NewClientTrace: ochttp.NewSpanAnnotatingClientTrace, - } - } - return &ochttp.Transport{ - Base: http.DefaultTransport, - NewClientTrace: ochttp.NewSpanAnnotatingClientTrace, - } + return t.Origin.RoundTrip(req.WithContext(ctx)) } diff --git a/pkg/telemetry/messaging.go b/pkg/telemetry/messaging.go index 09e81016..bc478a78 100644 --- a/pkg/telemetry/messaging.go +++ b/pkg/telemetry/messaging.go @@ -4,11 +4,14 @@ import ( "context" "fmt" + "github.com/newrelic/go-agent/v3/newrelic" "go.opencensus.io/trace" ) type MessagingTracer struct { - queueSystem string + queueSystem string + nrProducerSegment newrelic.MessageProducerSegment + span *trace.Span } func NewMessagingTracer(queueSystem string) *MessagingTracer { @@ -17,7 +20,16 @@ func NewMessagingTracer(queueSystem string) *MessagingTracer { } } -func (msg MessagingTracer) StartSpan(ctx context.Context, op string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) { +func (msg *MessagingTracer) StartSpan(ctx context.Context, op string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) { + nrTx := newrelic.FromContext(ctx) + msg.nrProducerSegment = newrelic.MessageProducerSegment{ + Library: msg.queueSystem, + DestinationType: newrelic.MessageExchange, + DestinationName: "notification_queue", + DestinationTemporary: false, + StartTime: nrTx.StartSegmentNow(), + } + // Refer https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md ctx, span := trace.StartSpan(ctx, fmt.Sprintf("notification_queue %s", op), trace.WithSpanKind(trace.SpanKindClient)) @@ -34,5 +46,12 @@ func (msg MessagingTracer) StartSpan(ctx context.Context, op string, spanAttribu traceAttributes..., ) + msg.span = span + return ctx, span } + +func (msg *MessagingTracer) StopSpan() { + msg.nrProducerSegment.End() + msg.span.End() +} diff --git a/pkg/telemetry/opencensus.go b/pkg/telemetry/opencensus.go index 3fa06872..fe2b409b 100644 --- a/pkg/telemetry/opencensus.go +++ b/pkg/telemetry/opencensus.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "net/http" + "strings" "contrib.go.opencensus.io/exporter/ocagent" "contrib.go.opencensus.io/exporter/prometheus" @@ -63,7 +64,7 @@ func setupOpenCensus(ctx context.Context, mux *http.ServeMux, cfg Config) error } pe, err := prometheus.NewExporter(prometheus.Options{ - Namespace: cfg.ServiceName, + Namespace: strings.ReplaceAll(cfg.ServiceName, "-", "_"), }) if err != nil { return err diff --git a/pkg/telemetry/postgres.go b/pkg/telemetry/postgres.go index 20812ead..af5457ad 100644 --- a/pkg/telemetry/postgres.go +++ b/pkg/telemetry/postgres.go @@ -5,16 +5,19 @@ import ( "fmt" "strings" + "github.com/newrelic/go-agent/v3/newrelic" "github.com/xo/dburl" "go.opencensus.io/trace" ) type PostgresTracer struct { - dbSystem string - dbName string - dbUser string - dbAddr string - dbPort string + dbSystem string + dbName string + dbUser string + dbAddr string + dbPort string + nrDataStoreSegment newrelic.DatastoreSegment + span *trace.Span } func NewPostgresTracer(url string) (*PostgresTracer, error) { @@ -31,12 +34,25 @@ func NewPostgresTracer(url string) (*PostgresTracer, error) { }, err } -func (d PostgresTracer) StartSpan(ctx context.Context, op string, tableName string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) { +func (d *PostgresTracer) StartSpan(ctx context.Context, op string, tableName string, query string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) { + nrTx := newrelic.FromContext(ctx) + d.nrDataStoreSegment = newrelic.DatastoreSegment{ + Product: newrelic.DatastorePostgres, + DatabaseName: d.dbName, + PortPathOrID: d.dbPort, + Collection: tableName, + Operation: op, + Host: d.dbAddr, + StartTime: nrTx.StartSegmentNow(), + ParameterizedQuery: query, + } + // Refer https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/database.md ctx, span := trace.StartSpan(ctx, fmt.Sprintf("%s %s.%s", op, d.dbName, tableName), trace.WithSpanKind(trace.SpanKindClient)) traceAttributes := []trace.Attribute{ trace.StringAttribute("db.system", d.dbSystem), + trace.StringAttribute("db.statement", query), trace.StringAttribute("db.user", d.dbUser), trace.StringAttribute("net.sock.peer.addr", d.dbAddr), trace.StringAttribute("net.peer.port", d.dbPort), @@ -51,5 +67,12 @@ func (d PostgresTracer) StartSpan(ctx context.Context, op string, tableName stri traceAttributes..., ) + d.span = span + return ctx, span } + +func (d *PostgresTracer) StopSpan() { + d.nrDataStoreSegment.End() + d.span.End() +} diff --git a/pkg/zaputil/grpc.go b/pkg/zaputil/grpc.go index 5d80d894..a92756f4 100644 --- a/pkg/zaputil/grpc.go +++ b/pkg/zaputil/grpc.go @@ -3,8 +3,6 @@ package zaputil import ( "github.com/goto/salt/log" "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc/codes" ) // GRPCZapLogger returns *zap.Logger from salt/log.Logger. @@ -23,49 +21,3 @@ func GRPCZapLogger(logger log.Logger) (*zap.Logger, error) { } return zapLogger, nil } - -// GRPCCodeToLevel is the mapping of gRPC return codes and interceptor -// log level. Convert codes.OK to DEBUG level, the rest are -// the same with the DefaultCodeToLevel in [grpc_zap]. -// -// [grpc_zap]: https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware/logging/zap#DefaultCodeToLevel -func GRPCCodeToLevel(code codes.Code) zapcore.Level { - switch code { - case codes.OK: - return zap.DebugLevel - case codes.Canceled: - return zap.InfoLevel - case codes.Unknown: - return zap.ErrorLevel - case codes.InvalidArgument: - return zap.InfoLevel - case codes.DeadlineExceeded: - return zap.WarnLevel - case codes.NotFound: - return zap.InfoLevel - case codes.AlreadyExists: - return zap.InfoLevel - case codes.PermissionDenied: - return zap.WarnLevel - case codes.Unauthenticated: - return zap.InfoLevel // unauthenticated requests can happen - case codes.ResourceExhausted: - return zap.WarnLevel - case codes.FailedPrecondition: - return zap.WarnLevel - case codes.Aborted: - return zap.WarnLevel - case codes.OutOfRange: - return zap.WarnLevel - case codes.Unimplemented: - return zap.ErrorLevel - case codes.Internal: - return zap.ErrorLevel - case codes.Unavailable: - return zap.WarnLevel - case codes.DataLoss: - return zap.ErrorLevel - default: - return zap.ErrorLevel - } -}