diff --git a/docs/user_guide/outputs/prometheus_output.md b/docs/user_guide/outputs/prometheus_output.md
index 3a341a8f..2bcee846 100644
--- a/docs/user_guide/outputs/prometheus_output.md
+++ b/docs/user_guide/outputs/prometheus_output.md
@@ -82,6 +82,8 @@ outputs:
     target-template:
     # list of processors to apply on the message before writing
     event-processors:
+    # an integer, sets the number of workers handling messages to be converted into Prometheus metrics
+    num-workers: 1
     # Enables Consul service registration
     service-registration:
       # Consul server address, default to localhost:8500
@@ -458,6 +460,13 @@ When caching is enabled, the received gNMI updates are not processed and convert
 Once a scrape request is received from `Prometheus`, all the cached gNMI updates are retrieved from the cache and converted to [events](../event_processors/intro.md#the-event-format). The configured processors, if any, are then applied to the whole list of events. Finally, the resulting events are converted into metrics and written back to `Prometheus` within the scrape response.
+
+## Prometheus Output Metrics
+
+When metrics are enabled (`enable-metrics: true`), the prometheus output registers one of 2 prometheus gauges with `gnmic`'s metrics server, depending on whether caching is enabled:
+
+* `number_of_prometheus_metrics_total`: Number of metrics stored by the prometheus output (registered when caching is disabled).
+* `number_of_prometheus_cached_metrics_total`: Number of metrics cached by the prometheus output (registered when caching is enabled).
 
 ## Examples
 
 ### **A simple Prometheus output**
diff --git a/docs/user_guide/outputs/prometheus_write_output.md b/docs/user_guide/outputs/prometheus_write_output.md
index 943ac22e..5c11fc70 100644
--- a/docs/user_guide/outputs/prometheus_write_output.md
+++ b/docs/user_guide/outputs/prometheus_write_output.md
@@ -90,6 +90,8 @@ outputs:
     target-template:
     # list of processors to apply on the message before writing
     event-processors:
+    # an integer, sets the number of workers handling messages to be converted into Prometheus metrics
+    num-workers: 1
 ```
 
 `gnmic` creates the prometheus metric name and its labels from the subscription name, the gnmic path and the value name.
@@ -137,3 +139,15 @@ For the previous example the labels would be:
 ```bash
 {interface_name="1/1/1",subinterface_index=0,source="$routerIP:Port",subscription_name="port-stats"}
 ```
+
+## Prometheus Write Metrics
+
+When metrics are enabled (`enable-metrics: true`), the prometheus write output registers 4 prometheus counters and 2 prometheus gauges with `gnmic`'s metrics server:
+
+* `number_of_prometheus_write_msgs_sent_success_total`: Number of msgs successfully sent by the gnmic prometheus_write output.
+* `number_of_prometheus_write_msgs_sent_fail_total`: Number of msgs the gnmic prometheus_write output failed to send, labeled by failure reason.
+* `msg_send_duration_ns`: Duration of the gnmic prometheus_write output's last send, in ns.
+
+* `number_of_prometheus_write_metadata_msgs_sent_success_total`: Number of metadata msgs successfully sent by the gnmic prometheus_write output.
+* `number_of_prometheus_write_metadata_msgs_sent_fail_total`: Number of metadata msgs the gnmic prometheus_write output failed to send, labeled by failure reason.
+* `metadata_msg_send_duration_ns`: Duration of the gnmic prometheus_write output's last metadata send, in ns.
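+
+The metric names above are registered under the `gnmic` namespace and the `prometheus_write_output` subsystem, so they appear on `gnmic`'s metrics endpoint with fully qualified names such as `gnmic_prometheus_write_output_msg_send_duration_ns`.
+
+As a minimal sketch, enabling these metrics only requires the output's `enable-metrics` flag (the `url` value below is a placeholder):
+
+```yaml
+outputs:
+  output1:
+    type: prometheus_write
+    # remote write endpoint, placeholder value
+    url: http://localhost:9090/api/v1/write
+    # an integer, sets the number of workers handling messages to be converted into Prometheus metrics
+    num-workers: 2
+    # a boolean, enables the registration of this output's own metrics
+    enable-metrics: true
+```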
diff --git a/outputs/prometheus_output/prometheus_output/prometheus_cache.go b/outputs/prometheus_output/prometheus_output/prometheus_cache.go index a1b45553..e8feb0e2 100644 --- a/outputs/prometheus_output/prometheus_output/prometheus_cache.go +++ b/outputs/prometheus_output/prometheus_output/prometheus_cache.go @@ -24,8 +24,11 @@ func (p *prometheusOutput) collectFromCache(ch chan<- prometheus.Metric) { p.logger.Printf("failed to read from cache: %v", err) return } + numNotifications := len(notifications) + prometheusNumberOfCachedMetrics.Set(float64(numNotifications)) + p.targetsMeta.DeleteExpired() - events := make([]*formatters.EventMsg, 0, len(notifications)) + events := make([]*formatters.EventMsg, 0, numNotifications) for subName, notifs := range notifications { // build events without processors for _, notif := range notifs { @@ -45,17 +48,17 @@ func (p *prometheusOutput) collectFromCache(ch chan<- prometheus.Metric) { } } - if p.Cfg.CacheConfig.Debug { + if p.cfg.CacheConfig.Debug { p.logger.Printf("got %d events from cache pre processors", len(events)) } for _, proc := range p.evps { events = proc.Apply(events...) } - if p.Cfg.CacheConfig.Debug { + if p.cfg.CacheConfig.Debug { p.logger.Printf("got %d events from cache post processors", len(events)) } - ctx, cancel := context.WithTimeout(context.Background(), p.Cfg.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Timeout) defer cancel() now := time.Now() for _, ev := range events { diff --git a/outputs/prometheus_output/prometheus_output/prometheus_metrics.go b/outputs/prometheus_output/prometheus_output/prometheus_metrics.go new file mode 100644 index 00000000..c2857a47 --- /dev/null +++ b/outputs/prometheus_output/prometheus_output/prometheus_metrics.go @@ -0,0 +1,48 @@ +// © 2023 Nokia. +// +// This code is a Contribution to the gNMIc project (“Work”) made under the Google Software Grant and Corporate Contributor License Agreement (“CLA”) and governed by the Apache License 2.0. +// No other rights or licenses in or to any of Nokia’s intellectual property are granted for any other purpose. +// This code is provided on an “as is” basis without any warranties of any kind. 
+// +// SPDX-License-Identifier: Apache-2.0 + +package prometheus_output + +import "github.com/prometheus/client_golang/prometheus" + +const ( + namespace = "gnmic" + subsystem = "prometheus_output" +) + +var prometheusNumberOfMetrics = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_metrics_total", + Help: "Number of metrics stored by the prometheus output", + }) + +var prometheusNumberOfCachedMetrics = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_cached_metrics_total", + Help: "Number of metrics cached by the prometheus output", + }) + +func (p *prometheusOutput) initMetrics() { + if p.cfg.CacheConfig == nil { + prometheusNumberOfMetrics.Set(0) + return + } + prometheusNumberOfCachedMetrics.Set(0) +} + +func (p *prometheusOutput) registerMetrics(reg *prometheus.Registry) error { + p.initMetrics() + if p.cfg.CacheConfig == nil { + return reg.Register(prometheusNumberOfMetrics) + } + return reg.Register(prometheusNumberOfCachedMetrics) +} diff --git a/outputs/prometheus_output/prometheus_output/prometheus_output.go b/outputs/prometheus_output/prometheus_output/prometheus_output.go index d4e58ff0..a6739422 100644 --- a/outputs/prometheus_output/prometheus_output/prometheus_output.go +++ b/outputs/prometheus_output/prometheus_output/prometheus_output.go @@ -54,7 +54,8 @@ const ( loggingPrefix = "[prometheus_output:%s] " // this is used to timeout the collection method // in case it drags for too long - defaultTimeout = 10 * time.Second + defaultTimeout = 10 * time.Second + defaultNumWorkers = 1 ) type promMetric struct { @@ -70,8 +71,9 @@ type promMetric struct { func init() { outputs.Register(outputType, func() outputs.Output { return &prometheusOutput{ - Cfg: &config{}, + cfg: &config{}, eventChan: make(chan *formatters.EventMsg), + msgChan: make(chan *outputs.ProtoMsg), wg: new(sync.WaitGroup), entries: make(map[uint64]*promMetric), logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), @@ -80,9 +82,10 @@ func init() { } type prometheusOutput struct { - Cfg *config + cfg *config logger *log.Logger eventChan chan *formatters.EventMsg + msgChan chan *outputs.ProtoMsg wg *sync.WaitGroup server *http.Server @@ -117,6 +120,8 @@ type config struct { ServiceRegistration *serviceRegistration `mapstructure:"service-registration,omitempty" json:"service-registration,omitempty"` Timeout time.Duration `mapstructure:"timeout,omitempty" json:"timeout,omitempty"` CacheConfig *cache.Config `mapstructure:"cache,omitempty" json:"cache-config,omitempty"` + NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"` + EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"` clusterName string address string @@ -124,7 +129,7 @@ type config struct { } func (p *prometheusOutput) String() string { - b, err := json.Marshal(p) + b, err := json.Marshal(p.cfg) if err != nil { return "" } @@ -142,7 +147,7 @@ func (p *prometheusOutput) SetEventProcessors(ps map[string]map[string]interface logger *log.Logger, tcs map[string]*types.TargetConfig, acts map[string]map[string]interface{}) { - for _, epName := range p.Cfg.EventProcessors { + for _, epName := range p.cfg.EventProcessors { if epCfg, ok := ps[epName]; ok { epType := "" for k := range epCfg { @@ -172,23 +177,23 @@ func (p *prometheusOutput) SetEventProcessors(ps map[string]map[string]interface } func (p *prometheusOutput) Init(ctx 
context.Context, name string, cfg map[string]interface{}, opts ...outputs.Option) error { - err := outputs.DecodeConfig(cfg, p.Cfg) + err := outputs.DecodeConfig(cfg, p.cfg) if err != nil { return err } - if p.Cfg.Name == "" { - p.Cfg.Name = name + if p.cfg.Name == "" { + p.cfg.Name = name } - p.logger.SetPrefix(fmt.Sprintf(loggingPrefix, p.Cfg.Name)) + p.logger.SetPrefix(fmt.Sprintf(loggingPrefix, p.cfg.Name)) for _, opt := range opts { opt(p) } - if p.Cfg.TargetTemplate == "" { + if p.cfg.TargetTemplate == "" { p.targetTpl = outputs.DefaultTargetTemplate - } else if p.Cfg.AddTarget != "" { - p.targetTpl, err = utils.CreateTemplate("target-template", p.Cfg.TargetTemplate) + } else if p.cfg.AddTarget != "" { + p.targetTpl, err = utils.CreateTemplate("target-template", p.cfg.TargetTemplate) if err != nil { return err } @@ -200,20 +205,20 @@ func (p *prometheusOutput) Init(ctx context.Context, name string, cfg map[string } p.mb = &promcom.MetricBuilder{ - Prefix: p.Cfg.MetricPrefix, - AppendSubscriptionName: p.Cfg.AppendSubscriptionName, - StringsAsLabels: p.Cfg.StringsAsLabels, + Prefix: p.cfg.MetricPrefix, + AppendSubscriptionName: p.cfg.AppendSubscriptionName, + StringsAsLabels: p.cfg.StringsAsLabels, } - if p.Cfg.CacheConfig != nil { + if p.cfg.CacheConfig != nil { p.gnmiCache, err = cache.New( - p.Cfg.CacheConfig, + p.cfg.CacheConfig, cache.WithLogger(p.logger), ) if err != nil { return err } - p.targetsMeta = ttlcache.New(ttlcache.WithTTL[string, outputs.Meta](p.Cfg.Expiration)) + p.targetsMeta = ttlcache.New(ttlcache.WithTTL[string, outputs.Meta](p.cfg.Expiration)) } // create prometheus registry @@ -227,41 +232,47 @@ func (p *prometheusOutput) Init(ctx context.Context, name string, cfg map[string promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}) mux := http.NewServeMux() - mux.Handle(p.Cfg.Path, promHandler) + mux.Handle(p.cfg.Path, promHandler) p.server = &http.Server{ - Addr: p.Cfg.Listen, + Addr: p.cfg.Listen, Handler: mux, } // create tcp listener var listener net.Listener switch { - case p.Cfg.TLS == nil: - listener, err = net.Listen("tcp", p.Cfg.Listen) + case p.cfg.TLS == nil: + listener, err = net.Listen("tcp", p.cfg.Listen) default: var tlsConfig *tls.Config tlsConfig, err = utils.NewTLSConfig( - p.Cfg.TLS.CaFile, - p.Cfg.TLS.CertFile, - p.Cfg.TLS.KeyFile, - p.Cfg.TLS.ClientAuth, + p.cfg.TLS.CaFile, + p.cfg.TLS.CertFile, + p.cfg.TLS.KeyFile, + p.cfg.TLS.ClientAuth, true, true, ) if err != nil { return err } - listener, err = tls.Listen("tcp", p.Cfg.Listen, tlsConfig) + listener, err = tls.Listen("tcp", p.cfg.Listen, tlsConfig) } if err != nil { return err } // start worker - p.wg.Add(2) + p.wg.Add(1 + p.cfg.NumWorkers) wctx, wcancel := context.WithCancel(ctx) - go p.worker(wctx) - go p.expireMetricsPeriodic(wctx) + for i := 0; i < p.cfg.NumWorkers; i++ { + go p.worker(wctx) + } + + if p.cfg.CacheConfig == nil { + go p.expireMetricsPeriodic(wctx) + } + go func() { defer p.wg.Done() err = p.server.Serve(listener) @@ -284,35 +295,19 @@ func (p *prometheusOutput) Write(ctx context.Context, rsp proto.Message, meta ou if rsp == nil { return } - switch rsp := rsp.(type) { - case *gnmi.SubscribeResponse: - measName := "default" - if subName, ok := meta["subscription-name"]; ok { - measName = subName - } - var err error - rsp, err = outputs.AddSubscriptionTarget(rsp, meta, p.Cfg.AddTarget, p.targetTpl) - if err != nil { - p.logger.Printf("failed to add target to the response: %v", err) - } - if p.gnmiCache != nil { - 
p.gnmiCache.Write(ctx, measName, rsp) - target := utils.GetHost(meta["source"]) - p.targetsMeta.Set(measName+"/"+target, meta, ttlcache.DefaultTTL) - return - } - events, err := formatters.ResponseToEventMsgs(measName, rsp, meta, p.evps...) - if err != nil { - p.logger.Printf("failed to convert message to event: %v", err) - return - } - for _, ev := range events { - select { - case <-ctx.Done(): - return - case p.eventChan <- ev: - } + + wctx, cancel := context.WithTimeout(ctx, p.cfg.Timeout) + defer cancel() + + select { + case <-ctx.Done(): + return + case p.msgChan <- outputs.NewProtoMsg(rsp, meta): + case <-wctx.Done(): + if p.cfg.Debug { + p.logger.Printf("writing expired after %s", p.cfg.Timeout) } + return } } @@ -334,7 +329,7 @@ func (p *prometheusOutput) WriteEvent(ctx context.Context, ev *formatters.EventM func (p *prometheusOutput) Close() error { var err error if p.consulClient != nil { - err = p.consulClient.Agent().ServiceDeregister(p.Cfg.ServiceRegistration.Name) + err = p.consulClient.Agent().ServiceDeregister(p.cfg.ServiceRegistration.Name) if err != nil { p.logger.Printf("failed to deregister consul service: %v", err) } @@ -353,7 +348,14 @@ func (p *prometheusOutput) Close() error { return nil } -func (p *prometheusOutput) RegisterMetrics(reg *prometheus.Registry) {} +func (p *prometheusOutput) RegisterMetrics(reg *prometheus.Registry) { + if !p.cfg.EnableMetrics { + return + } + if err := p.registerMetrics(reg); err != nil { + p.logger.Printf("failed to register metric: %v", err) + } +} // Describe implements prometheus.Collector func (p *prometheusOutput) Describe(ch chan<- *prometheus.Desc) {} @@ -362,7 +364,7 @@ func (p *prometheusOutput) Describe(ch chan<- *prometheus.Desc) {} func (p *prometheusOutput) Collect(ch chan<- prometheus.Metric) { p.Lock() defer p.Unlock() - if p.Cfg.CacheConfig != nil { + if p.cfg.CacheConfig != nil { p.collectFromCache(ch) return } @@ -370,7 +372,7 @@ func (p *prometheusOutput) Collect(ch chan<- prometheus.Metric) { // run expire before exporting metrics p.expireMetrics() - ctx, cancel := context.WithTimeout(context.Background(), p.Cfg.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Timeout) defer cancel() for _, entry := range p.entries { @@ -390,35 +392,72 @@ func (p *prometheusOutput) worker(ctx context.Context) { case <-ctx.Done(): return case ev := <-p.eventChan: - if p.Cfg.Debug { - p.logger.Printf("got event to store: %+v", ev) - } - p.Lock() - for _, pm := range p.metricsFromEvent(ev, time.Now()) { - key := pm.calculateKey() - if e, ok := p.entries[key]; ok && pm.time != nil { - if e.time.Before(*pm.time) { - p.entries[key] = pm - } - } else { - p.entries[key] = pm - } - if p.Cfg.Debug { - p.logger.Printf("saved key=%d, metric: %+v", key, pm) - } - } - p.Unlock() + p.workerHandleEvent(ev) + case m := <-p.msgChan: + p.workerHandleProto(ctx, m) + } + } +} + +func (p *prometheusOutput) workerHandleProto(ctx context.Context, m *outputs.ProtoMsg) { + pmsg := m.GetMsg() + switch pmsg := pmsg.(type) { + case *gnmi.SubscribeResponse: + meta := m.GetMeta() + measName := "default" + if subName, ok := meta["subscription-name"]; ok { + measName = subName + } + var err error + pmsg, err = outputs.AddSubscriptionTarget(pmsg, m.GetMeta(), p.cfg.AddTarget, p.targetTpl) + if err != nil { + p.logger.Printf("failed to add target to the response: %v", err) + } + if p.gnmiCache != nil { + p.gnmiCache.Write(ctx, measName, pmsg) + target := utils.GetHost(meta["source"]) + p.targetsMeta.Set(measName+"/"+target, meta, 
ttlcache.DefaultTTL)
+			return
+		}
+		events, err := formatters.ResponseToEventMsgs(measName, pmsg, meta, p.evps...)
+		if err != nil {
+			p.logger.Printf("failed to convert message to event: %v", err)
+			return
+		}
+		for _, ev := range events {
+			p.workerHandleEvent(ev)
+		}
+	}
+}
+
+func (p *prometheusOutput) workerHandleEvent(ev *formatters.EventMsg) {
+	if p.cfg.Debug {
+		p.logger.Printf("got event to store: %+v", ev)
+	}
+	p.Lock()
+	defer p.Unlock()
+	for _, pm := range p.metricsFromEvent(ev, time.Now()) {
+		key := pm.calculateKey()
+		e, ok := p.entries[key]
+		// add the entry if the key is not present or if the new entry
+		// carries no timestamp (export-timestamps disabled); otherwise,
+		// overwrite the existing entry only when the new entry's
+		// timestamp is newer.
+		if !ok || pm.time == nil || e.time.Before(*pm.time) {
+			p.entries[key] = pm
+		}
+		if p.cfg.Debug {
+			p.logger.Printf("saved key=%d, metric: %+v", key, pm)
 		}
 	}
 }
 
 func (p *prometheusOutput) expireMetrics() {
-	if p.Cfg.Expiration <= 0 {
+	if p.cfg.Expiration <= 0 {
 		return
 	}
-	expiry := time.Now().Add(-p.Cfg.Expiration)
+	expiry := time.Now().Add(-p.cfg.Expiration)
 	for k, e := range p.entries {
-		if p.Cfg.ExportTimestamps {
+		if p.cfg.ExportTimestamps {
 			if e.time.Before(expiry) {
 				delete(p.entries, k)
 			}
@@ -431,10 +470,13 @@ func (p *prometheusOutput) expireMetricsPeriodic(ctx context.Context) {
-	if p.Cfg.Expiration <= 0 {
+	if p.cfg.Expiration <= 0 {
 		return
 	}
-	ticker := time.NewTicker(p.Cfg.Expiration)
+	p.Lock()
+	prometheusNumberOfMetrics.Set(float64(len(p.entries)))
+	p.Unlock()
+	ticker := time.NewTicker(p.cfg.Expiration)
 	defer ticker.Stop()
 	for {
 		select {
@@ -443,28 +485,32 @@
 		case <-ticker.C:
 			p.Lock()
 			p.expireMetrics()
+			prometheusNumberOfMetrics.Set(float64(len(p.entries)))
 			p.Unlock()
 		}
 	}
 }
 
 func (p *prometheusOutput) setDefaults() error {
-	if p.Cfg.Listen == "" {
-		p.Cfg.Listen = defaultListen
+	if p.cfg.Listen == "" {
+		p.cfg.Listen = defaultListen
+	}
+	if p.cfg.Path == "" {
+		p.cfg.Path = defaultPath
 	}
-	if p.Cfg.Path == "" {
-		p.Cfg.Path = defaultPath
+	if p.cfg.Expiration == 0 {
+		p.cfg.Expiration = defaultExpiration
 	}
-	if p.Cfg.Expiration == 0 {
-		p.Cfg.Expiration = defaultExpiration
+	if p.cfg.CacheConfig != nil && p.cfg.AddTarget == "" {
+		p.cfg.AddTarget = "if-not-present"
 	}
-	if p.Cfg.CacheConfig != nil && p.Cfg.AddTarget == "" {
-		p.Cfg.AddTarget = "if-not-present"
+	if p.cfg.Timeout <= 0 {
+		p.cfg.Timeout = defaultTimeout
 	}
-	if p.Cfg.Timeout <= 0 {
-		p.Cfg.Timeout = defaultTimeout
+	if p.cfg.NumWorkers <= 0 {
+		p.cfg.NumWorkers = defaultNumWorkers
 	}
-	if p.Cfg.ServiceRegistration == nil {
+	if p.cfg.ServiceRegistration == nil {
 		return nil
 	}
 
@@ -472,18 +518,18 @@
 	var err error
 	var port string
 	switch {
-	case p.Cfg.ServiceRegistration.ServiceAddress != "":
-		p.Cfg.address, port, err = net.SplitHostPort(p.Cfg.ServiceRegistration.ServiceAddress)
+	case p.cfg.ServiceRegistration.ServiceAddress != "":
+		p.cfg.address, port, err = net.SplitHostPort(p.cfg.ServiceRegistration.ServiceAddress)
 		if err != nil {
 			// if service-address does not include a port number, use the port number from the listen field
 			if strings.Contains(err.Error(), "missing port in address") {
-				p.Cfg.address = p.Cfg.ServiceRegistration.ServiceAddress
-				_, port, err = net.SplitHostPort(p.Cfg.Listen)
+				p.cfg.address = p.cfg.ServiceRegistration.ServiceAddress
+				_, port, err = net.SplitHostPort(p.cfg.Listen)
 				if err != nil {
p.logger.Printf("invalid 'listen' field format: %v", err) return err } - p.Cfg.port, err = strconv.Atoi(port) + p.cfg.port, err = strconv.Atoi(port) if err != nil { p.logger.Printf("invalid 'listen' field format: %v", err) return err @@ -495,18 +541,18 @@ func (p *prometheusOutput) setDefaults() error { return err } // the service-address contains both an address and a port number - p.Cfg.port, err = strconv.Atoi(port) + p.cfg.port, err = strconv.Atoi(port) if err != nil { p.logger.Printf("invalid 'service-registration.service-address' field format: %v", err) return err } default: - p.Cfg.address, port, err = net.SplitHostPort(p.Cfg.Listen) + p.cfg.address, port, err = net.SplitHostPort(p.cfg.Listen) if err != nil { p.logger.Printf("invalid 'listen' field format: %v", err) return err } - p.Cfg.port, err = strconv.Atoi(port) + p.cfg.port, err = strconv.Atoi(port) if err != nil { p.logger.Printf("invalid 'listen' field format: %v", err) return err @@ -636,25 +682,25 @@ func getFloat(v interface{}) (float64, error) { } func (p *prometheusOutput) SetName(name string) { - if p.Cfg.Name == "" { - p.Cfg.Name = name + if p.cfg.Name == "" { + p.cfg.Name = name } - if p.Cfg.ServiceRegistration != nil { - if p.Cfg.ServiceRegistration.Name == "" { - p.Cfg.ServiceRegistration.Name = fmt.Sprintf("prometheus-%s", p.Cfg.Name) + if p.cfg.ServiceRegistration != nil { + if p.cfg.ServiceRegistration.Name == "" { + p.cfg.ServiceRegistration.Name = fmt.Sprintf("prometheus-%s", p.cfg.Name) } if name == "" { name = uuid.New().String() } - p.Cfg.ServiceRegistration.id = fmt.Sprintf("%s-%s", p.Cfg.ServiceRegistration.Name, name) - p.Cfg.ServiceRegistration.Tags = append(p.Cfg.ServiceRegistration.Tags, fmt.Sprintf("gnmic-instance=%s", name)) + p.cfg.ServiceRegistration.id = fmt.Sprintf("%s-%s", p.cfg.ServiceRegistration.Name, name) + p.cfg.ServiceRegistration.Tags = append(p.cfg.ServiceRegistration.Tags, fmt.Sprintf("gnmic-instance=%s", name)) } } func (p *prometheusOutput) SetClusterName(name string) { - p.Cfg.clusterName = name - if p.Cfg.ServiceRegistration != nil { - p.Cfg.ServiceRegistration.Tags = append(p.Cfg.ServiceRegistration.Tags, fmt.Sprintf("gnmic-cluster=%s", name)) + p.cfg.clusterName = name + if p.cfg.ServiceRegistration != nil { + p.cfg.ServiceRegistration.Tags = append(p.cfg.ServiceRegistration.Tags, fmt.Sprintf("gnmic-cluster=%s", name)) } } @@ -666,7 +712,7 @@ func (p *prometheusOutput) metricsFromEvent(ev *formatters.EventMsg, now time.Ti for vName, val := range ev.Values { v, err := getFloat(val) if err != nil { - if !p.Cfg.StringsAsLabels { + if !p.cfg.StringsAsLabels { continue } v = 1.0 @@ -677,10 +723,10 @@ func (p *prometheusOutput) metricsFromEvent(ev *formatters.EventMsg, now time.Ti value: v, addedAt: now, } - if p.Cfg.OverrideTimestamps && p.Cfg.ExportTimestamps { + if p.cfg.OverrideTimestamps && p.cfg.ExportTimestamps { ev.Timestamp = now.UnixNano() } - if p.Cfg.ExportTimestamps { + if p.cfg.ExportTimestamps { tm := time.Unix(0, ev.Timestamp) pm.time = &tm } diff --git a/outputs/prometheus_output/prometheus_output/prometheus_service_registration.go b/outputs/prometheus_output/prometheus_output/prometheus_service_registration.go index 2ad73689..826d9f1c 100644 --- a/outputs/prometheus_output/prometheus_output/prometheus_service_registration.go +++ b/outputs/prometheus_output/prometheus_output/prometheus_service_registration.go @@ -48,20 +48,20 @@ type serviceRegistration struct { } func (p *prometheusOutput) registerService(ctx context.Context) { - if p.Cfg.ServiceRegistration == nil { 
+ if p.cfg.ServiceRegistration == nil { return } var err error clientConfig := &api.Config{ - Address: p.Cfg.ServiceRegistration.Address, + Address: p.cfg.ServiceRegistration.Address, Scheme: "http", - Datacenter: p.Cfg.ServiceRegistration.Datacenter, - Token: p.Cfg.ServiceRegistration.Token, + Datacenter: p.cfg.ServiceRegistration.Datacenter, + Token: p.cfg.ServiceRegistration.Token, } - if p.Cfg.ServiceRegistration.Username != "" && p.Cfg.ServiceRegistration.Password != "" { + if p.cfg.ServiceRegistration.Username != "" && p.cfg.ServiceRegistration.Password != "" { clientConfig.HttpAuth = &api.HttpBasicAuth{ - Username: p.Cfg.ServiceRegistration.Username, - Password: p.Cfg.ServiceRegistration.Password, + Username: p.cfg.ServiceRegistration.Username, + Password: p.cfg.ServiceRegistration.Password, } } INITCONSUL: @@ -85,8 +85,8 @@ INITCONSUL: defer cancel() doneCh := make(chan struct{}) - if p.Cfg.ServiceRegistration.UseLock { - doneCh, err = p.acquireAndKeepLock(ctx, "gnmic/"+p.Cfg.clusterName+"/prometheus-output", []byte(p.Cfg.ServiceRegistration.id)) + if p.cfg.ServiceRegistration.UseLock { + doneCh, err = p.acquireAndKeepLock(ctx, "gnmic/"+p.cfg.clusterName+"/prometheus-output", []byte(p.cfg.ServiceRegistration.id)) if err != nil { p.logger.Printf("failed to acquire lock: %v", err) time.Sleep(1 * time.Second) @@ -95,26 +95,26 @@ INITCONSUL: } service := &api.AgentServiceRegistration{ - ID: p.Cfg.ServiceRegistration.id, - Name: p.Cfg.ServiceRegistration.Name, - Address: p.Cfg.address, - Port: p.Cfg.port, - Tags: p.Cfg.ServiceRegistration.Tags, + ID: p.cfg.ServiceRegistration.id, + Name: p.cfg.ServiceRegistration.Name, + Address: p.cfg.address, + Port: p.cfg.port, + Tags: p.cfg.ServiceRegistration.Tags, Checks: api.AgentServiceChecks{ { - TTL: p.Cfg.ServiceRegistration.CheckInterval.String(), - DeregisterCriticalServiceAfter: p.Cfg.ServiceRegistration.deregisterAfter, + TTL: p.cfg.ServiceRegistration.CheckInterval.String(), + DeregisterCriticalServiceAfter: p.cfg.ServiceRegistration.deregisterAfter, }, }, } - ttlCheckID := "service:" + p.Cfg.ServiceRegistration.id - if p.Cfg.ServiceRegistration.EnableHTTPCheck { + ttlCheckID := "service:" + p.cfg.ServiceRegistration.id + if p.cfg.ServiceRegistration.EnableHTTPCheck { service.Checks = append(service.Checks, &api.AgentServiceCheck{ - HTTP: p.Cfg.ServiceRegistration.httpCheckAddress, + HTTP: p.cfg.ServiceRegistration.httpCheckAddress, Method: "GET", - Interval: p.Cfg.ServiceRegistration.CheckInterval.String(), + Interval: p.cfg.ServiceRegistration.CheckInterval.String(), TLSSkipVerify: true, - DeregisterCriticalServiceAfter: p.Cfg.ServiceRegistration.deregisterAfter, + DeregisterCriticalServiceAfter: p.cfg.ServiceRegistration.deregisterAfter, }) ttlCheckID = ttlCheckID + ":1" } @@ -130,7 +130,7 @@ INITCONSUL: if err != nil { p.logger.Printf("failed to pass TTL check: %v", err) } - ticker := time.NewTicker(p.Cfg.ServiceRegistration.CheckInterval / 2) + ticker := time.NewTicker(p.cfg.ServiceRegistration.CheckInterval / 2) for { select { case <-ticker.C: @@ -149,32 +149,32 @@ INITCONSUL: } func (p *prometheusOutput) setServiceRegistrationDefaults() { - if p.Cfg.ServiceRegistration.Address == "" { - p.Cfg.ServiceRegistration.Address = defaultServiceRegistrationAddress + if p.cfg.ServiceRegistration.Address == "" { + p.cfg.ServiceRegistration.Address = defaultServiceRegistrationAddress } - if p.Cfg.ServiceRegistration.CheckInterval <= 5*time.Second { - p.Cfg.ServiceRegistration.CheckInterval = defaultRegistrationCheckInterval + if 
p.cfg.ServiceRegistration.CheckInterval <= 5*time.Second { + p.cfg.ServiceRegistration.CheckInterval = defaultRegistrationCheckInterval } - if p.Cfg.ServiceRegistration.MaxFail <= 0 { - p.Cfg.ServiceRegistration.MaxFail = defaultMaxServiceFail + if p.cfg.ServiceRegistration.MaxFail <= 0 { + p.cfg.ServiceRegistration.MaxFail = defaultMaxServiceFail } - deregisterTimer := p.Cfg.ServiceRegistration.CheckInterval * time.Duration(p.Cfg.ServiceRegistration.MaxFail) - p.Cfg.ServiceRegistration.deregisterAfter = deregisterTimer.String() + deregisterTimer := p.cfg.ServiceRegistration.CheckInterval * time.Duration(p.cfg.ServiceRegistration.MaxFail) + p.cfg.ServiceRegistration.deregisterAfter = deregisterTimer.String() - if !p.Cfg.ServiceRegistration.EnableHTTPCheck { + if !p.cfg.ServiceRegistration.EnableHTTPCheck { return } - p.Cfg.ServiceRegistration.httpCheckAddress = p.Cfg.ServiceRegistration.HTTPCheckAddress - if p.Cfg.ServiceRegistration.httpCheckAddress != "" { - p.Cfg.ServiceRegistration.httpCheckAddress = filepath.Join(p.Cfg.ServiceRegistration.httpCheckAddress, p.Cfg.Path) - if !strings.HasPrefix(p.Cfg.ServiceRegistration.httpCheckAddress, "http") { - p.Cfg.ServiceRegistration.httpCheckAddress = "http://" + p.Cfg.ServiceRegistration.httpCheckAddress + p.cfg.ServiceRegistration.httpCheckAddress = p.cfg.ServiceRegistration.HTTPCheckAddress + if p.cfg.ServiceRegistration.httpCheckAddress != "" { + p.cfg.ServiceRegistration.httpCheckAddress = filepath.Join(p.cfg.ServiceRegistration.httpCheckAddress, p.cfg.Path) + if !strings.HasPrefix(p.cfg.ServiceRegistration.httpCheckAddress, "http") { + p.cfg.ServiceRegistration.httpCheckAddress = "http://" + p.cfg.ServiceRegistration.httpCheckAddress } return } - p.Cfg.ServiceRegistration.httpCheckAddress = filepath.Join(p.Cfg.Listen, p.Cfg.Path) - if !strings.HasPrefix(p.Cfg.ServiceRegistration.httpCheckAddress, "http") { - p.Cfg.ServiceRegistration.httpCheckAddress = "http://" + p.Cfg.ServiceRegistration.httpCheckAddress + p.cfg.ServiceRegistration.httpCheckAddress = filepath.Join(p.cfg.Listen, p.cfg.Path) + if !strings.HasPrefix(p.cfg.ServiceRegistration.httpCheckAddress, "http") { + p.cfg.ServiceRegistration.httpCheckAddress = "http://" + p.cfg.ServiceRegistration.httpCheckAddress } } @@ -196,7 +196,7 @@ func (p *prometheusOutput) acquireLock(ctx context.Context, key string, val []by kvPair.Session, _, err = p.consulClient.Session().Create( &api.SessionEntry{ Behavior: "delete", - TTL: time.Duration(p.Cfg.ServiceRegistration.CheckInterval * 2).String(), + TTL: time.Duration(p.cfg.ServiceRegistration.CheckInterval * 2).String(), LockDelay: 0, }, writeOpts, @@ -216,7 +216,7 @@ func (p *prometheusOutput) acquireLock(ctx context.Context, key string, val []by if acquired { return kvPair.Session, nil } - if p.Cfg.Debug { + if p.cfg.Debug { p.logger.Printf("failed acquiring lock to %q: already locked", kvPair.Key) } time.Sleep(10 * time.Second) @@ -236,7 +236,7 @@ func (p *prometheusOutput) keepLock(ctx context.Context, sessionID string) (chan return } err := p.consulClient.Session().RenewPeriodic( - time.Duration(p.Cfg.ServiceRegistration.CheckInterval/2).String(), + time.Duration(p.cfg.ServiceRegistration.CheckInterval/2).String(), sessionID, writeOpts, doneChan, diff --git a/outputs/prometheus_output/prometheus_write_output/prometheus_write_client.go b/outputs/prometheus_output/prometheus_write_output/prometheus_write_client.go index 573c06dc..5fc1120b 100644 --- a/outputs/prometheus_output/prometheus_write_output/prometheus_write_client.go +++ 
b/outputs/prometheus_output/prometheus_write_output/prometheus_write_client.go
@@ -26,15 +26,15 @@ var backoff = 100 * time.Millisecond
 
 func (p *promWriteOutput) createHTTPClient() error {
 	c := &http.Client{
-		Timeout: p.Cfg.Timeout,
+		Timeout: p.cfg.Timeout,
 	}
-	if p.Cfg.TLS != nil {
+	if p.cfg.TLS != nil {
 		tlsCfg, err := utils.NewTLSConfig(
-			p.Cfg.TLS.CaFile,
-			p.Cfg.TLS.CertFile,
-			p.Cfg.TLS.KeyFile,
+			p.cfg.TLS.CaFile,
+			p.cfg.TLS.CertFile,
+			p.cfg.TLS.KeyFile,
 			"",
-			p.Cfg.TLS.SkipVerify,
+			p.cfg.TLS.SkipVerify,
 			false,
 		)
 		if err != nil {
@@ -50,19 +50,19 @@ func (p *promWriteOutput) writer(ctx context.Context) {
 	p.logger.Printf("starting writer")
-	ticker := time.NewTicker(p.Cfg.Interval)
+	ticker := time.NewTicker(p.cfg.Interval)
 	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			if p.Cfg.Debug {
+			if p.cfg.Debug {
 				p.logger.Printf("write interval reached, writing to remote")
 			}
 			p.write(ctx)
 		case <-p.buffDrainCh:
-			if p.Cfg.Debug {
+			if p.cfg.Debug {
 				p.logger.Printf("buffer full, writing to remote")
 			}
 			p.write(ctx)
@@ -72,7 +72,7 @@ func (p *promWriteOutput) write(ctx context.Context) {
 	buffSize := len(p.timeSeriesCh)
-	if p.Cfg.Debug {
+	if p.cfg.Debug {
 		p.logger.Printf("write triggered, buffer size: %d", buffSize)
 	}
 	if buffSize == 0 {
@@ -97,29 +97,33 @@ WRITE:
 	if numTS == 0 {
 		return
 	}
-	chunk := make([]prompb.TimeSeries, 0, p.Cfg.MaxTimeSeriesPerWrite)
+	chunk := make([]prompb.TimeSeries, 0, p.cfg.MaxTimeSeriesPerWrite)
 	for i, pt := range pts {
 		// append timeSeries to chunk
 		chunk = append(chunk, pt)
 		// if the chunk size reaches the configured max or
 		// we reach the max number of time series gathered, send.
 		chunkSize := len(chunk)
-		if chunkSize == p.Cfg.MaxTimeSeriesPerWrite || i+1 == numTS {
-			if p.Cfg.Debug {
+		// compare with >= so that a chunk kept after a failed write
+		// (see the continue below) is still flushed once it reaches
+		// or exceeds the configured max.
+		if chunkSize >= p.cfg.MaxTimeSeriesPerWrite || i+1 == numTS {
+			if p.cfg.Debug {
 				p.logger.Printf("writing a %d time series chunk", chunkSize)
 			}
+			start := time.Now()
 			err := p.writeRequest(ctx, &prompb.WriteRequest{
 				Timeseries: chunk,
 			})
 			if err != nil {
-				p.logger.Print(err)
+				prometheusWriteNumberOfFailSendMsgs.WithLabelValues(err.Error()).Inc()
+				continue
 			}
+			prometheusWriteSendDuration.Set(float64(time.Since(start).Nanoseconds()))
+			prometheusWriteNumberOfSentMsgs.Add(float64(chunkSize))
 			// return if we are done with the gathered time series
 			if i+1 == numTS {
 				return
 			}
 			// reset chunk if we are not done yet
-			chunk = make([]prompb.TimeSeries, 0, p.Cfg.MaxTimeSeriesPerWrite)
+			chunk = make([]prompb.TimeSeries, 0, p.cfg.MaxTimeSeriesPerWrite)
 		}
 	}
 }
@@ -142,7 +146,7 @@ RETRY:
 		retries++
 		err = fmt.Errorf("failed to write to remote: %w", err)
 		p.logger.Print(err)
-		if retries < p.Cfg.MaxRetries {
+		if retries < p.cfg.MaxRetries {
 			time.Sleep(backoff)
 			goto RETRY
 		}
 	}
 	defer rsp.Body.Close()
 
-	if p.Cfg.Debug {
+	if p.cfg.Debug {
 		p.logger.Printf("got response from remote: status=%s", rsp.Status)
 	}
 	if rsp.StatusCode >= 300 {
@@ -165,11 +169,11 @@
 
 // metadataWriter writes the cached metadata entries to the remote address each `metadata.interval`
 func (p *promWriteOutput) metadataWriter(ctx context.Context) {
-	if p.Cfg.Metadata == nil || !p.Cfg.Metadata.Include {
+	if p.cfg.Metadata == nil || !p.cfg.Metadata.Include {
 		return
 	}
 	p.writeMetadata(ctx)
-	ticker := time.NewTicker(p.Cfg.Metadata.Interval)
+	ticker := time.NewTicker(p.cfg.Metadata.Interval)
 	defer ticker.Stop()
 	for {
 		select {
@@ -191,29 +195,32 @@ func (p 
*promWriteOutput) writeMetadata(ctx context.Context) { return } - mds := make([]prompb.MetricMetadata, 0, p.Cfg.Metadata.MaxEntriesPerWrite) + mds := make([]prompb.MetricMetadata, 0, p.cfg.Metadata.MaxEntriesPerWrite) count := 0 // keep track of the number of entries in mds for _, md := range p.metadataCache { - if count < p.Cfg.Metadata.MaxEntriesPerWrite { + if count < p.cfg.Metadata.MaxEntriesPerWrite { count++ mds = append(mds, md) continue } // max entries reached, write accumulated entries - if p.Cfg.Debug { + if p.cfg.Debug { p.logger.Printf("writing %d metadata points", len(mds)) } + start := time.Now() err := p.writeRequest(ctx, &prompb.WriteRequest{ Metadata: mds, }) if err != nil { - p.logger.Printf("metadata write err: %v", err) + prometheusWriteNumberOfFailSendMetadataMsgs.WithLabelValues(err.Error()).Inc() return } + prometheusWriteMetadataSendDuration.Set(float64(time.Since(start).Nanoseconds())) + prometheusWriteNumberOfSentMetadataMsgs.Add(float64(len(mds))) // reset counter and array then continue with the loop count = 0 - mds = make([]prompb.MetricMetadata, 0, p.Cfg.Metadata.MaxEntriesPerWrite) + mds = make([]prompb.MetricMetadata, 0, p.cfg.Metadata.MaxEntriesPerWrite) } // no metadata entries to write, return @@ -222,15 +229,19 @@ func (p *promWriteOutput) writeMetadata(ctx context.Context) { } // loop done with some metadata entries left to write - if p.Cfg.Debug { + if p.cfg.Debug { p.logger.Printf("writing %d metadata points", len(mds)) } + start := time.Now() err := p.writeRequest(ctx, &prompb.WriteRequest{ Metadata: mds, }) if err != nil { - p.logger.Printf("metadata write err: %v", err) + prometheusWriteNumberOfFailSendMetadataMsgs.WithLabelValues(err.Error()).Inc() + return } + prometheusWriteMetadataSendDuration.Set(float64(time.Since(start).Nanoseconds())) + prometheusWriteNumberOfSentMetadataMsgs.Add(float64(len(mds))) } func (p *promWriteOutput) makeHTTPRequest(ctx context.Context, wr *prompb.WriteRequest) (*http.Request, error) { @@ -239,7 +250,7 @@ func (p *promWriteOutput) makeHTTPRequest(ctx context.Context, wr *prompb.WriteR return nil, fmt.Errorf("failed to marshal proto write request: %v", err) } compBytes := snappy.Encode(nil, b) - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.Cfg.URL, bytes.NewBuffer(compBytes)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.URL, bytes.NewBuffer(compBytes)) if err != nil { return nil, fmt.Errorf("failed to create HTTP request: %v", err) } @@ -248,15 +259,15 @@ func (p *promWriteOutput) makeHTTPRequest(ctx context.Context, wr *prompb.WriteR httpReq.Header.Set("User-Agent", userAgent) httpReq.Header.Set("Content-Type", "application/x-protobuf") - if p.Cfg.Authentication != nil { - httpReq.SetBasicAuth(p.Cfg.Authentication.Username, p.Cfg.Authentication.Password) + if p.cfg.Authentication != nil { + httpReq.SetBasicAuth(p.cfg.Authentication.Username, p.cfg.Authentication.Password) } - if p.Cfg.Authorization != nil && p.Cfg.Authorization.Type != "" { - httpReq.Header.Set("Authorization", fmt.Sprintf("%s %s", p.Cfg.Authorization.Type, p.Cfg.Authorization.Credentials)) + if p.cfg.Authorization != nil && p.cfg.Authorization.Type != "" { + httpReq.Header.Set("Authorization", fmt.Sprintf("%s %s", p.cfg.Authorization.Type, p.cfg.Authorization.Credentials)) } - for k, v := range p.Cfg.Headers { + for k, v := range p.cfg.Headers { httpReq.Header.Add(k, v) } diff --git a/outputs/prometheus_output/prometheus_write_output/prometheus_write_metrics.go 
b/outputs/prometheus_output/prometheus_write_output/prometheus_write_metrics.go new file mode 100644 index 00000000..815d7454 --- /dev/null +++ b/outputs/prometheus_output/prometheus_write_output/prometheus_write_metrics.go @@ -0,0 +1,93 @@ +// © 2023 Nokia. +// +// This code is a Contribution to the gNMIc project (“Work”) made under the Google Software Grant and Corporate Contributor License Agreement (“CLA”) and governed by the Apache License 2.0. +// No other rights or licenses in or to any of Nokia’s intellectual property are granted for any other purpose. +// This code is provided on an “as is” basis without any warranties of any kind. +// +// SPDX-License-Identifier: Apache-2.0 + +package prometheus_write_output + +import "github.com/prometheus/client_golang/prometheus" + +const ( + namespace = "gnmic" + subsystem = "prometheus_write_output" +) + +var prometheusWriteNumberOfSentMsgs = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_write_msgs_sent_success_total", + Help: "Number of msgs successfully sent by gnmic prometheus_write output", +}) + +var prometheusWriteNumberOfFailSendMsgs = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_write_msgs_sent_fail_total", + Help: "Number of failed msgs sent by gnmic prometheus_write output", +}, []string{"reason"}) + +var prometheusWriteSendDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "msg_send_duration_ns", + Help: "gnmic prometheus_write output send duration in ns", +}) + +var prometheusWriteNumberOfSentMetadataMsgs = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_write_metadata_msgs_sent_success_total", + Help: "Number of metadata msgs successfully sent by gnmic prometheus_write output", +}) + +var prometheusWriteNumberOfFailSendMetadataMsgs = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "number_of_prometheus_write_metadata_msgs_sent_fail_total", + Help: "Number of failed metadata msgs sent by gnmic prometheus_write output", +}, []string{"reason"}) + +var prometheusWriteMetadataSendDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "metadata_msg_send_duration_ns", + Help: "gnmic prometheus_write output metadata send duration in ns", +}) + +func initMetrics() { + // data msgs metrics + prometheusWriteNumberOfSentMsgs.Add(0) + prometheusWriteNumberOfFailSendMsgs.WithLabelValues("").Add(0) + prometheusWriteSendDuration.Set(0) + // metadata msgs metrics + prometheusWriteNumberOfSentMetadataMsgs.Add(0) + prometheusWriteNumberOfFailSendMetadataMsgs.WithLabelValues("").Add(0) + prometheusWriteMetadataSendDuration.Set(0) +} + +func registerMetrics(reg *prometheus.Registry) error { + initMetrics() + var err error + if err = reg.Register(prometheusWriteNumberOfSentMsgs); err != nil { + return err + } + if err = reg.Register(prometheusWriteNumberOfFailSendMsgs); err != nil { + return err + } + if err = reg.Register(prometheusWriteSendDuration); err != nil { + return err + } + if err = reg.Register(prometheusWriteNumberOfSentMetadataMsgs); err != nil { + return err + } + if err = reg.Register(prometheusWriteNumberOfFailSendMetadataMsgs); err != nil { + return err + } + if err = reg.Register(prometheusWriteMetadataSendDuration); err != nil { + 
return err + } + return nil +} diff --git a/outputs/prometheus_output/prometheus_write_output/prometheus_write_output.go b/outputs/prometheus_output/prometheus_write_output/prometheus_write_output.go index 952337a4..ad56ed6b 100644 --- a/outputs/prometheus_output/prometheus_write_output/prometheus_write_output.go +++ b/outputs/prometheus_output/prometheus_write_output/prometheus_write_output.go @@ -43,15 +43,17 @@ const ( defaultMaxMetaDataEntriesPerWrite = 500 defaultMetricHelp = "gNMIc generated metric" userAgent = "gNMIc prometheus write" + defaultNumWorkers = 1 ) func init() { outputs.Register(outputType, func() outputs.Output { return &promWriteOutput{ - Cfg: &config{}, + cfg: &config{}, logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), eventChan: make(chan *formatters.EventMsg), + msgChan: make(chan *outputs.ProtoMsg), buffDrainCh: make(chan struct{}), m: new(sync.Mutex), metadataCache: make(map[string]prompb.MetricMetadata), @@ -60,11 +62,12 @@ func init() { } type promWriteOutput struct { - Cfg *config + cfg *config logger *log.Logger httpClient *http.Client eventChan chan *formatters.EventMsg + msgChan chan *outputs.ProtoMsg timeSeriesCh chan *prompb.TimeSeries buffDrainCh chan struct{} mb *promcom.MetricBuilder @@ -100,6 +103,8 @@ type config struct { TargetTemplate string `mapstructure:"target-template,omitempty" json:"target-template,omitempty"` StringsAsLabels bool `mapstructure:"strings-as-labels,omitempty" json:"strings-as-labels,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty" json:"event-processors,omitempty"` + NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"` + EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"` } type auth struct { @@ -119,26 +124,26 @@ type metadata struct { } func (p *promWriteOutput) Init(ctx context.Context, name string, cfg map[string]interface{}, opts ...outputs.Option) error { - err := outputs.DecodeConfig(cfg, p.Cfg) + err := outputs.DecodeConfig(cfg, p.cfg) if err != nil { return err } - if p.Cfg.URL == "" { + if p.cfg.URL == "" { return errors.New("missing url field") } - if p.Cfg.Name == "" { - p.Cfg.Name = name + if p.cfg.Name == "" { + p.cfg.Name = name } - p.logger.SetPrefix(fmt.Sprintf(loggingPrefix, p.Cfg.Name)) + p.logger.SetPrefix(fmt.Sprintf(loggingPrefix, p.cfg.Name)) for _, opt := range opts { opt(p) } - if p.Cfg.TargetTemplate == "" { + if p.cfg.TargetTemplate == "" { p.targetTpl = outputs.DefaultTargetTemplate - } else if p.Cfg.AddTarget != "" { - p.targetTpl, err = utils.CreateTemplate("target-template", p.Cfg.TargetTemplate) + } else if p.cfg.AddTarget != "" { + p.targetTpl, err = utils.CreateTemplate("target-template", p.cfg.TargetTemplate) if err != nil { return err } @@ -151,23 +156,26 @@ func (p *promWriteOutput) Init(ctx context.Context, name string, cfg map[string] } p.mb = &promcom.MetricBuilder{ - Prefix: p.Cfg.MetricPrefix, - AppendSubscriptionName: p.Cfg.AppendSubscriptionName, - StringsAsLabels: p.Cfg.StringsAsLabels, + Prefix: p.cfg.MetricPrefix, + AppendSubscriptionName: p.cfg.AppendSubscriptionName, + StringsAsLabels: p.cfg.StringsAsLabels, } // initialize buffer chan - p.timeSeriesCh = make(chan *prompb.TimeSeries, p.Cfg.BufferSize) + p.timeSeriesCh = make(chan *prompb.TimeSeries, p.cfg.BufferSize) err = p.createHTTPClient() if err != nil { return err } ctx, p.cfn = context.WithCancel(ctx) - go p.worker(ctx) + for i := 0; i < p.cfg.NumWorkers; i++ { + go p.worker(ctx) + } + go p.writer(ctx) 
go p.metadataWriter(ctx) - p.logger.Printf("initialized prometheus write output %s: %s", p.Cfg.Name, p.String()) + p.logger.Printf("initialized prometheus write output %s: %s", p.cfg.Name, p.String()) return nil } @@ -175,30 +183,19 @@ func (p *promWriteOutput) Write(ctx context.Context, rsp proto.Message, meta out if rsp == nil { return } - switch rsp := rsp.(type) { - case *gnmi.SubscribeResponse: - measName := "default" - if subName, ok := meta["subscription-name"]; ok { - measName = subName - } - var err error - rsp, err = outputs.AddSubscriptionTarget(rsp, meta, p.Cfg.AddTarget, p.targetTpl) - if err != nil { - p.logger.Printf("failed to add target to the response: %v", err) - } - events, err := formatters.ResponseToEventMsgs(measName, rsp, meta, p.evps...) - if err != nil { - p.logger.Printf("failed to convert message to event: %v", err) - return - } - for _, ev := range events { - select { - case <-ctx.Done(): - return - case p.eventChan <- ev: - } + wctx, cancel := context.WithTimeout(ctx, p.cfg.Timeout) + defer cancel() + + select { + case <-ctx.Done(): + return + case p.msgChan <- outputs.NewProtoMsg(rsp, meta): + case <-wctx.Done(): + if p.cfg.Debug { + p.logger.Printf("writing expired after %s", p.cfg.Timeout) } + return } } @@ -225,10 +222,17 @@ func (p *promWriteOutput) Close() error { return nil } -func (p *promWriteOutput) RegisterMetrics(_ *prometheus.Registry) {} +func (p *promWriteOutput) RegisterMetrics(reg *prometheus.Registry) { + if !p.cfg.EnableMetrics { + return + } + if err := registerMetrics(reg); err != nil { + p.logger.Printf("failed to register metric: %v", err) + } +} func (p *promWriteOutput) String() string { - b, err := json.Marshal(p) + b, err := json.Marshal(p.cfg) if err != nil { return "" } @@ -246,7 +250,7 @@ func (p *promWriteOutput) SetEventProcessors(ps map[string]map[string]interface{ logger *log.Logger, tcs map[string]*types.TargetConfig, acts map[string]map[string]interface{}) { - for _, epName := range p.Cfg.EventProcessors { + for _, epName := range p.cfg.EventProcessors { if epCfg, ok := ps[epName]; ok { epType := "" for k := range epCfg { @@ -275,8 +279,8 @@ func (p *promWriteOutput) SetEventProcessors(ps map[string]map[string]interface{ } func (p *promWriteOutput) SetName(name string) { - if p.Cfg.Name == "" { - p.Cfg.Name = name + if p.cfg.Name == "" { + p.cfg.Name = name } } @@ -292,64 +296,98 @@ func (p *promWriteOutput) worker(ctx context.Context) { case <-ctx.Done(): return case ev := <-p.eventChan: - if p.Cfg.Debug { - p.logger.Printf("got event to buffer: %+v", ev) - } - for _, pts := range p.mb.TimeSeriesFromEvent(ev) { - if len(p.timeSeriesCh) >= p.Cfg.BufferSize { - //if p.Cfg.Debug { - p.logger.Printf("buffer size reached, triggering write") - // } - p.buffDrainCh <- struct{}{} - } - // populate metadata cache - p.m.Lock() - if p.Cfg.Debug { - p.logger.Printf("saving metrics metadata") - } - p.metadataCache[pts.Name] = prompb.MetricMetadata{ - Type: prompb.MetricMetadata_COUNTER, - MetricFamilyName: pts.Name, - Help: defaultMetricHelp, - } - p.m.Unlock() - // write time series to buffer - if p.Cfg.Debug { - p.logger.Printf("writing TimeSeries to buffer") - } - p.timeSeriesCh <- pts.TS + p.workerHandleEvent(ev) + case m := <-p.msgChan: + p.workerHandleProto(ctx, m) + } + } +} + +func (p *promWriteOutput) workerHandleProto(ctx context.Context, m *outputs.ProtoMsg) { + pmsg := m.GetMsg() + switch pmsg := pmsg.(type) { + case *gnmi.SubscribeResponse: + meta := m.GetMeta() + measName := "default" + if subName, ok := 
meta["subscription-name"]; ok { + measName = subName + } + var err error + pmsg, err = outputs.AddSubscriptionTarget(pmsg, m.GetMeta(), p.cfg.AddTarget, p.targetTpl) + if err != nil { + p.logger.Printf("failed to add target to the response: %v", err) + } + events, err := formatters.ResponseToEventMsgs(measName, pmsg, meta, p.evps...) + if err != nil { + p.logger.Printf("failed to convert message to event: %v", err) + return + } + for _, ev := range events { + p.workerHandleEvent(ev) + } + } +} + +func (p *promWriteOutput) workerHandleEvent(ev *formatters.EventMsg) { + if p.cfg.Debug { + p.logger.Printf("got event to buffer: %+v", ev) + } + for _, pts := range p.mb.TimeSeriesFromEvent(ev) { + if len(p.timeSeriesCh) >= p.cfg.BufferSize { + if p.cfg.Debug { + p.logger.Printf("buffer size reached, triggering write") } + p.buffDrainCh <- struct{}{} + } + // populate metadata cache + p.m.Lock() + if p.cfg.Debug { + p.logger.Printf("saving metrics metadata") } + p.metadataCache[pts.Name] = prompb.MetricMetadata{ + Type: prompb.MetricMetadata_COUNTER, + MetricFamilyName: pts.Name, + Help: defaultMetricHelp, + } + p.m.Unlock() + // write time series to buffer + if p.cfg.Debug { + p.logger.Printf("writing TimeSeries to buffer") + } + p.timeSeriesCh <- pts.TS } } func (p *promWriteOutput) setDefaults() error { - if p.Cfg.Timeout <= 0 { - p.Cfg.Timeout = defaultTimeout + if p.cfg.Timeout <= 0 { + p.cfg.Timeout = defaultTimeout + } + if p.cfg.Interval <= 0 { + p.cfg.Interval = defaultWriteInterval } - if p.Cfg.Interval <= 0 { - p.Cfg.Interval = defaultWriteInterval + if p.cfg.BufferSize <= 0 { + p.cfg.BufferSize = defaultBufferSize } - if p.Cfg.BufferSize <= 0 { - p.Cfg.BufferSize = defaultBufferSize + if p.cfg.NumWorkers <= 0 { + p.cfg.NumWorkers = defaultNumWorkers } - if p.Cfg.MaxTimeSeriesPerWrite <= 0 { - p.Cfg.MaxTimeSeriesPerWrite = defaultMaxTSPerWrite + if p.cfg.MaxTimeSeriesPerWrite <= 0 { + p.cfg.MaxTimeSeriesPerWrite = defaultMaxTSPerWrite } - if p.Cfg.Metadata == nil { - p.Cfg.Metadata = &metadata{ + if p.cfg.Metadata == nil { + p.cfg.Metadata = &metadata{ Include: true, Interval: defaultMetadataWriteInterval, MaxEntriesPerWrite: defaultMaxMetaDataEntriesPerWrite, } return nil } - if p.Cfg.Metadata.Include { - if p.Cfg.Metadata.Interval <= 0 { - p.Cfg.Metadata.Interval = defaultMetadataWriteInterval + if p.cfg.Metadata.Include { + if p.cfg.Metadata.Interval <= 0 { + p.cfg.Metadata.Interval = defaultMetadataWriteInterval } - if p.Cfg.Metadata.MaxEntriesPerWrite <= 0 { - p.Cfg.Metadata.MaxEntriesPerWrite = defaultMaxMetaDataEntriesPerWrite + if p.cfg.Metadata.MaxEntriesPerWrite <= 0 { + p.cfg.Metadata.MaxEntriesPerWrite = defaultMaxMetaDataEntriesPerWrite } } return nil