diff --git a/pkg/agent/cache/managedapplication.go b/pkg/agent/cache/managedapplication.go index e72eb7a77..724a75ffd 100644 --- a/pkg/agent/cache/managedapplication.go +++ b/pkg/agent/cache/managedapplication.go @@ -2,6 +2,8 @@ package cache import ( v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" ) // ManagedApplication cache related methods @@ -16,7 +18,14 @@ func (c *cacheManager) AddManagedApplication(resource *v1.ResourceInstance) { if resource == nil { return } + manApp := management.ManagedApplication{} + err := manApp.FromInstance(resource) + if err != nil { + return + } + catalogAppRef := manApp.GetReferenceByGVK(catalog.ApplicationGVK()) c.managedApplicationMap.SetWithSecondaryKey(resource.Metadata.ID, resource.Name, resource) + c.managedApplicationMap.SetSecondaryKey(resource.Metadata.ID, catalogAppRef.ID) } func (c *cacheManager) GetManagedApplication(id string) *v1.ResourceInstance { @@ -33,6 +42,20 @@ func (c *cacheManager) GetManagedApplication(id string) *v1.ResourceInstance { return nil } +func (c *cacheManager) GetManagedApplicationByApplicationID(id string) *v1.ResourceInstance { + c.ApplyResourceReadLock() + defer c.ReleaseResourceReadLock() + + managedApp, _ := c.managedApplicationMap.GetBySecondaryKey(id) + if managedApp != nil { + ri, ok := managedApp.(*v1.ResourceInstance) + if ok { + return ri + } + } + return nil +} + func (c *cacheManager) GetManagedApplicationByName(name string) *v1.ResourceInstance { c.ApplyResourceReadLock() defer c.ReleaseResourceReadLock() diff --git a/pkg/agent/cache/manager.go b/pkg/agent/cache/manager.go index e5f73f128..4c23cda87 100644 --- a/pkg/agent/cache/manager.go +++ b/pkg/agent/cache/manager.go @@ -92,6 +92,7 @@ type Manager interface { GetManagedApplicationCacheKeys() []string AddManagedApplication(resource *v1.ResourceInstance) GetManagedApplication(id string) *v1.ResourceInstance + GetManagedApplicationByApplicationID(id string) *v1.ResourceInstance GetManagedApplicationByName(name string) *v1.ResourceInstance DeleteManagedApplication(id string) error @@ -116,7 +117,6 @@ type Manager interface { ReleaseResourceReadLock() } -type teamRefreshHandler func() type cacheManager struct { jobs.Job logger log.FieldLogger @@ -157,6 +157,7 @@ func NewAgentCacheManager(cfg config.CentralConfig, persistCacheEnabled bool) Ma m.migrators = []cacheMigrate{ m.migrateAccessRequest, m.migrateInstanceCount, + m.migrateManagedApplications, } } m.initializeCache(cfg) diff --git a/pkg/agent/cache/migratepersistedcache.go b/pkg/agent/cache/migratepersistedcache.go index 6903aed90..3693b8183 100644 --- a/pkg/agent/cache/migratepersistedcache.go +++ b/pkg/agent/cache/migratepersistedcache.go @@ -4,6 +4,7 @@ import ( "sync" v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1" management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" defs "github.com/Axway/agent-sdk/pkg/apic/definitions" "github.com/Axway/agent-sdk/pkg/util" @@ -105,3 +106,28 @@ func (c *cacheManager) migrateInstanceCount(key string) error { } return nil } + +func (c *cacheManager) migrateManagedApplications(cacheKey string) error { + if cacheKey != managedAppKey { + return nil + } + + for _, key := range c.managedApplicationMap.GetKeys() { + cachedManagedApp, _ := c.managedApplicationMap.Get(key) + if cachedManagedApp == nil { + continue + } + ri, ok := cachedManagedApp.(*v1.ResourceInstance) + if !ok { + continue + } + manApp := management.ManagedApplication{} + err := manApp.FromInstance(ri) + if err != nil { + continue + } + catalogAppRef := manApp.GetReferenceByGVK(catalog.ApplicationGVK()) + c.managedApplicationMap.SetSecondaryKey(key, catalogAppRef.ID) + } + return nil +} diff --git a/pkg/transaction/metric/apimetric.go b/pkg/transaction/metric/apimetric.go index 2cfca0968..a77f492bc 100644 --- a/pkg/transaction/metric/apimetric.go +++ b/pkg/transaction/metric/apimetric.go @@ -16,7 +16,6 @@ type APIMetric struct { AssetResource models.AssetResource `json:"assetResource,omitempty"` ProductPlan models.ProductPlan `json:"productPlan,omitempty"` Quota models.Quota `json:"quota,omitempty"` - Unit models.Unit `json:"unit,omitempty"` StatusCode string `json:"statusCode,omitempty"` Status string `json:"status,omitempty"` Count int64 `json:"count"` @@ -24,6 +23,7 @@ type APIMetric struct { Observation ObservationDetails `json:"observation"` EventID string `json:"-"` StartTime time.Time `json:"-"` + Unit *models.Unit `json:"unit,omitempty"` } // GetStartTime - Returns the start time for subscription metric diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index 20019e755..56f3bdc18 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -13,6 +13,7 @@ import ( "github.com/Axway/agent-sdk/pkg/agent" "github.com/Axway/agent-sdk/pkg/cache" "github.com/Axway/agent-sdk/pkg/traceability" + "github.com/Axway/agent-sdk/pkg/transaction/models" "github.com/Axway/agent-sdk/pkg/util" "github.com/rcrowley/go-metrics" ) @@ -32,8 +33,8 @@ type storageCache interface { updateUsage(usageCount int) updateVolume(bytes int64) updateAppUsage(usageCount int, appID string) - updateMetric(cachedMetric cachedMetricInterface, metric *centralMetricEvent) - removeMetric(metric *centralMetricEvent) + updateMetric(cachedMetric cachedMetricInterface, metric *centralMetric) + removeMetric(metric *centralMetric) save() } @@ -161,20 +162,41 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { var cm cachedMetric json.Unmarshal(buffer, &cm) - var metric *centralMetricEvent - for _, duration := range cm.Values { - unitID := "" - if cm.Unit != nil { - unitID = cm.Unit.ID + apiDetails := models.APIDetails{} + if cm.API != nil { + apiDetails.ID = cm.API.ID + apiDetails.Name = cm.API.Name + } + appDetails := models.AppDetails{} + if cm.App != nil { + appDetails.ID = cm.App.ID + appDetails.ConsumerOrgID = cm.App.ConsumerOrgID + } + + if len(cm.Values) == 0 { + if cm.Unit == nil { + continue } - metricDetail := Detail{ - APIDetails: *cm.API, - AppDetails: *cm.App, - UnitName: unitID, + + c.collector.AddCustomMetricDetail(CustomMetricDetail{ + APIDetails: apiDetails, + AppDetails: appDetails, + UnitDetails: models.Unit{ + Name: cm.Unit.Name, + }, + Count: cm.Count, + }) + continue + } + + var metric *centralMetric + for _, duration := range cm.Values { + metric = c.collector.createOrUpdateHistogram(Detail{ + APIDetails: apiDetails, + AppDetails: appDetails, StatusCode: cm.StatusCode, Duration: duration, - } - metric = c.collector.createOrUpdateMetric(metricDetail) + }) } newKey := metric.getKey() @@ -184,14 +206,11 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { c.storageLock.Unlock() } storageCache.Set(newKey, cm) - if metric != nil { - metric.StartTime = cm.StartTime - } } } } -func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centralMetricEvent) { +func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centralMetric) { if !c.isInitialized { return } @@ -199,10 +218,10 @@ func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centra c.storageLock.Lock() defer c.storageLock.Unlock() - c.storage.Set(metric.getKey(), metric.createdCachedMetric(cached)) + c.storage.Set(metric.getKey(), metric.createCachedMetric(cached)) } -func (c *cacheStorage) removeMetric(metric *centralMetricEvent) { +func (c *cacheStorage) removeMetric(metric *centralMetric) { if !c.isInitialized { return } diff --git a/pkg/transaction/metric/centralmetric.go b/pkg/transaction/metric/centralmetric.go index 0a9394133..ffcca4557 100644 --- a/pkg/transaction/metric/centralmetric.go +++ b/pkg/transaction/metric/centralmetric.go @@ -1,92 +1,132 @@ package metric import ( + "fmt" "strings" + "sync" "time" "github.com/Axway/agent-sdk/pkg/transaction/models" + "github.com/rcrowley/go-metrics" "github.com/sirupsen/logrus" ) -// metricInfo - the base object holding the metricInfo -type metricInfo struct { - Subscription *models.Subscription `json:"subscription,omitempty"` - App *models.AppDetails `json:"app,omitempty"` - Product *models.Product `json:"product,omitempty"` - API *models.APIDetails `json:"api,omitempty"` - AssetResource *models.AssetResource `json:"assetResource,omitempty"` - ProductPlan *models.ProductPlan `json:"productPlan,omitempty"` - Quota *models.Quota `json:"quota,omitempty"` - Unit *models.Unit `json:"unit,omitempty"` - StatusCode string `json:"statusCode,omitempty"` +type groupedMetrics struct { + lock *sync.Mutex + counters map[string]metrics.Counter + histograms map[string]metrics.Histogram } -// centralMetricEvent - the event that is actually sent to platform -type centralMetricEvent struct { - metricInfo - Status string `json:"status,omitempty"` - Count int64 `json:"count"` - Response *ResponseMetrics `json:"response,omitempty"` - Observation *ObservationDetails `json:"observation"` - EventID string `json:"-"` - StartTime time.Time `json:"-"` +func newGroupedMetric() groupedMetrics { + return groupedMetrics{ + lock: &sync.Mutex{}, + counters: make(map[string]metrics.Counter), + histograms: make(map[string]metrics.Histogram), + } +} + +func (g groupedMetrics) getOrCreateHistogram(key string) metrics.Histogram { + g.lock.Lock() + defer g.lock.Unlock() + + if _, ok := g.histograms[key]; !ok { + sampler := metrics.NewUniformSample(2048) + g.histograms[key] = metrics.NewHistogram(sampler) + } + return g.histograms[key] +} + +func (g groupedMetrics) getOrCreateCounter(key string) metrics.Counter { + g.lock.Lock() + defer g.lock.Unlock() + + if _, ok := g.counters[key]; !ok { + g.counters[key] = metrics.NewCounter() + } + return g.counters[key] +} + +type centralMetric struct { + Subscription *models.ResourceReference `json:"subscription,omitempty"` + App *models.ApplicationResourceReference `json:"app,omitempty"` + Product *models.ProductResourceReference `json:"product,omitempty"` + API *models.APIResourceReference `json:"api,omitempty"` + AssetResource *models.ResourceReference `json:"assetResource,omitempty"` + ProductPlan *models.ResourceReference `json:"productPlan,omitempty"` + Units *Units `json:"units,omitempty"` + Reporter *Reporter `json:"reporter,omitempty"` + Observation *ObservationDetails `json:"-"` + EventID string `json:"-"` } // GetStartTime - Returns the start time for subscription metric -func (a *centralMetricEvent) GetStartTime() time.Time { - return a.StartTime +func (a *centralMetric) GetStartTime() time.Time { + return time.UnixMilli(a.Observation.Start) } // GetType - Returns APIMetric -func (a *centralMetricEvent) GetType() string { +func (a *centralMetric) GetType() string { return "APIMetric" } // GetType - Returns APIMetric -func (a *centralMetricEvent) GetEventID() string { +func (a *centralMetric) GetEventID() string { return a.EventID } -func (a *centralMetricEvent) GetLogFields() logrus.Fields { +func (a *centralMetric) GetLogFields() logrus.Fields { fields := logrus.Fields{ "id": a.EventID, - "count": a.Count, - "status": a.StatusCode, - "minResponse": a.Response.Min, - "maxResponse": a.Response.Max, - "avgResponse": a.Response.Avg, "startTimestamp": a.Observation.Start, "endTimestamp": a.Observation.End, } if a.Subscription != nil { - fields = a.Subscription.GetLogFields(fields) + fields = a.Subscription.GetLogFields(fields, "subscriptionID") } if a.App != nil { - fields = a.App.GetLogFields(fields) + fields = a.App.GetLogFields(fields, "applicationID") } if a.Product != nil { - fields = a.Product.GetLogFields(fields) + fields = a.Product.GetLogFields(fields, "productID") } if a.API != nil { - fields = a.API.GetLogFields(fields) + fields = a.API.GetLogFields(fields, "apiID") } if a.AssetResource != nil { - fields = a.AssetResource.GetLogFields(fields) + fields = a.AssetResource.GetLogFields(fields, "assetResourceID") } if a.ProductPlan != nil { - fields = a.ProductPlan.GetLogFields(fields) + fields = a.ProductPlan.GetLogFields(fields, "productPlanID") + } + + // add transaction unit info and custom units if they exist + if a.Units == nil { + return fields } - if a.Quota != nil { - fields = a.Quota.GetLogFields(fields) + if a.Units.Transactions != nil { + if a.Units.Transactions.Quota != nil { + fields = a.Units.Transactions.Quota.GetLogFields(fields, "transactionQuotaID") + } + fields["transactionCount"] = a.Units.Transactions.Count + fields["status"] = a.Units.Transactions.Status + fields["minResponse"] = a.Units.Transactions.Response.Min + fields["maxResponse"] = a.Units.Transactions.Response.Max + fields["avgResponse"] = a.Units.Transactions.Response.Avg } - if a.Unit != nil { - fields = a.Unit.GetLogFields(fields) + if len(a.Units.CustomUnits) == 0 { + return fields + } + for k, u := range a.Units.CustomUnits { + if u.Quota != nil { + fields = u.Quota.GetLogFields(fields, fmt.Sprintf("%sQuotaID", k)) + } + fields[fmt.Sprintf("%sCount", k)] = u.Count } return fields } // getKey - returns the cache key for the metric -func (a *centralMetricEvent) getKey() string { +func (a *centralMetric) getKey() string { subID := unknown if a.Subscription != nil { subID = a.Subscription.ID @@ -100,20 +140,47 @@ func (a *centralMetricEvent) getKey() string { apiID = a.API.ID } uniqueKey := unknown - if a.StatusCode != "" { - uniqueKey = a.StatusCode - } else if a.Unit != nil { - uniqueKey = a.Unit.ID + if a.Units != nil && a.Units.Transactions != nil && a.Units.Transactions.Status != "" { + uniqueKey = a.Units.Transactions.Status + } else { + // get the first, and should be only, custom unit name + for k := range a.Units.CustomUnits { + uniqueKey = k + break + } } return strings.Join([]string{metricKeyPrefix, subID, appID, apiID, uniqueKey}, ".") } -func (a *centralMetricEvent) createdCachedMetric(cached cachedMetricInterface) cachedMetric { - return cachedMetric{ - metricInfo: a.metricInfo, - StartTime: a.StartTime, - Count: cached.Count(), - Values: cached.Values(), +// getKey - returns the cache key for the metric +func (a *centralMetric) getKeyParts() (string, string, string, string) { + key := a.getKey() + parts := strings.Split(key, ".") + return parts[1], parts[2], parts[3], parts[4] +} + +func (a *centralMetric) createCachedMetric(cached cachedMetricInterface) cachedMetric { + cacheM := cachedMetric{ + Subscription: a.Subscription, + App: a.App, + Product: a.Product, + API: a.API, + AssetResource: a.AssetResource, + ProductPlan: a.ProductPlan, + Count: cached.Count(), + Values: cached.Values(), + } + + if a.Units.Transactions != nil { + cacheM.Quota = a.Units.Transactions.Quota + cacheM.StatusCode = a.Units.Transactions.Status + } else { + for u := range a.Units.CustomUnits { + cacheM.Unit = &models.Unit{ + Name: u, + } + } } + return cacheM } diff --git a/pkg/transaction/metric/definition.go b/pkg/transaction/metric/definition.go index c6ad4bcca..85a311ce4 100644 --- a/pkg/transaction/metric/definition.go +++ b/pkg/transaction/metric/definition.go @@ -75,10 +75,17 @@ type ObservationDetails struct { // cachedMetric - struct to hold metric specific that gets cached and used for agent recovery type cachedMetric struct { - metricInfo - Count int64 `json:"count"` - Values []int64 `json:"values,omitempty"` - StartTime time.Time `json:"startTime"` + Subscription *models.ResourceReference `json:"subscription,omitempty"` + App *models.ApplicationResourceReference `json:"app,omitempty"` + Product *models.ProductResourceReference `json:"product,omitempty"` + API *models.APIResourceReference `json:"api,omitempty"` + AssetResource *models.ResourceReference `json:"assetResource,omitempty"` + ProductPlan *models.ResourceReference `json:"productPlan,omitempty"` + Quota *models.ResourceReference `json:"quota,omitempty"` + Unit *models.Unit `json:"unit,omitempty"` + StatusCode string `json:"statusCode,omitempty"` + Count int64 `json:"count"` + Values []int64 `json:"values,omitempty"` } // V4EventDistribution - represents V4 distribution @@ -184,3 +191,11 @@ func (t ISO8601Time) MarshalJSON() ([]byte, error) { b = append(b, '"') return b, nil } + +type Reporter struct { + AgentVersion string `json:"agentVersion,omitempty"` + AgentType string `json:"agentType,omitempty"` + AgentSDKVersion string `json:"agentSDKVersion,omitempty"` + AgentName string `json:"agentName,omitempty"` + ObservationDelta int64 `json:"observationDelta,omitempty"` +} diff --git a/pkg/transaction/metric/metricbatch.go b/pkg/transaction/metric/metricbatch.go index c362a21f9..3822ca5f3 100644 --- a/pkg/transaction/metric/metricbatch.go +++ b/pkg/transaction/metric/metricbatch.go @@ -3,6 +3,7 @@ package metric import ( "context" "encoding/json" + "time" "github.com/Axway/agent-sdk/pkg/traceability" beatPub "github.com/elastic/beats/v7/libbeat/publisher" @@ -11,20 +12,28 @@ import ( const cancelMsg = "event cancelled, counts added at next publish" +type eventMetric struct { + histogram metrics.Histogram + counters map[string]metrics.Counter +} + // EventBatch - creates a batch of MetricEvents to send to Condor type EventBatch struct { beatPub.Batch events []beatPub.Event - histograms map[string]metrics.Histogram + batchMetrics map[string]eventMetric collector *collector haveBatchLock bool } // AddEvent - adds an event to the batch -func (b *EventBatch) AddEvent(event beatPub.Event, histogram metrics.Histogram) { +func (b *EventBatch) AddEvent(event beatPub.Event, histogram metrics.Histogram, counters map[string]metrics.Counter) { b.events = append(b.events, event) eventID := event.Content.Meta[metricKey].(string) - b.histograms[eventID] = histogram + b.batchMetrics[eventID] = eventMetric{ + histogram: histogram, + counters: counters, + } } // AddEvent - adds an event to the batch @@ -84,7 +93,7 @@ func (b *EventBatch) Events() []beatPub.Event { // ACK - all events have been acknowledgeded, cleanup the counters func (b *EventBatch) ACK() { b.ackEvents(b.events) - b.collector.metricStartTime = b.collector.metricEndTime + b.collector.metricStartTime = time.Time{} b.batchUnlock() } @@ -137,12 +146,12 @@ func (b *EventBatch) ackEvents(events []beatPub.Event) { continue } b.collector.logMetric("published", metric) - histogram, found := b.histograms[metric.EventID] - if !found { + + if eventMetric, ok := b.batchMetrics[metric.EventID]; ok { + b.collector.cleanupMetricCounters(eventMetric.histogram, eventMetric.counters, metric) + } else { b.collector.metricLogger.WithField("eventID", metric.EventID).Warn("could not clean cached metric") - continue } - b.collector.cleanupMetricCounter(histogram, metric) } } @@ -150,7 +159,7 @@ func (b *EventBatch) ackEvents(events []beatPub.Event) { func NewEventBatch(c *collector) *EventBatch { return &EventBatch{ collector: c, - histograms: make(map[string]metrics.Histogram), + batchMetrics: make(map[string]eventMetric), haveBatchLock: false, } } @@ -183,7 +192,7 @@ func getEventsToAck(retryEvents []beatPub.Event, events []beatPub.Event) []beatP return ackEvents } -func getMetricFromEvent(event beatPub.Event) *APIMetric { +func getMetricFromEvent(event beatPub.Event) *centralMetric { if data, found := event.Content.Fields[messageKey]; found { v4Bytes := data.(string) v4Event := make(map[string]interface{}) @@ -206,7 +215,7 @@ func getMetricFromEvent(event beatPub.Event) *APIMetric { if err != nil { return nil } - metric := &APIMetric{} + metric := ¢ralMetric{} err = json.Unmarshal(buf, metric) if err != nil { return nil diff --git a/pkg/transaction/metric/metricevent.go b/pkg/transaction/metric/metricevent.go index a9d8be9a1..caaba5ac2 100644 --- a/pkg/transaction/metric/metricevent.go +++ b/pkg/transaction/metric/metricevent.go @@ -22,7 +22,7 @@ type CondorMetricEvent struct { } // AddCondorMetricEventToBatch - creates the condor metric event and adds to the batch -func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogram metrics.Histogram) error { +func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogram metrics.Histogram, counters map[string]metrics.Counter) error { metricData, _ := json.Marshal(metricEvent) cme := &CondorMetricEvent{ @@ -35,7 +35,7 @@ func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogr if err != nil { return err } - batch.AddEvent(event, histogram) + batch.AddEvent(event, histogram, counters) return nil } diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 3a43c453c..9a87451f4 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "os" - "strconv" "strings" "sync" "time" @@ -30,9 +29,9 @@ import ( ) const ( - startTimestampStr = "start-timestamp" - endTimestampStr = "end-timestamp" - eventTypeStr = "event-type" + startTimestampStr = "startTimestamp" + endTimestampStr = "endTimestamp" + eventTypeStr = "eventType" usageStr = "usage" metricStr = "metric" volumeStr = "volume" @@ -68,11 +67,12 @@ type collector struct { metricStartTime time.Time metricEndTime time.Time orgGUID string + agentName string lock *sync.Mutex batchLock *sync.Mutex - registry metrics.Registry + registry registry metricBatch *EventBatch - metricMap map[string]map[string]map[string]map[string]*centralMetricEvent + metricMap map[string]map[string]map[string]map[string]*centralMetric metricMapLock *sync.Mutex publishItemQueue []publishQueueItem jobID string @@ -158,16 +158,17 @@ func createMetricCollector() Collector { metricCollector := &collector{ // Set the initial start time to be minimum 1m behind, so that the job can generate valid event // if any usage event are to be generated on startup - usageStartTime: now().Add(-1 * time.Minute), - metricStartTime: now().Add(-1 * time.Minute), + usageStartTime: now().Truncate(time.Minute), // round down to closest minute + metricStartTime: now().Truncate(time.Minute), // round down to closest minute lock: &sync.Mutex{}, batchLock: &sync.Mutex{}, metricMapLock: &sync.Mutex{}, - registry: metrics.NewRegistry(), - metricMap: make(map[string]map[string]map[string]map[string]*centralMetricEvent), + registry: newRegistry(), + metricMap: make(map[string]map[string]map[string]map[string]*centralMetric), publishItemQueue: make([]publishQueueItem, 0), metricConfig: agent.GetCentralConfig().GetMetricReportingConfig(), usageConfig: agent.GetCentralConfig().GetUsageReportingConfig(), + agentName: agent.GetCentralConfig().GetAgentName(), logger: logger, metricLogger: log.NewMetricFieldLogger(), } @@ -249,12 +250,19 @@ func (c *collector) InitializeBatch() { c.metricBatch = NewEventBatch(c) } +func (c *collector) updateStartTime() { + if c.metricStartTime.IsZero() { + c.metricStartTime = now().Truncate(time.Minute) + } +} + // AddMetric - add metric for API transaction to collection func (c *collector) AddMetric(apiDetails models.APIDetails, statusCode string, duration, bytes int64, appName string) { c.lock.Lock() defer c.lock.Unlock() c.batchLock.Lock() defer c.batchLock.Unlock() + c.updateStartTime() c.updateUsage(1) c.updateVolume(bytes) } @@ -262,7 +270,7 @@ func (c *collector) AddMetric(apiDetails models.APIDetails, statusCode string, d // AddMetricDetail - add metric for API transaction and consumer subscription to collection func (c *collector) AddMetricDetail(metricDetail Detail) { c.AddMetric(metricDetail.APIDetails, metricDetail.StatusCode, metricDetail.Duration, metricDetail.Bytes, metricDetail.APIDetails.Name) - c.createOrUpdateMetric(metricDetail) + c.createOrUpdateHistogram(metricDetail) } // AddAPIMetricDetail - add metric details for several response codes and transactions @@ -277,14 +285,13 @@ func (c *collector) AddAPIMetricDetail(detail MetricDetail) { Status: detail.StatusCode, } newMetric := c.createMetric(transactionCtx) + // update the new metric with all the necessary details - newMetric.Count = detail.Count - newMetric.Response = &detail.Response - newMetric.StartTime = time.UnixMilli(detail.Observation.Start) + newMetric.Units.Transactions.Count = detail.Count + newMetric.Units.Transactions.Response = &detail.Response newMetric.Observation = &detail.Observation - newMetric.StatusCode = detail.StatusCode - newMetric.Status = c.getStatusText(detail.StatusCode) + c.updateStartTime() c.addMetric(newMetric) } @@ -293,11 +300,14 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { if !c.metricConfig.CanPublish() || c.usageConfig.IsOfflineMode() { return } + c.lock.Lock() + defer c.lock.Unlock() + c.batchLock.Lock() + defer c.batchLock.Unlock() logger := c.logger.WithField("handler", "customMetric"). WithField("apiID", detail.APIDetails.ID). WithField("appID", detail.AppDetails.ID). - WithField("unitID", detail.UnitDetails.ID). WithField("unitName", detail.UnitDetails.Name) if detail.APIDetails.ID == "" { @@ -310,7 +320,7 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { return } - if detail.UnitDetails.ID == "" || detail.UnitDetails.Name == "" { + if detail.UnitDetails.Name == "" { logger.Error("custom units require Unit information") return } @@ -319,12 +329,10 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { transactionCtx := transactionContext{ APIDetails: detail.APIDetails, AppDetails: detail.AppDetails, - UnitName: detail.UnitDetails.ID, + UnitName: detail.UnitDetails.Name, } metric := c.createMetric(transactionCtx) - metric.StartTime = time.UnixMilli(detail.Observation.Start) - metric.Observation = &detail.Observation if m := c.getExistingMetric(metric); m != nil { // use the cached metric @@ -332,25 +340,26 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { } // add the count - metric.Count += detail.Count + metric.Units.CustomUnits[detail.UnitDetails.Name].Count += detail.Count - counter := c.getOrRegisterCounter(metric.getKey()) + counter := c.getOrRegisterGroupedCounter(metric.getKey()) counter.Inc(detail.Count) + c.updateStartTime() c.updateMetricWithCachedMetric(metric, newCustomCounter(counter)) } // AddAPIMetric - add api metric for API transaction func (c *collector) AddAPIMetric(metric *APIMetric) { + c.updateStartTime() c.addMetric(centralMetricFromAPIMetric(metric)) } // addMetric - add central metric event -func (c *collector) addMetric(metric *centralMetricEvent) { +func (c *collector) addMetric(metric *centralMetric) { if metric.EventID == "" { metric.EventID = uuid.NewString() } - metric.Status = c.getStatusText(metric.StatusCode) v4Event := c.createV4Event(metric.Observation.Start, metric) metricData, _ := json.Marshal(v4Event) @@ -363,7 +372,6 @@ func (c *collector) addMetric(metric *centralMetricEvent) { if err != nil { return } - c.updateUsage(metric.Count) c.metricBatch.AddEventWithoutHistogram(pubEvent) } @@ -391,34 +399,47 @@ func (c *collector) updateUsage(count int64) { c.storage.updateUsage(int(transactionCount.Count())) } -func (c *collector) createMetric(detail transactionContext) *centralMetricEvent { +func (c *collector) createMetric(detail transactionContext) *centralMetric { // Go get the access request and managed app accessRequest, managedApp := c.getAccessRequestAndManagedApp(agent.GetCacheManager(), detail) - cme := ¢ralMetricEvent{ - metricInfo: metricInfo{ - Subscription: c.createSubscriptionDetail(accessRequest), - App: c.createAppDetail(managedApp), - Product: c.getProduct(accessRequest), - API: c.createAPIDetail(detail.APIDetails), - AssetResource: c.getAssetResource(accessRequest), - ProductPlan: c.getProductPlan(accessRequest), - Quota: c.getQuota(accessRequest, detail.UnitName), - Unit: c.getProductPlanUnit(accessRequest, detail.UnitName), + me := ¢ralMetric{ + Subscription: c.createSubscriptionDetail(accessRequest), + App: c.createAppDetail(managedApp), + Product: c.getProduct(accessRequest), + API: c.createAPIDetail(detail.APIDetails), + AssetResource: c.getAssetResource(accessRequest), + ProductPlan: c.getProductPlan(accessRequest), + Observation: &ObservationDetails{ + Start: now().Unix(), }, - StartTime: now(), - EventID: uuid.NewString(), + EventID: uuid.NewString(), } + // transactions if detail.Status != "" { - cme.StatusCode = detail.Status - cme.Status = c.getStatusText(detail.Status) + me.Units = &Units{ + Transactions: &Transactions{ + UnitCount: UnitCount{ + Quota: c.getQuota(accessRequest, ""), + }, + Status: c.getStatusText(detail.Status), + }, + } + } else if detail.UnitName != "" { + me.Units = &Units{ + CustomUnits: map[string]*UnitCount{ + detail.UnitName: { + Quota: c.getQuota(accessRequest, detail.UnitName), + }, + }, + } } - return cme + return me } -func (c *collector) createOrUpdateMetric(detail Detail) *centralMetricEvent { +func (c *collector) createOrUpdateHistogram(detail Detail) *centralMetric { if !c.metricConfig.CanPublish() || c.usageConfig.IsOfflineMode() { return nil // no need to update metrics with publish off } @@ -432,13 +453,13 @@ func (c *collector) createOrUpdateMetric(detail Detail) *centralMetricEvent { metric := c.createMetric(transactionCtx) - histogram := c.getOrRegisterHistogram(metric.getKey()) + histogram := c.getOrRegisterGroupedHistogram(metric.getKey()) histogram.Update(detail.Duration) return c.updateMetricWithCachedMetric(metric, newCustomHistogram(histogram)) } -func (c *collector) getExistingMetric(metric *centralMetricEvent) *centralMetricEvent { +func (c *collector) getExistingMetric(metric *centralMetric) *centralMetric { keyParts := strings.Split(metric.getKey(), ".") c.metricMapLock.Lock() @@ -459,20 +480,20 @@ func (c *collector) getExistingMetric(metric *centralMetricEvent) *centralMetric return c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]][keyParts[4]] } -func (c *collector) updateMetricWithCachedMetric(metric *centralMetricEvent, cached cachedMetricInterface) *centralMetricEvent { +func (c *collector) updateMetricWithCachedMetric(metric *centralMetric, cached cachedMetricInterface) *centralMetric { keyParts := strings.Split(metric.getKey(), ".") c.metricMapLock.Lock() defer c.metricMapLock.Unlock() if _, ok := c.metricMap[keyParts[1]]; !ok { - c.metricMap[keyParts[1]] = make(map[string]map[string]map[string]*centralMetricEvent) + c.metricMap[keyParts[1]] = make(map[string]map[string]map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]]; !ok { - c.metricMap[keyParts[1]][keyParts[2]] = make(map[string]map[string]*centralMetricEvent) + c.metricMap[keyParts[1]][keyParts[2]] = make(map[string]map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]]; !ok { - c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]] = make(map[string]*centralMetricEvent) + c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]] = make(map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]][keyParts[4]]; !ok { // First api metric for sub+app+api+statuscode, @@ -486,7 +507,7 @@ func (c *collector) updateMetricWithCachedMetric(metric *centralMetricEvent, cac // getAccessRequest - func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, detail transactionContext) (*management.AccessRequest, *v1.ResourceInstance) { - if detail.AppDetails.Name == "" { + if detail.AppDetails.Name == "" && detail.AppDetails.ID == "" { return nil, nil } @@ -496,7 +517,14 @@ func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, de Trace("metric collector information") // get the managed application - managedApp := cacheManager.GetManagedApplicationByName(detail.AppDetails.Name) + // cached metrics will only have the catalog api id + managedApp := cacheManager.GetManagedApplicationByApplicationID(detail.AppDetails.ID) + if managedApp == nil { + managedApp = cacheManager.GetManagedApplication(detail.AppDetails.ID) + } + if managedApp == nil { + managedApp = cacheManager.GetManagedApplicationByName(detail.AppDetails.Name) + } if managedApp == nil { c.logger. WithField("appName", detail.AppDetails.Name). @@ -525,30 +553,23 @@ func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, de return accessRequest, managedApp } -func (c *collector) createSubscriptionDetail(accessRequest *management.AccessRequest) *models.Subscription { +func (c *collector) createSubscriptionDetail(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } subRef := accessRequest.GetReferenceByGVK(catalog.SubscriptionGVK()) - if subRef.ID != "" { + if subRef.ID == "" { return nil } - return &models.Subscription{ - ID: subRef.ID, - Name: subRef.Name, + return &models.ResourceReference{ + ID: subRef.ID, } } -func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.AppDetails { +func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.ApplicationResourceReference { if appRI == nil { - - // TODO remove the following when product plan unit ready - // return &models.AppDetails{ - // ID: "app-id", - // Name: "app-name", - // } return nil } @@ -565,23 +586,29 @@ func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.AppDetai return nil } - return &models.AppDetails{ - ID: appRef.ID, - Name: appRef.Name, + return &models.ApplicationResourceReference{ + ResourceReference: models.ResourceReference{ + ID: appRef.ID, + }, ConsumerOrgID: orgID, } } -func (c *collector) createAPIDetail(api models.APIDetails) *models.APIDetails { - return &models.APIDetails{ - ID: api.ID, - Name: api.Name, - Revision: api.Revision, - TeamID: api.TeamID, +func (c *collector) createAPIDetail(api models.APIDetails) *models.APIResourceReference { + ref := &models.APIResourceReference{ + ResourceReference: models.ResourceReference{ + ID: api.ID, + }, + Name: api.Name, } + svc := agent.GetCacheManager().GetAPIServiceWithAPIID(strings.TrimPrefix(api.ID, transutil.SummaryEventProxyIDPrefix)) + if svc != nil { + ref.APIServiceID = svc.Metadata.ID + } + return ref } -func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *models.AssetResource { +func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } @@ -590,13 +617,13 @@ func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *m if assetResourceRef.ID == "" { return nil } - return &models.AssetResource{ - ID: assetResourceRef.ID, - Name: assetResourceRef.Name, + + return &models.ResourceReference{ + ID: assetResourceRef.ID, } } -func (c *collector) getProduct(accessRequest *management.AccessRequest) *models.Product { +func (c *collector) getProduct(accessRequest *management.AccessRequest) *models.ProductResourceReference { if accessRequest == nil { return nil } @@ -608,15 +635,15 @@ func (c *collector) getProduct(accessRequest *management.AccessRequest) *models. return nil } - return &models.Product{ - ID: productRef.ID, - Name: productRef.Name, - VersionID: releaseRef.ID, - VersionName: releaseRef.Name, + return &models.ProductResourceReference{ + ResourceReference: models.ResourceReference{ + ID: productRef.ID, + }, + VersionID: releaseRef.ID, } } -func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *models.ProductPlan { +func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } @@ -626,39 +653,60 @@ func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *mod return nil } - return &models.ProductPlan{ + return &models.ResourceReference{ ID: productPlanRef.ID, } } -func (c *collector) getQuota(accessRequest *management.AccessRequest, id string) *models.Quota { +func (c *collector) getQuota(accessRequest *management.AccessRequest, unitName string) *models.ResourceReference { if accessRequest == nil { return nil } - quotaRef := accessRequest.GetReferenceByIDAndGVK(id, catalog.QuotaGVK()) - if quotaRef.ID == "" { + if unitName == "" && accessRequest.Spec.Quota == nil { + // no quota on transactions return nil } - return &models.Quota{ - ID: quotaRef.ID, + quotaName := "" + if unitName == "" && accessRequest.Spec.Quota != nil { + // get transactions quota + for _, r := range accessRequest.References { + rMap := r.(map[string]interface{}) + if rMap["kind"].(string) != catalog.QuotaGVK().Kind { + continue + } + if _, ok := rMap["unit"]; !ok { + // no unit is transactions + quotaName = strings.Split(rMap["name"].(string), "/")[2] + break + } + } + } else { + // get custom unit quota + for _, r := range accessRequest.References { + rMap := r.(map[string]interface{}) + if rMap["kind"].(string) != catalog.QuotaGVK().Kind { + continue + } + if unit, ok := rMap["unit"]; ok && unitName == unit.(string) { + // no unit is transactions + quotaName = strings.Split(rMap["name"].(string), "/")[2] + break + } + } } -} - -func (c *collector) getProductPlanUnit(accessRequest *management.AccessRequest, name string) *models.Unit { - if accessRequest == nil { + if quotaName == "" { return nil } - unitRef := accessRequest.GetReferenceByNameAndGVK(name, catalog.ProductPlanUnitGVK()) - if unitRef.ID == "" { + quotaRef := accessRequest.GetReferenceByNameAndGVK(quotaName, catalog.QuotaGVK()) + if quotaRef.ID == "" { return nil } - return &models.Unit{ - ID: unitRef.ID, - Name: unitRef.Name, + return &models.ResourceReference{ + ID: quotaRef.ID, } } @@ -792,40 +840,89 @@ func (c *collector) generateUsageEvent(orgGUID string) { c.publishItemQueue = append(c.publishItemQueue, queueItem) } -func (c *collector) processMetric(metricName string, metric interface{}) { +func (c *collector) processMetric(metricName string, groupedMetric interface{}) { c.metricMapLock.Lock() defer c.metricMapLock.Unlock() elements := strings.Split(metricName, ".") - if len(elements) == 5 { + if len(elements) == 4 { subscriptionID := elements[1] appID := elements[2] apiID := strings.ReplaceAll(elements[3], "#", ".") - statusCode := elements[4] if appMap, ok := c.metricMap[subscriptionID]; ok { if apiMap, ok := appMap[appID]; ok { - if statusMap, ok := apiMap[apiID]; ok { - if statusDetail, ok := statusMap[statusCode]; ok { - statusMetric := (metric.(metrics.Histogram)) - c.setMetricsFromHistogram(statusDetail, statusMetric) - c.generateMetricEvent(statusMetric, statusDetail) - } + if groupMap, ok := apiMap[apiID]; ok { + logger := c.logger.WithField("subscriptionID", subscriptionID).WithField("applicationID", appID).WithField("apiID", apiID) + c.handleGroupedMetric(logger, groupedMetric, groupMap) } } } } } -func (c *collector) setMetricsFromHistogram(metrics *centralMetricEvent, histogram metrics.Histogram) { - metrics.Count = histogram.Count() - metrics.Response = &ResponseMetrics{ +func (c *collector) handleGroupedMetric(logger log.FieldLogger, groupedMetricInterface interface{}, groupMap map[string]*centralMetric) { + groupedMetric, ok := groupedMetricInterface.(groupedMetrics) + if !ok { + logger.Error("metric data to process was not the expected type") + return + } + + countersAdded := false + // handle each histogram, on the first one add the counter information + for k, histo := range groupedMetric.histograms { + logger := logger.WithField("status", k) + metric, ok := groupMap[k] + if !ok { + logger.Error("no metrics in map for status") + continue + } + c.setMetricsFromHistogram(metric, histo) + var counters map[string]metrics.Counter + if !countersAdded { + c.setMetricCounters(logger, metric, groupedMetric.counters, groupMap) + counters = groupedMetric.counters + countersAdded = true + } + c.generateMetricEvent(histo, counters, metric) + } +} + +func (c *collector) setMetricCounters(logger log.FieldLogger, metricData *centralMetric, counters map[string]metrics.Counter, groupMap map[string]*centralMetric) { + metricData.Units.CustomUnits = map[string]*UnitCount{} + + for k, counter := range counters { + logger := logger.WithField("unit", k) + metric, ok := groupMap[k] + if !ok { + logger.Error("no counter in map for unit") + continue + } + + // create a new quota pointer + var quota *models.ResourceReference + if metric.Units.CustomUnits[k].Quota != nil { + quota = &models.ResourceReference{ + ID: metric.Units.CustomUnits[k].Quota.ID, + } + } + + metricData.Units.CustomUnits[k] = &UnitCount{ + Count: counter.Count(), + Quota: quota, + } + } +} + +func (c *collector) setMetricsFromHistogram(metrics *centralMetric, histogram metrics.Histogram) { + metrics.Units.Transactions.Count = histogram.Count() + metrics.Units.Transactions.Response = &ResponseMetrics{ Max: histogram.Max(), Min: histogram.Min(), Avg: histogram.Mean(), } } -func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *centralMetricEvent) { - if metric.Count == 0 { +func (c *collector) generateMetricEvent(histogram metrics.Histogram, counters map[string]metrics.Counter, metric *centralMetric) { + if metric.Units != nil && metric.Units.Transactions != nil && metric.Units.Transactions.Count == 0 { c.logger.Trace("skipping registry entry with no reported quantity") return } @@ -833,9 +930,16 @@ func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *cen Start: util.ConvertTimeToMillis(c.metricStartTime), End: util.ConvertTimeToMillis(c.metricEndTime), } + metric.Reporter = &Reporter{ + AgentVersion: cmd.BuildVersion, + AgentType: cmd.BuildAgentName, + AgentSDKVersion: cmd.SDKBuildVersion, + AgentName: c.agentName, + ObservationDelta: metric.Observation.End - metric.Observation.Start, + } // Generate app subscription metric - c.generateV4Event(histogram, metric) + c.generateV4Event(histogram, counters, metric) } func (c *collector) createV4Event(startTime int64, v4data V4Data) V4Event { @@ -853,10 +957,10 @@ func (c *collector) createV4Event(startTime int64, v4data V4Data) V4Event { } } -func (c *collector) generateV4Event(histogram metrics.Histogram, v4data V4Data) { +func (c *collector) generateV4Event(histogram metrics.Histogram, counters map[string]metrics.Counter, v4data V4Data) { generatedEvent := c.createV4Event(c.metricStartTime.UnixMilli(), v4data) c.metricLogger.WithFields(generatedEvent.getLogFields()).Info("generated") - AddCondorMetricEventToBatch(generatedEvent, c.metricBatch, histogram) + AddCondorMetricEventToBatch(generatedEvent, c.metricBatch, histogram, counters) } func (c *collector) getOrRegisterCounter(name string) metrics.Counter { @@ -868,14 +972,27 @@ func (c *collector) getOrRegisterCounter(name string) metrics.Counter { return counter.(metrics.Counter) } -func (c *collector) getOrRegisterHistogram(name string) metrics.Histogram { - histogram := c.registry.Get(name) - if histogram == nil { - sampler := metrics.NewUniformSample(2048) - histogram = metrics.NewHistogram(sampler) - c.registry.Register(name, histogram) +func (c *collector) getOrRegisterGroupedMetrics(name string) groupedMetrics { + group := c.registry.Get(name) + if group == nil { + group = newGroupedMetric() + c.registry.Register(name, group) } - return histogram.(metrics.Histogram) + return group.(groupedMetrics) +} + +func (c *collector) getOrRegisterGroupedCounter(name string) metrics.Counter { + groupKey, countKey := splitMetricKey(name) + groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) + + return groupedMetric.getOrCreateCounter(countKey) +} + +func (c *collector) getOrRegisterGroupedHistogram(name string) metrics.Histogram { + groupKey, histoKey := splitMetricKey(name) + groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) + + return groupedMetric.getOrCreateHistogram(histoKey) } func (c *collector) publishEvents() { @@ -924,23 +1041,27 @@ func (c *collector) cleanupUsageCounter(usageEventItem usageEventPublishItem) { } } -func (c *collector) logMetric(msg string, metric *APIMetric) { +func (c *collector) logMetric(msg string, metric *centralMetric) { c.metricLogger.WithField("id", metric.EventID).Info(msg) } -func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *APIMetric) { +func (c *collector) cleanupMetricCounters(histogram metrics.Histogram, counters map[string]metrics.Counter, metric *centralMetric) { c.metricMapLock.Lock() defer c.metricMapLock.Unlock() - subID := metric.Subscription.ID - appID := metric.App.ID - apiID := metric.API.ID - statusCode := metric.StatusCode + subID, appID, apiID, group := metric.getKeyParts() if consumerAppMap, ok := c.metricMap[subID]; ok { if apiMap, ok := consumerAppMap[appID]; ok { if apiStatusMap, ok := apiMap[apiID]; ok { - c.storage.removeMetric(apiStatusMap[statusCode]) - delete(c.metricMap[subID][appID][apiID], statusCode) + c.storage.removeMetric(apiStatusMap[group]) + delete(c.metricMap[subID][appID][apiID], group) histogram.Clear() + + // clean any counters, if needed + for k, counter := range counters { + c.storage.removeMetric(apiStatusMap[k]) + delete(c.metricMap[subID][appID][apiID], k) + counter.Clear() + } } if len(c.metricMap[subID][appID][apiID]) == 0 { delete(c.metricMap[subID][appID], apiID) @@ -956,18 +1077,10 @@ func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *AP c.logger. WithField(startTimestampStr, util.ConvertTimeToMillis(c.usageStartTime)). WithField(endTimestampStr, util.ConvertTimeToMillis(c.usageEndTime)). - WithField("api-name", metric.API.Name). + WithField("apiName", metric.API.Name). Info("Published metrics report for API") } func (c *collector) getStatusText(statusCode string) string { - httpStatusCode, _ := strconv.Atoi(statusCode) - switch { - case httpStatusCode >= 100 && httpStatusCode < 400: - return "Success" - case httpStatusCode >= 400 && httpStatusCode < 500: - return "Failure" - default: - return "Exception" - } + return getStatusFromCodeString(statusCode).String() } diff --git a/pkg/transaction/metric/metricscollector_test.go b/pkg/transaction/metric/metricscollector_test.go index eb2cd1fb0..f6ea4451a 100644 --- a/pkg/transaction/metric/metricscollector_test.go +++ b/pkg/transaction/metric/metricscollector_test.go @@ -487,7 +487,7 @@ func TestMetricCollector(t *testing.T) { } runTestHealthcheck() myCollector.InitializeBatch() - metricCollector.metricMap = make(map[string]map[string]map[string]map[string]*centralMetricEvent) + metricCollector.metricMap = make(map[string]map[string]map[string]map[string]*centralMetric) cfg.SetAxwayManaged(test.trackVolume) testClient := setupMockClient(test.retryBatchCount) mockClient := testClient.(*MockClient) @@ -936,7 +936,6 @@ func TestCustomMetrics(t *testing.T) { AppDetails: appDetails1, Count: 5, UnitDetails: models.Unit{ - ID: "unit-id", Name: "unit-name", }, } @@ -982,7 +981,7 @@ func TestCustomMetrics(t *testing.T) { if tc.skip { return } - metricCollector.metricMap = map[string]map[string]map[string]map[string]*centralMetricEvent{} + metricCollector.metricMap = map[string]map[string]map[string]map[string]*centralMetric{} metricCollector.AddCustomMetricDetail(tc.metricEvent1) if tc.metricEvent2.Count > 0 { metricCollector.AddCustomMetricDetail(tc.metricEvent2) diff --git a/pkg/transaction/metric/registry.go b/pkg/transaction/metric/registry.go new file mode 100644 index 000000000..736a4f0cd --- /dev/null +++ b/pkg/transaction/metric/registry.go @@ -0,0 +1,73 @@ +package metric + +import ( + "fmt" + "sync" + + "github.com/rcrowley/go-metrics" +) + +type registry interface { + Each(func(string, interface{})) + Get(string) interface{} + Register(string, interface{}) error +} + +// The metric registry which can store counters, histogram, or grouped metrics +type metricRegistry struct { + metrics map[string]interface{} + mutex sync.RWMutex +} + +func newRegistry() registry { + return &metricRegistry{metrics: make(map[string]interface{})} +} + +func (r *metricRegistry) Each(f func(string, interface{})) { + metrics := r.registered() + for i := range metrics { + kv := &metrics[i] + f(kv.name, kv.value) + } +} + +func (r *metricRegistry) Get(name string) interface{} { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.metrics[name] +} + +func (r *metricRegistry) Register(name string, i interface{}) error { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.register(name, i) +} + +func (r *metricRegistry) register(name string, i interface{}) error { + if _, ok := r.metrics[name]; ok { + return fmt.Errorf("duplicate metric: %s", name) + } + switch i.(type) { + case metrics.Counter, metrics.Histogram, groupedMetrics: + r.metrics[name] = i + } + return nil +} + +type metricKV struct { + name string + value interface{} +} + +func (r *metricRegistry) registered() []metricKV { + r.mutex.RLock() + defer r.mutex.RUnlock() + metrics := make([]metricKV, 0, len(r.metrics)) + for name, i := range r.metrics { + metrics = append(metrics, metricKV{ + name: name, + value: i, + }) + } + return metrics +} diff --git a/pkg/transaction/metric/statustext.go b/pkg/transaction/metric/statustext.go new file mode 100644 index 000000000..2e5e13646 --- /dev/null +++ b/pkg/transaction/metric/statustext.go @@ -0,0 +1,41 @@ +package metric + +import "strconv" + +type statusText string + +const ( + Success statusText = "Success" + Failure statusText = "Failure" + Exception statusText = "Exception" +) + +var statuses = map[string]statusText{ + Success.String(): Success, + Failure.String(): Failure, + Exception.String(): Exception, +} + +func (s statusText) String() string { + return string(s) +} + +func getStatusFromCodeString(statusCode string) statusText { + if v, ok := statuses[statusCode]; ok { + return v + } + + httpStatusCode, _ := strconv.Atoi(statusCode) + return getStatusFromCode(httpStatusCode) +} + +func getStatusFromCode(statusCode int) statusText { + switch { + case statusCode >= 100 && statusCode < 400: + return Success + case statusCode >= 400 && statusCode < 500: + return Failure + default: + return Exception + } +} diff --git a/pkg/transaction/metric/units.go b/pkg/transaction/metric/units.go new file mode 100644 index 000000000..fc46c56c7 --- /dev/null +++ b/pkg/transaction/metric/units.go @@ -0,0 +1,38 @@ +package metric + +import ( + "encoding/json" + + "github.com/Axway/agent-sdk/pkg/transaction/models" +) + +type UnitCount struct { + Count int64 `json:"count"` + Quota *models.ResourceReference `json:"quota,omitempty"` +} + +type Transactions struct { + UnitCount + Response *ResponseMetrics `json:"response,omitempty"` + Status string `json:"status,omitempty"` +} + +type Units struct { + Transactions *Transactions `json:"transactions,omitempty"` + CustomUnits map[string]*UnitCount `json:"-"` +} + +func (u Units) MarshalJSON() ([]byte, error) { + // Add the fields from the struct to a new map + result := map[string]interface{}{ + "transactions": u.Transactions, + } + + // Add the custom units to the map + for k, cu := range u.CustomUnits { + result[k] = cu + } + + // return the marshaled map + return json.Marshal(result) +} diff --git a/pkg/transaction/metric/util.go b/pkg/transaction/metric/util.go index 4f35ef03f..bedc2686a 100644 --- a/pkg/transaction/metric/util.go +++ b/pkg/transaction/metric/util.go @@ -1,81 +1,100 @@ package metric import ( + "strings" + + "github.com/Axway/agent-sdk/pkg/agent" "github.com/Axway/agent-sdk/pkg/transaction/models" + transutil "github.com/Axway/agent-sdk/pkg/transaction/util" ) -func centralMetricFromAPIMetric(in *APIMetric) *centralMetricEvent { - out := ¢ralMetricEvent{ - metricInfo: metricInfo{ - StatusCode: in.StatusCode, +func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { + out := ¢ralMetric{ + EventID: in.EventID, + Observation: &ObservationDetails{ + Start: in.Observation.Start, }, - Count: in.Count, - Status: in.Status, - EventID: in.EventID, - StartTime: in.StartTime, + } + + if in.Unit == nil { + // transaction units + out.Units = &Units{ + Transactions: &Transactions{ + UnitCount: UnitCount{ + Count: in.Count, + }, + Status: in.Status, + }, + } + } else { + // custom units + out.Units.CustomUnits[in.Unit.Name] = &UnitCount{ + Count: in.Count, + } } if in.Subscription.ID != unknown && in.Subscription.ID != "" { - out.Subscription = &models.Subscription{ - ID: in.Subscription.ID, - Name: in.Subscription.Name, + out.Subscription = &models.ResourceReference{ + ID: in.Subscription.ID, } } if in.App.ID != unknown && in.App.ID != "" { - out.App = &models.AppDetails{ - ID: in.App.ID, - Name: in.App.Name, + out.App = &models.ApplicationResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.App.ID, + }, ConsumerOrgID: in.App.ConsumerOrgID, } } if in.Product.ID != unknown && in.Product.ID != "" { - out.Product = &models.Product{ - ID: in.Product.ID, - Name: in.Product.Name, - VersionName: in.Product.VersionName, - VersionID: in.Product.VersionID, + out.Product = &models.ProductResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.Product.ID, + }, + VersionID: in.Product.VersionID, } } if in.API.ID != unknown && in.API.ID != "" { - out.API = &models.APIDetails{ - ID: in.API.ID, - Name: in.API.Name, - Revision: in.API.Revision, - TeamID: in.API.TeamID, - APIServiceInstance: in.API.APIServiceInstance, - Stage: in.API.Stage, - Version: in.API.Version, + out.API = &models.APIResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.API.ID, + }, + Name: in.API.Name, + } + svc := agent.GetCacheManager().GetAPIServiceWithAPIID(strings.TrimPrefix(in.API.ID, transutil.SummaryEventProxyIDPrefix)) + if svc != nil { + out.API.APIServiceID = svc.Metadata.ID } } if in.AssetResource.ID != unknown && in.AssetResource.ID != "" { - out.AssetResource = &models.AssetResource{ - ID: in.AssetResource.ID, - Name: in.AssetResource.Name, + out.AssetResource = &models.ResourceReference{ + ID: in.AssetResource.ID, } } if in.ProductPlan.ID != unknown && in.ProductPlan.ID != "" { - out.ProductPlan = &models.ProductPlan{ + out.ProductPlan = &models.ResourceReference{ ID: in.ProductPlan.ID, } } if in.Quota.ID != unknown && in.Quota.ID != "" { - out.Quota = &models.Quota{ + out.Units.Transactions.Quota = &models.ResourceReference{ ID: in.Quota.ID, } } - if in.Unit.ID != unknown && in.Unit.ID != "" { - out.Unit = &models.Unit{ - ID: in.Unit.ID, - Name: in.Unit.Name, - } - } - return out } + +func splitMetricKey(key string) (string, string) { + const delimiter = "." + + groupKey := strings.Join(strings.Split(key, delimiter)[:4], delimiter) + metricKey := strings.Split(key, delimiter)[4] + return groupKey, metricKey +} diff --git a/pkg/transaction/models/definitions.go b/pkg/transaction/models/definitions.go index 7408a081e..f762423fa 100644 --- a/pkg/transaction/models/definitions.go +++ b/pkg/transaction/models/definitions.go @@ -9,6 +9,41 @@ type ConsumerDetails struct { Subscription *Subscription `json:"subscription,omitempty"` } +type ResourceReference struct { + ID string `json:"id,omitempty"` +} + +func (a ResourceReference) GetLogFields(fields logrus.Fields, idFieldName string) logrus.Fields { + if a.ID != "" { + fields[idFieldName] = a.ID + } + return fields +} + +type APIResourceReference struct { + ResourceReference + Name string `json:"name,omitempty"` + APIServiceID string `json:"apiServiceId,omitempty"` +} + +type ApplicationResourceReference struct { + ResourceReference + ConsumerOrgID string `json:"consumerOrgId,omitempty"` +} + +type ProductResourceReference struct { + ResourceReference + VersionID string `json:"versionId,omitempty"` +} + +func (a ProductResourceReference) GetLogFields(fields logrus.Fields, idFieldName string) logrus.Fields { + if a.ID != "" { + fields[idFieldName] = a.ID + fields["productVersionID"] = a.VersionID + } + return fields +} + // Subscription - Represents the subscription used in transaction summary consumer details type Subscription struct { ID string `json:"id,omitempty"` @@ -109,13 +144,12 @@ func (a APIDetails) GetLogFields(fields logrus.Fields) logrus.Fields { // Unit - struct for custom unit details to report type Unit struct { - ID string `json:"id"` Name string `json:"name"` } func (a Unit) GetLogFields(fields logrus.Fields) logrus.Fields { - if a.ID != "unknown" { - fields["apiID"] = a.ID + if a.Name != "unknown" { + fields["unitName"] = a.Name } return fields }