From b6b4ca665166d2f1d37511a47c35d6d47693ed3a Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Wed, 23 Oct 2024 17:56:21 -0700 Subject: [PATCH 01/13] add new struct for new metric event --- pkg/transaction/metric/centralmetric.go | 21 ++++++++++++++ pkg/transaction/metric/units.go | 38 +++++++++++++++++++++++++ pkg/transaction/models/definitions.go | 36 +++++++++++++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 pkg/transaction/metric/units.go diff --git a/pkg/transaction/metric/centralmetric.go b/pkg/transaction/metric/centralmetric.go index 0a9394133..719c0f615 100644 --- a/pkg/transaction/metric/centralmetric.go +++ b/pkg/transaction/metric/centralmetric.go @@ -8,6 +8,27 @@ import ( "github.com/sirupsen/logrus" ) +type newMetric struct { + Subscription *models.ResourceReference `json:"subscription,omitempty"` + App *models.ApplicationResourceReference `json:"app,omitempty"` + Product *models.ProductResourceReference `json:"product,omitempty"` + API *models.APIResourceReference `json:"api,omitempty"` + AssetResource *models.ResourceReference `json:"assetResource,omitempty"` + ProductPlan *models.ResourceReference `json:"productPlan,omitempty"` + Units *Units `json:"units,omitempty"` + Reporter *reporter `json:"reporter,omitempty"` + Observation *ObservationDetails `json:"-"` + EventID string `json:"-"` +} + +type reporter struct { + AgentVersion string `json:"agentVersion,omitempty"` + AgentType string `json:"agentType,omitempty"` + AgentSDKVersion string `json:"agentSDKVersion,omitempty"` + AgentName string `json:"agentName,omitempty"` + ObservationDelta int64 `json:"observationDelta,omitempty"` +} + // metricInfo - the base object holding the metricInfo type metricInfo struct { Subscription *models.Subscription `json:"subscription,omitempty"` diff --git a/pkg/transaction/metric/units.go b/pkg/transaction/metric/units.go new file mode 100644 index 000000000..fc46c56c7 --- /dev/null +++ b/pkg/transaction/metric/units.go @@ -0,0 +1,38 @@ +package metric + +import ( + "encoding/json" + + "github.com/Axway/agent-sdk/pkg/transaction/models" +) + +type UnitCount struct { + Count int64 `json:"count"` + Quota *models.ResourceReference `json:"quota,omitempty"` +} + +type Transactions struct { + UnitCount + Response *ResponseMetrics `json:"response,omitempty"` + Status string `json:"status,omitempty"` +} + +type Units struct { + Transactions *Transactions `json:"transactions,omitempty"` + CustomUnits map[string]*UnitCount `json:"-"` +} + +func (u Units) MarshalJSON() ([]byte, error) { + // Add the fields from the struct to a new map + result := map[string]interface{}{ + "transactions": u.Transactions, + } + + // Add the custom units to the map + for k, cu := range u.CustomUnits { + result[k] = cu + } + + // return the marshaled map + return json.Marshal(result) +} diff --git a/pkg/transaction/models/definitions.go b/pkg/transaction/models/definitions.go index 7408a081e..474d30a75 100644 --- a/pkg/transaction/models/definitions.go +++ b/pkg/transaction/models/definitions.go @@ -9,6 +9,42 @@ type ConsumerDetails struct { Subscription *Subscription `json:"subscription,omitempty"` } +type ResourceReference struct { + ID string `json:"id,omitempty"` + LoggerField string `json:"-"` +} + +func (a ResourceReference) GetLogFields(fields logrus.Fields) logrus.Fields { + if a.ID != "" { + fields[a.LoggerField] = a.ID + } + return fields +} + +type APIResourceReference struct { + ResourceReference + Name string `json:"name,omitempty"` + APIServiceID string `json:"apiServiceId,omitempty"` +} + +type ApplicationResourceReference struct { + ResourceReference + ConsumerOrgID string `json:"consumerOrgId,omitempty"` +} + +type ProductResourceReference struct { + ResourceReference + VersionID string `json:"versionId,omitempty"` +} + +func (a ProductResourceReference) GetLogFields(fields logrus.Fields) logrus.Fields { + if a.ID != "" { + fields[a.LoggerField] = a.ID + fields["productVersionID"] = a.VersionID + } + return fields +} + // Subscription - Represents the subscription used in transaction summary consumer details type Subscription struct { ID string `json:"id,omitempty"` From 84edfc9f5d9a13881cc287d8c2af25e45949ef85 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Thu, 24 Oct 2024 15:30:01 -0700 Subject: [PATCH 02/13] updates to processing for handling new metric structure --- pkg/transaction/metric/apimetric.go | 2 +- pkg/transaction/metric/cachestorage.go | 37 ++- pkg/transaction/metric/centralmetric.go | 133 +++++----- pkg/transaction/metric/definition.go | 24 +- pkg/transaction/metric/metricbatch.go | 7 +- pkg/transaction/metric/metricscollector.go | 239 +++++++++--------- .../metric/metricscollector_test.go | 5 +- pkg/transaction/metric/statustext.go | 41 +++ pkg/transaction/metric/util.go | 82 +++--- pkg/transaction/models/definitions.go | 16 +- 10 files changed, 332 insertions(+), 254 deletions(-) create mode 100644 pkg/transaction/metric/statustext.go diff --git a/pkg/transaction/metric/apimetric.go b/pkg/transaction/metric/apimetric.go index 2cfca0968..a77f492bc 100644 --- a/pkg/transaction/metric/apimetric.go +++ b/pkg/transaction/metric/apimetric.go @@ -16,7 +16,6 @@ type APIMetric struct { AssetResource models.AssetResource `json:"assetResource,omitempty"` ProductPlan models.ProductPlan `json:"productPlan,omitempty"` Quota models.Quota `json:"quota,omitempty"` - Unit models.Unit `json:"unit,omitempty"` StatusCode string `json:"statusCode,omitempty"` Status string `json:"status,omitempty"` Count int64 `json:"count"` @@ -24,6 +23,7 @@ type APIMetric struct { Observation ObservationDetails `json:"observation"` EventID string `json:"-"` StartTime time.Time `json:"-"` + Unit *models.Unit `json:"unit,omitempty"` } // GetStartTime - Returns the start time for subscription metric diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index 20019e755..170878551 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -13,6 +13,7 @@ import ( "github.com/Axway/agent-sdk/pkg/agent" "github.com/Axway/agent-sdk/pkg/cache" "github.com/Axway/agent-sdk/pkg/traceability" + "github.com/Axway/agent-sdk/pkg/transaction/models" "github.com/Axway/agent-sdk/pkg/util" "github.com/rcrowley/go-metrics" ) @@ -32,8 +33,8 @@ type storageCache interface { updateUsage(usageCount int) updateVolume(bytes int64) updateAppUsage(usageCount int, appID string) - updateMetric(cachedMetric cachedMetricInterface, metric *centralMetricEvent) - removeMetric(metric *centralMetricEvent) + updateMetric(cachedMetric cachedMetricInterface, metric *centralMetric) + removeMetric(metric *centralMetric) save() } @@ -161,19 +162,27 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { var cm cachedMetric json.Unmarshal(buffer, &cm) - var metric *centralMetricEvent + var metric *centralMetric for _, duration := range cm.Values { - unitID := "" - if cm.Unit != nil { - unitID = cm.Unit.ID - } metricDetail := Detail{ - APIDetails: *cm.API, - AppDetails: *cm.App, - UnitName: unitID, StatusCode: cm.StatusCode, Duration: duration, } + if cm.API != nil { + metricDetail.APIDetails = models.APIDetails{ + ID: cm.API.ID, + Name: cm.API.Name, + } + } + if cm.App != nil { + metricDetail.AppDetails = models.AppDetails{ + ID: cm.App.ID, + ConsumerOrgID: cm.App.ConsumerOrgID, + } + } + if cm.Unit != nil { + metricDetail.UnitName = cm.Unit.Name + } metric = c.collector.createOrUpdateMetric(metricDetail) } @@ -185,13 +194,13 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { } storageCache.Set(newKey, cm) if metric != nil { - metric.StartTime = cm.StartTime + metric.Observation.Start = cm.StartTime.UnixMilli() } } } } -func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centralMetricEvent) { +func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centralMetric) { if !c.isInitialized { return } @@ -199,10 +208,10 @@ func (c *cacheStorage) updateMetric(cached cachedMetricInterface, metric *centra c.storageLock.Lock() defer c.storageLock.Unlock() - c.storage.Set(metric.getKey(), metric.createdCachedMetric(cached)) + c.storage.Set(metric.getKey(), metric.createCachedMetric(cached)) } -func (c *cacheStorage) removeMetric(metric *centralMetricEvent) { +func (c *cacheStorage) removeMetric(metric *centralMetric) { if !c.isInitialized { return } diff --git a/pkg/transaction/metric/centralmetric.go b/pkg/transaction/metric/centralmetric.go index 719c0f615..da8c4d430 100644 --- a/pkg/transaction/metric/centralmetric.go +++ b/pkg/transaction/metric/centralmetric.go @@ -1,6 +1,7 @@ package metric import ( + "fmt" "strings" "time" @@ -8,7 +9,7 @@ import ( "github.com/sirupsen/logrus" ) -type newMetric struct { +type centralMetric struct { Subscription *models.ResourceReference `json:"subscription,omitempty"` App *models.ApplicationResourceReference `json:"app,omitempty"` Product *models.ProductResourceReference `json:"product,omitempty"` @@ -16,98 +17,79 @@ type newMetric struct { AssetResource *models.ResourceReference `json:"assetResource,omitempty"` ProductPlan *models.ResourceReference `json:"productPlan,omitempty"` Units *Units `json:"units,omitempty"` - Reporter *reporter `json:"reporter,omitempty"` + Reporter *Reporter `json:"reporter,omitempty"` Observation *ObservationDetails `json:"-"` EventID string `json:"-"` } -type reporter struct { - AgentVersion string `json:"agentVersion,omitempty"` - AgentType string `json:"agentType,omitempty"` - AgentSDKVersion string `json:"agentSDKVersion,omitempty"` - AgentName string `json:"agentName,omitempty"` - ObservationDelta int64 `json:"observationDelta,omitempty"` -} - -// metricInfo - the base object holding the metricInfo -type metricInfo struct { - Subscription *models.Subscription `json:"subscription,omitempty"` - App *models.AppDetails `json:"app,omitempty"` - Product *models.Product `json:"product,omitempty"` - API *models.APIDetails `json:"api,omitempty"` - AssetResource *models.AssetResource `json:"assetResource,omitempty"` - ProductPlan *models.ProductPlan `json:"productPlan,omitempty"` - Quota *models.Quota `json:"quota,omitempty"` - Unit *models.Unit `json:"unit,omitempty"` - StatusCode string `json:"statusCode,omitempty"` -} - -// centralMetricEvent - the event that is actually sent to platform -type centralMetricEvent struct { - metricInfo - Status string `json:"status,omitempty"` - Count int64 `json:"count"` - Response *ResponseMetrics `json:"response,omitempty"` - Observation *ObservationDetails `json:"observation"` - EventID string `json:"-"` - StartTime time.Time `json:"-"` -} - // GetStartTime - Returns the start time for subscription metric -func (a *centralMetricEvent) GetStartTime() time.Time { - return a.StartTime +func (a *centralMetric) GetStartTime() time.Time { + return time.UnixMilli(a.Observation.Start) } // GetType - Returns APIMetric -func (a *centralMetricEvent) GetType() string { +func (a *centralMetric) GetType() string { return "APIMetric" } // GetType - Returns APIMetric -func (a *centralMetricEvent) GetEventID() string { +func (a *centralMetric) GetEventID() string { return a.EventID } -func (a *centralMetricEvent) GetLogFields() logrus.Fields { +func (a *centralMetric) GetLogFields() logrus.Fields { fields := logrus.Fields{ "id": a.EventID, - "count": a.Count, - "status": a.StatusCode, - "minResponse": a.Response.Min, - "maxResponse": a.Response.Max, - "avgResponse": a.Response.Avg, "startTimestamp": a.Observation.Start, "endTimestamp": a.Observation.End, } if a.Subscription != nil { - fields = a.Subscription.GetLogFields(fields) + fields = a.Subscription.GetLogFields(fields, "subscriptionID") } if a.App != nil { - fields = a.App.GetLogFields(fields) + fields = a.App.GetLogFields(fields, "applicationID") } if a.Product != nil { - fields = a.Product.GetLogFields(fields) + fields = a.Product.GetLogFields(fields, "productID") } if a.API != nil { - fields = a.API.GetLogFields(fields) + fields = a.API.GetLogFields(fields, "apiID") } if a.AssetResource != nil { - fields = a.AssetResource.GetLogFields(fields) + fields = a.AssetResource.GetLogFields(fields, "assetResourceID") } if a.ProductPlan != nil { - fields = a.ProductPlan.GetLogFields(fields) + fields = a.ProductPlan.GetLogFields(fields, "productPlanID") } - if a.Quota != nil { - fields = a.Quota.GetLogFields(fields) + + // add transaction unit info and custom units if they exist + if a.Units == nil { + return fields + } + if a.Units.Transactions != nil { + if a.Units.Transactions.Quota != nil { + fields = a.Units.Transactions.Quota.GetLogFields(fields, "transactionQuotaID") + } + fields["transactionCount"] = a.Units.Transactions.Count + fields["status"] = a.Units.Transactions.Status + fields["minResponse"] = a.Units.Transactions.Response.Min + fields["maxResponse"] = a.Units.Transactions.Response.Max + fields["avgResponse"] = a.Units.Transactions.Response.Avg + } + if len(a.Units.CustomUnits) == 0 { + return fields } - if a.Unit != nil { - fields = a.Unit.GetLogFields(fields) + for k, u := range a.Units.CustomUnits { + if u.Quota != nil { + fields = u.Quota.GetLogFields(fields, fmt.Sprintf("%sQuotaID", k)) + } + fields[fmt.Sprintf("%sCount", k)] = u.Count } return fields } // getKey - returns the cache key for the metric -func (a *centralMetricEvent) getKey() string { +func (a *centralMetric) getKey() string { subID := unknown if a.Subscription != nil { subID = a.Subscription.ID @@ -121,20 +103,41 @@ func (a *centralMetricEvent) getKey() string { apiID = a.API.ID } uniqueKey := unknown - if a.StatusCode != "" { - uniqueKey = a.StatusCode - } else if a.Unit != nil { - uniqueKey = a.Unit.ID + if a.Units != nil && a.Units.Transactions != nil && a.Units.Transactions.Status != "" { + uniqueKey = a.Units.Transactions.Status + } else { + // get the first, and should be only, custom unit name + for k := range a.Units.CustomUnits { + uniqueKey = k + break + } } return strings.Join([]string{metricKeyPrefix, subID, appID, apiID, uniqueKey}, ".") } -func (a *centralMetricEvent) createdCachedMetric(cached cachedMetricInterface) cachedMetric { - return cachedMetric{ - metricInfo: a.metricInfo, - StartTime: a.StartTime, - Count: cached.Count(), - Values: cached.Values(), +func (a *centralMetric) createCachedMetric(cached cachedMetricInterface) cachedMetric { + cacheM := cachedMetric{ + Subscription: a.Subscription, + App: a.App, + Product: a.Product, + API: a.API, + AssetResource: a.AssetResource, + ProductPlan: a.ProductPlan, + Count: cached.Count(), + Values: cached.Values(), + StartTime: time.UnixMilli(a.Observation.Start), + } + + if a.Units.Transactions != nil { + cacheM.Quota = a.Units.Transactions.Quota + cacheM.StatusCode = a.Units.Transactions.Status + } else { + for u := range a.Units.CustomUnits { + cacheM.Unit = &models.Unit{ + Name: u, + } + } } + return cacheM } diff --git a/pkg/transaction/metric/definition.go b/pkg/transaction/metric/definition.go index c6ad4bcca..8a2c69fa6 100644 --- a/pkg/transaction/metric/definition.go +++ b/pkg/transaction/metric/definition.go @@ -75,10 +75,18 @@ type ObservationDetails struct { // cachedMetric - struct to hold metric specific that gets cached and used for agent recovery type cachedMetric struct { - metricInfo - Count int64 `json:"count"` - Values []int64 `json:"values,omitempty"` - StartTime time.Time `json:"startTime"` + Subscription *models.ResourceReference `json:"subscription,omitempty"` + App *models.ApplicationResourceReference `json:"app,omitempty"` + Product *models.ProductResourceReference `json:"product,omitempty"` + API *models.APIResourceReference `json:"api,omitempty"` + AssetResource *models.ResourceReference `json:"assetResource,omitempty"` + ProductPlan *models.ResourceReference `json:"productPlan,omitempty"` + Quota *models.ResourceReference `json:"quota,omitempty"` + Unit *models.Unit `json:"unit,omitempty"` + StatusCode string `json:"statusCode,omitempty"` + Count int64 `json:"count"` + Values []int64 `json:"values,omitempty"` + StartTime time.Time `json:"startTime"` } // V4EventDistribution - represents V4 distribution @@ -184,3 +192,11 @@ func (t ISO8601Time) MarshalJSON() ([]byte, error) { b = append(b, '"') return b, nil } + +type Reporter struct { + AgentVersion string `json:"agentVersion,omitempty"` + AgentType string `json:"agentType,omitempty"` + AgentSDKVersion string `json:"agentSDKVersion,omitempty"` + AgentName string `json:"agentName,omitempty"` + ObservationDelta int64 `json:"observationDelta,omitempty"` +} diff --git a/pkg/transaction/metric/metricbatch.go b/pkg/transaction/metric/metricbatch.go index c362a21f9..657a2e558 100644 --- a/pkg/transaction/metric/metricbatch.go +++ b/pkg/transaction/metric/metricbatch.go @@ -3,6 +3,7 @@ package metric import ( "context" "encoding/json" + "time" "github.com/Axway/agent-sdk/pkg/traceability" beatPub "github.com/elastic/beats/v7/libbeat/publisher" @@ -84,7 +85,7 @@ func (b *EventBatch) Events() []beatPub.Event { // ACK - all events have been acknowledgeded, cleanup the counters func (b *EventBatch) ACK() { b.ackEvents(b.events) - b.collector.metricStartTime = b.collector.metricEndTime + b.collector.metricStartTime = time.Time{} b.batchUnlock() } @@ -183,7 +184,7 @@ func getEventsToAck(retryEvents []beatPub.Event, events []beatPub.Event) []beatP return ackEvents } -func getMetricFromEvent(event beatPub.Event) *APIMetric { +func getMetricFromEvent(event beatPub.Event) *centralMetric { if data, found := event.Content.Fields[messageKey]; found { v4Bytes := data.(string) v4Event := make(map[string]interface{}) @@ -206,7 +207,7 @@ func getMetricFromEvent(event beatPub.Event) *APIMetric { if err != nil { return nil } - metric := &APIMetric{} + metric := ¢ralMetric{} err = json.Unmarshal(buf, metric) if err != nil { return nil diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 3a43c453c..4fb774a23 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "os" - "strconv" "strings" "sync" "time" @@ -30,9 +29,9 @@ import ( ) const ( - startTimestampStr = "start-timestamp" - endTimestampStr = "end-timestamp" - eventTypeStr = "event-type" + startTimestampStr = "startTimestamp" + endTimestampStr = "endTimestamp" + eventTypeStr = "eventType" usageStr = "usage" metricStr = "metric" volumeStr = "volume" @@ -68,11 +67,12 @@ type collector struct { metricStartTime time.Time metricEndTime time.Time orgGUID string + agentName string lock *sync.Mutex batchLock *sync.Mutex registry metrics.Registry metricBatch *EventBatch - metricMap map[string]map[string]map[string]map[string]*centralMetricEvent + metricMap map[string]map[string]map[string]map[string]*centralMetric metricMapLock *sync.Mutex publishItemQueue []publishQueueItem jobID string @@ -158,16 +158,17 @@ func createMetricCollector() Collector { metricCollector := &collector{ // Set the initial start time to be minimum 1m behind, so that the job can generate valid event // if any usage event are to be generated on startup - usageStartTime: now().Add(-1 * time.Minute), - metricStartTime: now().Add(-1 * time.Minute), + usageStartTime: now().Truncate(time.Minute), // round down to closest minute + metricStartTime: now().Truncate(time.Minute), // round down to closest minute lock: &sync.Mutex{}, batchLock: &sync.Mutex{}, metricMapLock: &sync.Mutex{}, registry: metrics.NewRegistry(), - metricMap: make(map[string]map[string]map[string]map[string]*centralMetricEvent), + metricMap: make(map[string]map[string]map[string]map[string]*centralMetric), publishItemQueue: make([]publishQueueItem, 0), metricConfig: agent.GetCentralConfig().GetMetricReportingConfig(), usageConfig: agent.GetCentralConfig().GetUsageReportingConfig(), + agentName: agent.GetCentralConfig().GetAgentName(), logger: logger, metricLogger: log.NewMetricFieldLogger(), } @@ -249,12 +250,19 @@ func (c *collector) InitializeBatch() { c.metricBatch = NewEventBatch(c) } +func (c *collector) updateStartTime() { + if c.metricStartTime.IsZero() { + c.metricStartTime = now().Truncate(time.Minute) + } +} + // AddMetric - add metric for API transaction to collection func (c *collector) AddMetric(apiDetails models.APIDetails, statusCode string, duration, bytes int64, appName string) { c.lock.Lock() defer c.lock.Unlock() c.batchLock.Lock() defer c.batchLock.Unlock() + c.updateStartTime() c.updateUsage(1) c.updateVolume(bytes) } @@ -277,14 +285,13 @@ func (c *collector) AddAPIMetricDetail(detail MetricDetail) { Status: detail.StatusCode, } newMetric := c.createMetric(transactionCtx) + // update the new metric with all the necessary details - newMetric.Count = detail.Count - newMetric.Response = &detail.Response - newMetric.StartTime = time.UnixMilli(detail.Observation.Start) + newMetric.Units.Transactions.Count = detail.Count + newMetric.Units.Transactions.Response = &detail.Response newMetric.Observation = &detail.Observation - newMetric.StatusCode = detail.StatusCode - newMetric.Status = c.getStatusText(detail.StatusCode) + c.updateStartTime() c.addMetric(newMetric) } @@ -297,7 +304,6 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { logger := c.logger.WithField("handler", "customMetric"). WithField("apiID", detail.APIDetails.ID). WithField("appID", detail.AppDetails.ID). - WithField("unitID", detail.UnitDetails.ID). WithField("unitName", detail.UnitDetails.Name) if detail.APIDetails.ID == "" { @@ -310,7 +316,7 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { return } - if detail.UnitDetails.ID == "" || detail.UnitDetails.Name == "" { + if detail.UnitDetails.Name == "" { logger.Error("custom units require Unit information") return } @@ -319,12 +325,14 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { transactionCtx := transactionContext{ APIDetails: detail.APIDetails, AppDetails: detail.AppDetails, - UnitName: detail.UnitDetails.ID, + UnitName: detail.UnitDetails.Name, } metric := c.createMetric(transactionCtx) - metric.StartTime = time.UnixMilli(detail.Observation.Start) - metric.Observation = &detail.Observation + metric.Observation = &ObservationDetails{ + Start: detail.Observation.Start, + End: detail.Observation.End, + } if m := c.getExistingMetric(metric); m != nil { // use the cached metric @@ -332,25 +340,26 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { } // add the count - metric.Count += detail.Count + metric.Units.CustomUnits[detail.UnitDetails.Name].Count += detail.Count counter := c.getOrRegisterCounter(metric.getKey()) counter.Inc(detail.Count) + c.updateStartTime() c.updateMetricWithCachedMetric(metric, newCustomCounter(counter)) } // AddAPIMetric - add api metric for API transaction func (c *collector) AddAPIMetric(metric *APIMetric) { + c.updateStartTime() c.addMetric(centralMetricFromAPIMetric(metric)) } // addMetric - add central metric event -func (c *collector) addMetric(metric *centralMetricEvent) { +func (c *collector) addMetric(metric *centralMetric) { if metric.EventID == "" { metric.EventID = uuid.NewString() } - metric.Status = c.getStatusText(metric.StatusCode) v4Event := c.createV4Event(metric.Observation.Start, metric) metricData, _ := json.Marshal(v4Event) @@ -363,7 +372,6 @@ func (c *collector) addMetric(metric *centralMetricEvent) { if err != nil { return } - c.updateUsage(metric.Count) c.metricBatch.AddEventWithoutHistogram(pubEvent) } @@ -391,34 +399,47 @@ func (c *collector) updateUsage(count int64) { c.storage.updateUsage(int(transactionCount.Count())) } -func (c *collector) createMetric(detail transactionContext) *centralMetricEvent { +func (c *collector) createMetric(detail transactionContext) *centralMetric { // Go get the access request and managed app accessRequest, managedApp := c.getAccessRequestAndManagedApp(agent.GetCacheManager(), detail) - cme := ¢ralMetricEvent{ - metricInfo: metricInfo{ - Subscription: c.createSubscriptionDetail(accessRequest), - App: c.createAppDetail(managedApp), - Product: c.getProduct(accessRequest), - API: c.createAPIDetail(detail.APIDetails), - AssetResource: c.getAssetResource(accessRequest), - ProductPlan: c.getProductPlan(accessRequest), - Quota: c.getQuota(accessRequest, detail.UnitName), - Unit: c.getProductPlanUnit(accessRequest, detail.UnitName), + me := ¢ralMetric{ + Subscription: c.createSubscriptionDetail(accessRequest), + App: c.createAppDetail(managedApp), + Product: c.getProduct(accessRequest), + API: c.createAPIDetail(detail.APIDetails), + AssetResource: c.getAssetResource(accessRequest), + ProductPlan: c.getProductPlan(accessRequest), + Observation: &ObservationDetails{ + Start: now().Unix(), }, - StartTime: now(), - EventID: uuid.NewString(), + EventID: uuid.NewString(), } + // transactions if detail.Status != "" { - cme.StatusCode = detail.Status - cme.Status = c.getStatusText(detail.Status) + me.Units = &Units{ + Transactions: &Transactions{ + UnitCount: UnitCount{ + Quota: c.getQuota(accessRequest, ""), // TODO figure this out for transaction quota + }, + Status: c.getStatusText(detail.Status), + }, + } + } else if detail.UnitName != "" { + me.Units = &Units{ + CustomUnits: map[string]*UnitCount{ + detail.UnitName: { + Quota: c.getQuota(accessRequest, detail.UnitName), + }, + }, + } } - return cme + return me } -func (c *collector) createOrUpdateMetric(detail Detail) *centralMetricEvent { +func (c *collector) createOrUpdateMetric(detail Detail) *centralMetric { if !c.metricConfig.CanPublish() || c.usageConfig.IsOfflineMode() { return nil // no need to update metrics with publish off } @@ -438,7 +459,7 @@ func (c *collector) createOrUpdateMetric(detail Detail) *centralMetricEvent { return c.updateMetricWithCachedMetric(metric, newCustomHistogram(histogram)) } -func (c *collector) getExistingMetric(metric *centralMetricEvent) *centralMetricEvent { +func (c *collector) getExistingMetric(metric *centralMetric) *centralMetric { keyParts := strings.Split(metric.getKey(), ".") c.metricMapLock.Lock() @@ -459,20 +480,20 @@ func (c *collector) getExistingMetric(metric *centralMetricEvent) *centralMetric return c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]][keyParts[4]] } -func (c *collector) updateMetricWithCachedMetric(metric *centralMetricEvent, cached cachedMetricInterface) *centralMetricEvent { +func (c *collector) updateMetricWithCachedMetric(metric *centralMetric, cached cachedMetricInterface) *centralMetric { keyParts := strings.Split(metric.getKey(), ".") c.metricMapLock.Lock() defer c.metricMapLock.Unlock() if _, ok := c.metricMap[keyParts[1]]; !ok { - c.metricMap[keyParts[1]] = make(map[string]map[string]map[string]*centralMetricEvent) + c.metricMap[keyParts[1]] = make(map[string]map[string]map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]]; !ok { - c.metricMap[keyParts[1]][keyParts[2]] = make(map[string]map[string]*centralMetricEvent) + c.metricMap[keyParts[1]][keyParts[2]] = make(map[string]map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]]; !ok { - c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]] = make(map[string]*centralMetricEvent) + c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]] = make(map[string]*centralMetric) } if _, ok := c.metricMap[keyParts[1]][keyParts[2]][keyParts[3]][keyParts[4]]; !ok { // First api metric for sub+app+api+statuscode, @@ -525,7 +546,7 @@ func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, de return accessRequest, managedApp } -func (c *collector) createSubscriptionDetail(accessRequest *management.AccessRequest) *models.Subscription { +func (c *collector) createSubscriptionDetail(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } @@ -535,20 +556,13 @@ func (c *collector) createSubscriptionDetail(accessRequest *management.AccessReq return nil } - return &models.Subscription{ - ID: subRef.ID, - Name: subRef.Name, + return &models.ResourceReference{ + ID: subRef.ID, } } -func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.AppDetails { +func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.ApplicationResourceReference { if appRI == nil { - - // TODO remove the following when product plan unit ready - // return &models.AppDetails{ - // ID: "app-id", - // Name: "app-name", - // } return nil } @@ -565,23 +579,24 @@ func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.AppDetai return nil } - return &models.AppDetails{ - ID: appRef.ID, - Name: appRef.Name, + return &models.ApplicationResourceReference{ + ResourceReference: models.ResourceReference{ + ID: appRef.ID, + }, ConsumerOrgID: orgID, } } -func (c *collector) createAPIDetail(api models.APIDetails) *models.APIDetails { - return &models.APIDetails{ - ID: api.ID, - Name: api.Name, - Revision: api.Revision, - TeamID: api.TeamID, +func (c *collector) createAPIDetail(api models.APIDetails) *models.APIResourceReference { + return &models.APIResourceReference{ + ResourceReference: models.ResourceReference{ + ID: api.ID, + }, + Name: api.Name, } } -func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *models.AssetResource { +func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } @@ -590,13 +605,13 @@ func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *m if assetResourceRef.ID == "" { return nil } - return &models.AssetResource{ - ID: assetResourceRef.ID, - Name: assetResourceRef.Name, + + return &models.ResourceReference{ + ID: assetResourceRef.ID, } } -func (c *collector) getProduct(accessRequest *management.AccessRequest) *models.Product { +func (c *collector) getProduct(accessRequest *management.AccessRequest) *models.ProductResourceReference { if accessRequest == nil { return nil } @@ -608,15 +623,15 @@ func (c *collector) getProduct(accessRequest *management.AccessRequest) *models. return nil } - return &models.Product{ - ID: productRef.ID, - Name: productRef.Name, - VersionID: releaseRef.ID, - VersionName: releaseRef.Name, + return &models.ProductResourceReference{ + ResourceReference: models.ResourceReference{ + ID: productRef.ID, + }, + VersionID: releaseRef.ID, } } -func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *models.ProductPlan { +func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *models.ResourceReference { if accessRequest == nil { return nil } @@ -626,12 +641,12 @@ func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *mod return nil } - return &models.ProductPlan{ + return &models.ResourceReference{ ID: productPlanRef.ID, } } -func (c *collector) getQuota(accessRequest *management.AccessRequest, id string) *models.Quota { +func (c *collector) getQuota(accessRequest *management.AccessRequest, id string) *models.ResourceReference { if accessRequest == nil { return nil } @@ -641,27 +656,11 @@ func (c *collector) getQuota(accessRequest *management.AccessRequest, id string) return nil } - return &models.Quota{ + return &models.ResourceReference{ ID: quotaRef.ID, } } -func (c *collector) getProductPlanUnit(accessRequest *management.AccessRequest, name string) *models.Unit { - if accessRequest == nil { - return nil - } - - unitRef := accessRequest.GetReferenceByNameAndGVK(name, catalog.ProductPlanUnitGVK()) - if unitRef.ID == "" { - return nil - } - - return &models.Unit{ - ID: unitRef.ID, - Name: unitRef.Name, - } -} - func (c *collector) cleanup() { c.publishItemQueue = make([]publishQueueItem, 0) } @@ -815,17 +814,17 @@ func (c *collector) processMetric(metricName string, metric interface{}) { } } -func (c *collector) setMetricsFromHistogram(metrics *centralMetricEvent, histogram metrics.Histogram) { - metrics.Count = histogram.Count() - metrics.Response = &ResponseMetrics{ +func (c *collector) setMetricsFromHistogram(metrics *centralMetric, histogram metrics.Histogram) { + metrics.Units.Transactions.Count = histogram.Count() + metrics.Units.Transactions.Response = &ResponseMetrics{ Max: histogram.Max(), Min: histogram.Min(), Avg: histogram.Mean(), } } -func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *centralMetricEvent) { - if metric.Count == 0 { +func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *centralMetric) { + if metric.Units != nil && metric.Units.Transactions != nil && metric.Units.Transactions.Count == 0 { c.logger.Trace("skipping registry entry with no reported quantity") return } @@ -833,6 +832,13 @@ func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *cen Start: util.ConvertTimeToMillis(c.metricStartTime), End: util.ConvertTimeToMillis(c.metricEndTime), } + metric.Reporter = &Reporter{ + AgentVersion: cmd.BuildVersion, + AgentType: cmd.BuildAgentName, + AgentSDKVersion: cmd.SDKBuildVersion, + AgentName: c.agentName, + ObservationDelta: metric.Observation.End - metric.Observation.Start, + } // Generate app subscription metric c.generateV4Event(histogram, metric) @@ -924,22 +930,31 @@ func (c *collector) cleanupUsageCounter(usageEventItem usageEventPublishItem) { } } -func (c *collector) logMetric(msg string, metric *APIMetric) { +func (c *collector) logMetric(msg string, metric *centralMetric) { c.metricLogger.WithField("id", metric.EventID).Info(msg) } -func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *APIMetric) { +func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *centralMetric) { c.metricMapLock.Lock() defer c.metricMapLock.Unlock() - subID := metric.Subscription.ID - appID := metric.App.ID + subID := unknown + if metric.Subscription != nil { + subID = metric.Subscription.ID + } + appID := unknown + if metric.App != nil { + appID = metric.App.ID + } apiID := metric.API.ID - statusCode := metric.StatusCode + if metric.API != nil { + apiID = metric.API.ID + } + status := metric.Units.Transactions.Status if consumerAppMap, ok := c.metricMap[subID]; ok { if apiMap, ok := consumerAppMap[appID]; ok { if apiStatusMap, ok := apiMap[apiID]; ok { - c.storage.removeMetric(apiStatusMap[statusCode]) - delete(c.metricMap[subID][appID][apiID], statusCode) + c.storage.removeMetric(apiStatusMap[status]) + delete(c.metricMap[subID][appID][apiID], status) histogram.Clear() } if len(c.metricMap[subID][appID][apiID]) == 0 { @@ -956,18 +971,10 @@ func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *AP c.logger. WithField(startTimestampStr, util.ConvertTimeToMillis(c.usageStartTime)). WithField(endTimestampStr, util.ConvertTimeToMillis(c.usageEndTime)). - WithField("api-name", metric.API.Name). + WithField("apiName", metric.API.Name). Info("Published metrics report for API") } func (c *collector) getStatusText(statusCode string) string { - httpStatusCode, _ := strconv.Atoi(statusCode) - switch { - case httpStatusCode >= 100 && httpStatusCode < 400: - return "Success" - case httpStatusCode >= 400 && httpStatusCode < 500: - return "Failure" - default: - return "Exception" - } + return getStatusFromCodeString(statusCode).String() } diff --git a/pkg/transaction/metric/metricscollector_test.go b/pkg/transaction/metric/metricscollector_test.go index eb2cd1fb0..f6ea4451a 100644 --- a/pkg/transaction/metric/metricscollector_test.go +++ b/pkg/transaction/metric/metricscollector_test.go @@ -487,7 +487,7 @@ func TestMetricCollector(t *testing.T) { } runTestHealthcheck() myCollector.InitializeBatch() - metricCollector.metricMap = make(map[string]map[string]map[string]map[string]*centralMetricEvent) + metricCollector.metricMap = make(map[string]map[string]map[string]map[string]*centralMetric) cfg.SetAxwayManaged(test.trackVolume) testClient := setupMockClient(test.retryBatchCount) mockClient := testClient.(*MockClient) @@ -936,7 +936,6 @@ func TestCustomMetrics(t *testing.T) { AppDetails: appDetails1, Count: 5, UnitDetails: models.Unit{ - ID: "unit-id", Name: "unit-name", }, } @@ -982,7 +981,7 @@ func TestCustomMetrics(t *testing.T) { if tc.skip { return } - metricCollector.metricMap = map[string]map[string]map[string]map[string]*centralMetricEvent{} + metricCollector.metricMap = map[string]map[string]map[string]map[string]*centralMetric{} metricCollector.AddCustomMetricDetail(tc.metricEvent1) if tc.metricEvent2.Count > 0 { metricCollector.AddCustomMetricDetail(tc.metricEvent2) diff --git a/pkg/transaction/metric/statustext.go b/pkg/transaction/metric/statustext.go new file mode 100644 index 000000000..2e5e13646 --- /dev/null +++ b/pkg/transaction/metric/statustext.go @@ -0,0 +1,41 @@ +package metric + +import "strconv" + +type statusText string + +const ( + Success statusText = "Success" + Failure statusText = "Failure" + Exception statusText = "Exception" +) + +var statuses = map[string]statusText{ + Success.String(): Success, + Failure.String(): Failure, + Exception.String(): Exception, +} + +func (s statusText) String() string { + return string(s) +} + +func getStatusFromCodeString(statusCode string) statusText { + if v, ok := statuses[statusCode]; ok { + return v + } + + httpStatusCode, _ := strconv.Atoi(statusCode) + return getStatusFromCode(httpStatusCode) +} + +func getStatusFromCode(statusCode int) statusText { + switch { + case statusCode >= 100 && statusCode < 400: + return Success + case statusCode >= 400 && statusCode < 500: + return Failure + default: + return Exception + } +} diff --git a/pkg/transaction/metric/util.go b/pkg/transaction/metric/util.go index 4f35ef03f..610ee8ea8 100644 --- a/pkg/transaction/metric/util.go +++ b/pkg/transaction/metric/util.go @@ -4,78 +4,82 @@ import ( "github.com/Axway/agent-sdk/pkg/transaction/models" ) -func centralMetricFromAPIMetric(in *APIMetric) *centralMetricEvent { - out := ¢ralMetricEvent{ - metricInfo: metricInfo{ - StatusCode: in.StatusCode, +func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { + out := ¢ralMetric{ + EventID: in.EventID, + Observation: &ObservationDetails{ + Start: in.Observation.Start, }, - Count: in.Count, - Status: in.Status, - EventID: in.EventID, - StartTime: in.StartTime, + } + + if in.Unit == nil { + // transaction units + out.Units = &Units{ + Transactions: &Transactions{ + UnitCount: UnitCount{ + Count: in.Count, + }, + Status: in.Status, + }, + } + } else { + // custom units + out.Units.CustomUnits[in.Unit.Name] = &UnitCount{ + Count: in.Count, + } } if in.Subscription.ID != unknown && in.Subscription.ID != "" { - out.Subscription = &models.Subscription{ - ID: in.Subscription.ID, - Name: in.Subscription.Name, + out.Subscription = &models.ResourceReference{ + ID: in.Subscription.ID, } } if in.App.ID != unknown && in.App.ID != "" { - out.App = &models.AppDetails{ - ID: in.App.ID, - Name: in.App.Name, + out.App = &models.ApplicationResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.App.ID, + }, ConsumerOrgID: in.App.ConsumerOrgID, } } if in.Product.ID != unknown && in.Product.ID != "" { - out.Product = &models.Product{ - ID: in.Product.ID, - Name: in.Product.Name, - VersionName: in.Product.VersionName, - VersionID: in.Product.VersionID, + out.Product = &models.ProductResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.Product.ID, + }, + VersionID: in.Product.VersionID, } } if in.API.ID != unknown && in.API.ID != "" { - out.API = &models.APIDetails{ - ID: in.API.ID, - Name: in.API.Name, - Revision: in.API.Revision, - TeamID: in.API.TeamID, - APIServiceInstance: in.API.APIServiceInstance, - Stage: in.API.Stage, - Version: in.API.Version, + out.API = &models.APIResourceReference{ + ResourceReference: models.ResourceReference{ + ID: in.API.ID, + }, + Name: in.API.Name, + //TODO find api service ID } } if in.AssetResource.ID != unknown && in.AssetResource.ID != "" { - out.AssetResource = &models.AssetResource{ - ID: in.AssetResource.ID, - Name: in.AssetResource.Name, + out.AssetResource = &models.ResourceReference{ + ID: in.AssetResource.ID, } } if in.ProductPlan.ID != unknown && in.ProductPlan.ID != "" { - out.ProductPlan = &models.ProductPlan{ + out.ProductPlan = &models.ResourceReference{ ID: in.ProductPlan.ID, } } if in.Quota.ID != unknown && in.Quota.ID != "" { - out.Quota = &models.Quota{ + out.Units.Transactions.Quota = &models.ResourceReference{ ID: in.Quota.ID, } } - if in.Unit.ID != unknown && in.Unit.ID != "" { - out.Unit = &models.Unit{ - ID: in.Unit.ID, - Name: in.Unit.Name, - } - } - return out } diff --git a/pkg/transaction/models/definitions.go b/pkg/transaction/models/definitions.go index 474d30a75..f762423fa 100644 --- a/pkg/transaction/models/definitions.go +++ b/pkg/transaction/models/definitions.go @@ -10,13 +10,12 @@ type ConsumerDetails struct { } type ResourceReference struct { - ID string `json:"id,omitempty"` - LoggerField string `json:"-"` + ID string `json:"id,omitempty"` } -func (a ResourceReference) GetLogFields(fields logrus.Fields) logrus.Fields { +func (a ResourceReference) GetLogFields(fields logrus.Fields, idFieldName string) logrus.Fields { if a.ID != "" { - fields[a.LoggerField] = a.ID + fields[idFieldName] = a.ID } return fields } @@ -37,9 +36,9 @@ type ProductResourceReference struct { VersionID string `json:"versionId,omitempty"` } -func (a ProductResourceReference) GetLogFields(fields logrus.Fields) logrus.Fields { +func (a ProductResourceReference) GetLogFields(fields logrus.Fields, idFieldName string) logrus.Fields { if a.ID != "" { - fields[a.LoggerField] = a.ID + fields[idFieldName] = a.ID fields["productVersionID"] = a.VersionID } return fields @@ -145,13 +144,12 @@ func (a APIDetails) GetLogFields(fields logrus.Fields) logrus.Fields { // Unit - struct for custom unit details to report type Unit struct { - ID string `json:"id"` Name string `json:"name"` } func (a Unit) GetLogFields(fields logrus.Fields) logrus.Fields { - if a.ID != "unknown" { - fields["apiID"] = a.ID + if a.Name != "unknown" { + fields["unitName"] = a.Name } return fields } From 3645aee17c23ef92a8243ba34d43b7aa6d52eb26 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:09:50 -0700 Subject: [PATCH 03/13] update cache to lookup managed app by catalog app id --- pkg/agent/cache/managedapplication.go | 23 +++++++++++++++++++++ pkg/agent/cache/manager.go | 3 ++- pkg/agent/cache/migratepersistedcache.go | 26 ++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/pkg/agent/cache/managedapplication.go b/pkg/agent/cache/managedapplication.go index e72eb7a77..724a75ffd 100644 --- a/pkg/agent/cache/managedapplication.go +++ b/pkg/agent/cache/managedapplication.go @@ -2,6 +2,8 @@ package cache import ( v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" ) // ManagedApplication cache related methods @@ -16,7 +18,14 @@ func (c *cacheManager) AddManagedApplication(resource *v1.ResourceInstance) { if resource == nil { return } + manApp := management.ManagedApplication{} + err := manApp.FromInstance(resource) + if err != nil { + return + } + catalogAppRef := manApp.GetReferenceByGVK(catalog.ApplicationGVK()) c.managedApplicationMap.SetWithSecondaryKey(resource.Metadata.ID, resource.Name, resource) + c.managedApplicationMap.SetSecondaryKey(resource.Metadata.ID, catalogAppRef.ID) } func (c *cacheManager) GetManagedApplication(id string) *v1.ResourceInstance { @@ -33,6 +42,20 @@ func (c *cacheManager) GetManagedApplication(id string) *v1.ResourceInstance { return nil } +func (c *cacheManager) GetManagedApplicationByApplicationID(id string) *v1.ResourceInstance { + c.ApplyResourceReadLock() + defer c.ReleaseResourceReadLock() + + managedApp, _ := c.managedApplicationMap.GetBySecondaryKey(id) + if managedApp != nil { + ri, ok := managedApp.(*v1.ResourceInstance) + if ok { + return ri + } + } + return nil +} + func (c *cacheManager) GetManagedApplicationByName(name string) *v1.ResourceInstance { c.ApplyResourceReadLock() defer c.ReleaseResourceReadLock() diff --git a/pkg/agent/cache/manager.go b/pkg/agent/cache/manager.go index e5f73f128..4c23cda87 100644 --- a/pkg/agent/cache/manager.go +++ b/pkg/agent/cache/manager.go @@ -92,6 +92,7 @@ type Manager interface { GetManagedApplicationCacheKeys() []string AddManagedApplication(resource *v1.ResourceInstance) GetManagedApplication(id string) *v1.ResourceInstance + GetManagedApplicationByApplicationID(id string) *v1.ResourceInstance GetManagedApplicationByName(name string) *v1.ResourceInstance DeleteManagedApplication(id string) error @@ -116,7 +117,6 @@ type Manager interface { ReleaseResourceReadLock() } -type teamRefreshHandler func() type cacheManager struct { jobs.Job logger log.FieldLogger @@ -157,6 +157,7 @@ func NewAgentCacheManager(cfg config.CentralConfig, persistCacheEnabled bool) Ma m.migrators = []cacheMigrate{ m.migrateAccessRequest, m.migrateInstanceCount, + m.migrateManagedApplications, } } m.initializeCache(cfg) diff --git a/pkg/agent/cache/migratepersistedcache.go b/pkg/agent/cache/migratepersistedcache.go index 6903aed90..3693b8183 100644 --- a/pkg/agent/cache/migratepersistedcache.go +++ b/pkg/agent/cache/migratepersistedcache.go @@ -4,6 +4,7 @@ import ( "sync" v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1" management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" defs "github.com/Axway/agent-sdk/pkg/apic/definitions" "github.com/Axway/agent-sdk/pkg/util" @@ -105,3 +106,28 @@ func (c *cacheManager) migrateInstanceCount(key string) error { } return nil } + +func (c *cacheManager) migrateManagedApplications(cacheKey string) error { + if cacheKey != managedAppKey { + return nil + } + + for _, key := range c.managedApplicationMap.GetKeys() { + cachedManagedApp, _ := c.managedApplicationMap.Get(key) + if cachedManagedApp == nil { + continue + } + ri, ok := cachedManagedApp.(*v1.ResourceInstance) + if !ok { + continue + } + manApp := management.ManagedApplication{} + err := manApp.FromInstance(ri) + if err != nil { + continue + } + catalogAppRef := manApp.GetReferenceByGVK(catalog.ApplicationGVK()) + c.managedApplicationMap.SetSecondaryKey(key, catalogAppRef.ID) + } + return nil +} From 382293d94031ae89d3b32e80ae5f640c75915a13 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:10:18 -0700 Subject: [PATCH 04/13] split metric key func --- pkg/transaction/metric/util.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/transaction/metric/util.go b/pkg/transaction/metric/util.go index 610ee8ea8..cda225cac 100644 --- a/pkg/transaction/metric/util.go +++ b/pkg/transaction/metric/util.go @@ -1,6 +1,8 @@ package metric import ( + "strings" + "github.com/Axway/agent-sdk/pkg/transaction/models" ) @@ -83,3 +85,11 @@ func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { return out } + +func splitMetricKey(key string) (string, string) { + const delimiter = "." + + groupKey := strings.Join(strings.Split(key, delimiter)[:4], delimiter) + metricKey := strings.Split(key, delimiter)[4] + return groupKey, metricKey +} From 06f6404e0075655ee3180c462a04e1498cb156f7 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:10:48 -0700 Subject: [PATCH 05/13] custom registry for histo, count, grouped metrics --- pkg/transaction/metric/registry.go | 73 ++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 pkg/transaction/metric/registry.go diff --git a/pkg/transaction/metric/registry.go b/pkg/transaction/metric/registry.go new file mode 100644 index 000000000..736a4f0cd --- /dev/null +++ b/pkg/transaction/metric/registry.go @@ -0,0 +1,73 @@ +package metric + +import ( + "fmt" + "sync" + + "github.com/rcrowley/go-metrics" +) + +type registry interface { + Each(func(string, interface{})) + Get(string) interface{} + Register(string, interface{}) error +} + +// The metric registry which can store counters, histogram, or grouped metrics +type metricRegistry struct { + metrics map[string]interface{} + mutex sync.RWMutex +} + +func newRegistry() registry { + return &metricRegistry{metrics: make(map[string]interface{})} +} + +func (r *metricRegistry) Each(f func(string, interface{})) { + metrics := r.registered() + for i := range metrics { + kv := &metrics[i] + f(kv.name, kv.value) + } +} + +func (r *metricRegistry) Get(name string) interface{} { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.metrics[name] +} + +func (r *metricRegistry) Register(name string, i interface{}) error { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.register(name, i) +} + +func (r *metricRegistry) register(name string, i interface{}) error { + if _, ok := r.metrics[name]; ok { + return fmt.Errorf("duplicate metric: %s", name) + } + switch i.(type) { + case metrics.Counter, metrics.Histogram, groupedMetrics: + r.metrics[name] = i + } + return nil +} + +type metricKV struct { + name string + value interface{} +} + +func (r *metricRegistry) registered() []metricKV { + r.mutex.RLock() + defer r.mutex.RUnlock() + metrics := make([]metricKV, 0, len(r.metrics)) + for name, i := range r.metrics { + metrics = append(metrics, metricKV{ + name: name, + value: i, + }) + } + return metrics +} From 5e11796e2244fb6e9091d1555afe73dc17d650a6 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:11:07 -0700 Subject: [PATCH 06/13] update cache load for custom units --- pkg/transaction/metric/cachestorage.go | 52 +++++++++++++++----------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index 170878551..65d37733b 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -162,28 +162,41 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { var cm cachedMetric json.Unmarshal(buffer, &cm) + apiDetails := models.APIDetails{} + if cm.API != nil { + apiDetails.ID = cm.API.ID + apiDetails.Name = cm.API.Name + } + appDetails := models.AppDetails{} + if cm.API != nil { + appDetails.ID = cm.App.ID + appDetails.ConsumerOrgID = cm.App.ConsumerOrgID + } + + if len(cm.Values) == 0 { + if cm.Unit == nil { + continue + } + + c.collector.AddCustomMetricDetail(CustomMetricDetail{ + APIDetails: apiDetails, + AppDetails: appDetails, + UnitDetails: models.Unit{ + Name: cm.Unit.Name, + }, + Count: cm.Count, + }) + continue + } + var metric *centralMetric for _, duration := range cm.Values { - metricDetail := Detail{ + metric = c.collector.createOrUpdateHistogram(Detail{ + APIDetails: apiDetails, + AppDetails: appDetails, StatusCode: cm.StatusCode, Duration: duration, - } - if cm.API != nil { - metricDetail.APIDetails = models.APIDetails{ - ID: cm.API.ID, - Name: cm.API.Name, - } - } - if cm.App != nil { - metricDetail.AppDetails = models.AppDetails{ - ID: cm.App.ID, - ConsumerOrgID: cm.App.ConsumerOrgID, - } - } - if cm.Unit != nil { - metricDetail.UnitName = cm.Unit.Name - } - metric = c.collector.createOrUpdateMetric(metricDetail) + }) } newKey := metric.getKey() @@ -193,9 +206,6 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { c.storageLock.Unlock() } storageCache.Set(newKey, cm) - if metric != nil { - metric.Observation.Start = cm.StartTime.UnixMilli() - } } } } From 34ea93509b9b1e570e4d54b8c5a2d7936f1aa6a9 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:12:03 -0700 Subject: [PATCH 07/13] remove start time from cached metric --- pkg/transaction/metric/centralmetric.go | 21 ++++++++++++++++++++- pkg/transaction/metric/definition.go | 1 - 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/transaction/metric/centralmetric.go b/pkg/transaction/metric/centralmetric.go index da8c4d430..b7d2313f2 100644 --- a/pkg/transaction/metric/centralmetric.go +++ b/pkg/transaction/metric/centralmetric.go @@ -6,9 +6,22 @@ import ( "time" "github.com/Axway/agent-sdk/pkg/transaction/models" + "github.com/rcrowley/go-metrics" "github.com/sirupsen/logrus" ) +type groupedMetrics struct { + counters map[string]metrics.Counter + histograms map[string]metrics.Histogram +} + +func newGroupedMetric() groupedMetrics { + return groupedMetrics{ + counters: make(map[string]metrics.Counter), + histograms: make(map[string]metrics.Histogram), + } +} + type centralMetric struct { Subscription *models.ResourceReference `json:"subscription,omitempty"` App *models.ApplicationResourceReference `json:"app,omitempty"` @@ -116,6 +129,13 @@ func (a *centralMetric) getKey() string { return strings.Join([]string{metricKeyPrefix, subID, appID, apiID, uniqueKey}, ".") } +// getKey - returns the cache key for the metric +func (a *centralMetric) getKeyParts() (string, string, string, string) { + key := a.getKey() + parts := strings.Split(key, ".") + return parts[1], parts[2], parts[3], parts[4] +} + func (a *centralMetric) createCachedMetric(cached cachedMetricInterface) cachedMetric { cacheM := cachedMetric{ Subscription: a.Subscription, @@ -126,7 +146,6 @@ func (a *centralMetric) createCachedMetric(cached cachedMetricInterface) cachedM ProductPlan: a.ProductPlan, Count: cached.Count(), Values: cached.Values(), - StartTime: time.UnixMilli(a.Observation.Start), } if a.Units.Transactions != nil { diff --git a/pkg/transaction/metric/definition.go b/pkg/transaction/metric/definition.go index 8a2c69fa6..85a311ce4 100644 --- a/pkg/transaction/metric/definition.go +++ b/pkg/transaction/metric/definition.go @@ -86,7 +86,6 @@ type cachedMetric struct { StatusCode string `json:"statusCode,omitempty"` Count int64 `json:"count"` Values []int64 `json:"values,omitempty"` - StartTime time.Time `json:"startTime"` } // V4EventDistribution - represents V4 distribution From d397769b3b3d8d8e35ce9964ce21f59b43f1a327 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 20:12:34 -0700 Subject: [PATCH 08/13] updates to group api/app/sub metrics to single grouped metric --- pkg/transaction/metric/metricbatch.go | 24 ++- pkg/transaction/metric/metricevent.go | 4 +- pkg/transaction/metric/metricscollector.go | 172 +++++++++++++++------ 3 files changed, 142 insertions(+), 58 deletions(-) diff --git a/pkg/transaction/metric/metricbatch.go b/pkg/transaction/metric/metricbatch.go index 657a2e558..3822ca5f3 100644 --- a/pkg/transaction/metric/metricbatch.go +++ b/pkg/transaction/metric/metricbatch.go @@ -12,20 +12,28 @@ import ( const cancelMsg = "event cancelled, counts added at next publish" +type eventMetric struct { + histogram metrics.Histogram + counters map[string]metrics.Counter +} + // EventBatch - creates a batch of MetricEvents to send to Condor type EventBatch struct { beatPub.Batch events []beatPub.Event - histograms map[string]metrics.Histogram + batchMetrics map[string]eventMetric collector *collector haveBatchLock bool } // AddEvent - adds an event to the batch -func (b *EventBatch) AddEvent(event beatPub.Event, histogram metrics.Histogram) { +func (b *EventBatch) AddEvent(event beatPub.Event, histogram metrics.Histogram, counters map[string]metrics.Counter) { b.events = append(b.events, event) eventID := event.Content.Meta[metricKey].(string) - b.histograms[eventID] = histogram + b.batchMetrics[eventID] = eventMetric{ + histogram: histogram, + counters: counters, + } } // AddEvent - adds an event to the batch @@ -138,12 +146,12 @@ func (b *EventBatch) ackEvents(events []beatPub.Event) { continue } b.collector.logMetric("published", metric) - histogram, found := b.histograms[metric.EventID] - if !found { + + if eventMetric, ok := b.batchMetrics[metric.EventID]; ok { + b.collector.cleanupMetricCounters(eventMetric.histogram, eventMetric.counters, metric) + } else { b.collector.metricLogger.WithField("eventID", metric.EventID).Warn("could not clean cached metric") - continue } - b.collector.cleanupMetricCounter(histogram, metric) } } @@ -151,7 +159,7 @@ func (b *EventBatch) ackEvents(events []beatPub.Event) { func NewEventBatch(c *collector) *EventBatch { return &EventBatch{ collector: c, - histograms: make(map[string]metrics.Histogram), + batchMetrics: make(map[string]eventMetric), haveBatchLock: false, } } diff --git a/pkg/transaction/metric/metricevent.go b/pkg/transaction/metric/metricevent.go index a9d8be9a1..caaba5ac2 100644 --- a/pkg/transaction/metric/metricevent.go +++ b/pkg/transaction/metric/metricevent.go @@ -22,7 +22,7 @@ type CondorMetricEvent struct { } // AddCondorMetricEventToBatch - creates the condor metric event and adds to the batch -func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogram metrics.Histogram) error { +func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogram metrics.Histogram, counters map[string]metrics.Counter) error { metricData, _ := json.Marshal(metricEvent) cme := &CondorMetricEvent{ @@ -35,7 +35,7 @@ func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogr if err != nil { return err } - batch.AddEvent(event, histogram) + batch.AddEvent(event, histogram, counters) return nil } diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 4fb774a23..dba6875ef 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -70,7 +70,7 @@ type collector struct { agentName string lock *sync.Mutex batchLock *sync.Mutex - registry metrics.Registry + registry registry metricBatch *EventBatch metricMap map[string]map[string]map[string]map[string]*centralMetric metricMapLock *sync.Mutex @@ -163,7 +163,7 @@ func createMetricCollector() Collector { lock: &sync.Mutex{}, batchLock: &sync.Mutex{}, metricMapLock: &sync.Mutex{}, - registry: metrics.NewRegistry(), + registry: newRegistry(), metricMap: make(map[string]map[string]map[string]map[string]*centralMetric), publishItemQueue: make([]publishQueueItem, 0), metricConfig: agent.GetCentralConfig().GetMetricReportingConfig(), @@ -270,7 +270,16 @@ func (c *collector) AddMetric(apiDetails models.APIDetails, statusCode string, d // AddMetricDetail - add metric for API transaction and consumer subscription to collection func (c *collector) AddMetricDetail(metricDetail Detail) { c.AddMetric(metricDetail.APIDetails, metricDetail.StatusCode, metricDetail.Duration, metricDetail.Bytes, metricDetail.APIDetails.Name) - c.createOrUpdateMetric(metricDetail) + c.createOrUpdateHistogram(metricDetail) + // TODO remove this after testing + c.AddCustomMetricDetail(CustomMetricDetail{ + APIDetails: metricDetail.APIDetails, + AppDetails: metricDetail.AppDetails, + UnitDetails: models.Unit{ + Name: "x-custom-token", + }, + Count: 30, + }) } // AddAPIMetricDetail - add metric details for several response codes and transactions @@ -329,10 +338,6 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { } metric := c.createMetric(transactionCtx) - metric.Observation = &ObservationDetails{ - Start: detail.Observation.Start, - End: detail.Observation.End, - } if m := c.getExistingMetric(metric); m != nil { // use the cached metric @@ -342,7 +347,7 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { // add the count metric.Units.CustomUnits[detail.UnitDetails.Name].Count += detail.Count - counter := c.getOrRegisterCounter(metric.getKey()) + counter := c.getOrRegisterGroupedCounter(metric.getKey()) counter.Inc(detail.Count) c.updateStartTime() @@ -439,7 +444,7 @@ func (c *collector) createMetric(detail transactionContext) *centralMetric { return me } -func (c *collector) createOrUpdateMetric(detail Detail) *centralMetric { +func (c *collector) createOrUpdateHistogram(detail Detail) *centralMetric { if !c.metricConfig.CanPublish() || c.usageConfig.IsOfflineMode() { return nil // no need to update metrics with publish off } @@ -453,7 +458,7 @@ func (c *collector) createOrUpdateMetric(detail Detail) *centralMetric { metric := c.createMetric(transactionCtx) - histogram := c.getOrRegisterHistogram(metric.getKey()) + histogram := c.getOrRegisterGroupedHistogram(metric.getKey()) histogram.Update(detail.Duration) return c.updateMetricWithCachedMetric(metric, newCustomHistogram(histogram)) @@ -507,7 +512,7 @@ func (c *collector) updateMetricWithCachedMetric(metric *centralMetric, cached c // getAccessRequest - func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, detail transactionContext) (*management.AccessRequest, *v1.ResourceInstance) { - if detail.AppDetails.Name == "" { + if detail.AppDetails.Name == "" && detail.AppDetails.ID == "" { return nil, nil } @@ -517,7 +522,14 @@ func (c *collector) getAccessRequestAndManagedApp(cacheManager cache.Manager, de Trace("metric collector information") // get the managed application - managedApp := cacheManager.GetManagedApplicationByName(detail.AppDetails.Name) + // cached metrics will only have the catalog api id + managedApp := cacheManager.GetManagedApplicationByApplicationID(detail.AppDetails.ID) + if managedApp == nil { + managedApp = cacheManager.GetManagedApplication(detail.AppDetails.ID) + } + if managedApp == nil { + managedApp = cacheManager.GetManagedApplicationByName(detail.AppDetails.Name) + } if managedApp == nil { c.logger. WithField("appName", detail.AppDetails.Name). @@ -552,7 +564,7 @@ func (c *collector) createSubscriptionDetail(accessRequest *management.AccessReq } subRef := accessRequest.GetReferenceByGVK(catalog.SubscriptionGVK()) - if subRef.ID != "" { + if subRef.ID == "" { return nil } @@ -791,29 +803,78 @@ func (c *collector) generateUsageEvent(orgGUID string) { c.publishItemQueue = append(c.publishItemQueue, queueItem) } -func (c *collector) processMetric(metricName string, metric interface{}) { +func (c *collector) processMetric(metricName string, groupedMetric interface{}) { c.metricMapLock.Lock() defer c.metricMapLock.Unlock() elements := strings.Split(metricName, ".") - if len(elements) == 5 { + if len(elements) == 4 { subscriptionID := elements[1] appID := elements[2] apiID := strings.ReplaceAll(elements[3], "#", ".") - statusCode := elements[4] if appMap, ok := c.metricMap[subscriptionID]; ok { if apiMap, ok := appMap[appID]; ok { - if statusMap, ok := apiMap[apiID]; ok { - if statusDetail, ok := statusMap[statusCode]; ok { - statusMetric := (metric.(metrics.Histogram)) - c.setMetricsFromHistogram(statusDetail, statusMetric) - c.generateMetricEvent(statusMetric, statusDetail) - } + if groupMap, ok := apiMap[apiID]; ok { + logger := c.logger.WithField("subscriptionID", subscriptionID).WithField("applicationID", appID).WithField("apiID", apiID) + c.handleGroupedMetric(logger, groupedMetric, groupMap) } } } } } +func (c *collector) handleGroupedMetric(logger log.FieldLogger, groupedMetricInterface interface{}, groupMap map[string]*centralMetric) { + groupedMetric, ok := groupedMetricInterface.(groupedMetrics) + if !ok { + logger.Error("metric data to process was not the expected type") + return + } + + countersAdded := false + // handle each histogram, on the first one add the counter information + for k, histo := range groupedMetric.histograms { + logger := logger.WithField("status", k) + metric, ok := groupMap[k] + if !ok { + logger.Error("no metrics in map for status") + continue + } + c.setMetricsFromHistogram(metric, histo) + var counters map[string]metrics.Counter + if !countersAdded { + c.setMetricCounters(logger, metric, groupedMetric.counters, groupMap) + counters = groupedMetric.counters + countersAdded = true + } + c.generateMetricEvent(histo, counters, metric) + } +} + +func (c *collector) setMetricCounters(logger log.FieldLogger, metricData *centralMetric, counters map[string]metrics.Counter, groupMap map[string]*centralMetric) { + metricData.Units.CustomUnits = map[string]*UnitCount{} + + for k, counter := range counters { + logger := logger.WithField("unit", k) + metric, ok := groupMap[k] + if !ok { + logger.Error("no counter in map for unit") + continue + } + + // create a new quota pointer + var quota *models.ResourceReference + if metric.Units.CustomUnits[k].Quota != nil { + quota = &models.ResourceReference{ + ID: metric.Units.CustomUnits[k].Quota.ID, + } + } + + metricData.Units.CustomUnits[k] = &UnitCount{ + Count: counter.Count(), + Quota: quota, + } + } +} + func (c *collector) setMetricsFromHistogram(metrics *centralMetric, histogram metrics.Histogram) { metrics.Units.Transactions.Count = histogram.Count() metrics.Units.Transactions.Response = &ResponseMetrics{ @@ -823,7 +884,7 @@ func (c *collector) setMetricsFromHistogram(metrics *centralMetric, histogram me } } -func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *centralMetric) { +func (c *collector) generateMetricEvent(histogram metrics.Histogram, counters map[string]metrics.Counter, metric *centralMetric) { if metric.Units != nil && metric.Units.Transactions != nil && metric.Units.Transactions.Count == 0 { c.logger.Trace("skipping registry entry with no reported quantity") return @@ -841,7 +902,7 @@ func (c *collector) generateMetricEvent(histogram metrics.Histogram, metric *cen } // Generate app subscription metric - c.generateV4Event(histogram, metric) + c.generateV4Event(histogram, counters, metric) } func (c *collector) createV4Event(startTime int64, v4data V4Data) V4Event { @@ -859,10 +920,10 @@ func (c *collector) createV4Event(startTime int64, v4data V4Data) V4Event { } } -func (c *collector) generateV4Event(histogram metrics.Histogram, v4data V4Data) { +func (c *collector) generateV4Event(histogram metrics.Histogram, counters map[string]metrics.Counter, v4data V4Data) { generatedEvent := c.createV4Event(c.metricStartTime.UnixMilli(), v4data) c.metricLogger.WithFields(generatedEvent.getLogFields()).Info("generated") - AddCondorMetricEventToBatch(generatedEvent, c.metricBatch, histogram) + AddCondorMetricEventToBatch(generatedEvent, c.metricBatch, histogram, counters) } func (c *collector) getOrRegisterCounter(name string) metrics.Counter { @@ -874,14 +935,34 @@ func (c *collector) getOrRegisterCounter(name string) metrics.Counter { return counter.(metrics.Counter) } -func (c *collector) getOrRegisterHistogram(name string) metrics.Histogram { - histogram := c.registry.Get(name) - if histogram == nil { +func (c *collector) getOrRegisterGroupedMetrics(name string) groupedMetrics { + group := c.registry.Get(name) + if group == nil { + group = newGroupedMetric() + c.registry.Register(name, group) + } + return group.(groupedMetrics) +} + +func (c *collector) getOrRegisterGroupedCounter(name string) metrics.Counter { + groupKey, countKey := splitMetricKey(name) + groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) + + if _, ok := groupedMetric.counters[countKey]; !ok { + groupedMetric.counters[countKey] = metrics.NewCounter() + } + return groupedMetric.counters[countKey] +} + +func (c *collector) getOrRegisterGroupedHistogram(name string) metrics.Histogram { + groupKey, histoKey := splitMetricKey(name) + groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) + + if _, ok := groupedMetric.histograms[histoKey]; !ok { sampler := metrics.NewUniformSample(2048) - histogram = metrics.NewHistogram(sampler) - c.registry.Register(name, histogram) + groupedMetric.histograms[histoKey] = metrics.NewHistogram(sampler) } - return histogram.(metrics.Histogram) + return groupedMetric.histograms[histoKey] } func (c *collector) publishEvents() { @@ -934,28 +1015,23 @@ func (c *collector) logMetric(msg string, metric *centralMetric) { c.metricLogger.WithField("id", metric.EventID).Info(msg) } -func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, metric *centralMetric) { +func (c *collector) cleanupMetricCounters(histogram metrics.Histogram, counters map[string]metrics.Counter, metric *centralMetric) { c.metricMapLock.Lock() defer c.metricMapLock.Unlock() - subID := unknown - if metric.Subscription != nil { - subID = metric.Subscription.ID - } - appID := unknown - if metric.App != nil { - appID = metric.App.ID - } - apiID := metric.API.ID - if metric.API != nil { - apiID = metric.API.ID - } - status := metric.Units.Transactions.Status + subID, appID, apiID, group := metric.getKeyParts() if consumerAppMap, ok := c.metricMap[subID]; ok { if apiMap, ok := consumerAppMap[appID]; ok { if apiStatusMap, ok := apiMap[apiID]; ok { - c.storage.removeMetric(apiStatusMap[status]) - delete(c.metricMap[subID][appID][apiID], status) + c.storage.removeMetric(apiStatusMap[group]) + delete(c.metricMap[subID][appID][apiID], group) histogram.Clear() + + // clean any counters, if needed + for k, counter := range counters { + c.storage.removeMetric(apiStatusMap[k]) + delete(c.metricMap[subID][appID][apiID], k) + counter.Clear() + } } if len(c.metricMap[subID][appID][apiID]) == 0 { delete(c.metricMap[subID][appID], apiID) From 6aa83aa6fa4b0979c0a4777044cf7a8f92ee273f Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Fri, 25 Oct 2024 21:23:14 -0700 Subject: [PATCH 09/13] concurrent map write handling --- pkg/transaction/metric/centralmetric.go | 24 +++++++++++++++++ pkg/transaction/metric/metricscollector.go | 31 ++++++++++------------ 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/pkg/transaction/metric/centralmetric.go b/pkg/transaction/metric/centralmetric.go index b7d2313f2..ffcca4557 100644 --- a/pkg/transaction/metric/centralmetric.go +++ b/pkg/transaction/metric/centralmetric.go @@ -3,6 +3,7 @@ package metric import ( "fmt" "strings" + "sync" "time" "github.com/Axway/agent-sdk/pkg/transaction/models" @@ -11,17 +12,40 @@ import ( ) type groupedMetrics struct { + lock *sync.Mutex counters map[string]metrics.Counter histograms map[string]metrics.Histogram } func newGroupedMetric() groupedMetrics { return groupedMetrics{ + lock: &sync.Mutex{}, counters: make(map[string]metrics.Counter), histograms: make(map[string]metrics.Histogram), } } +func (g groupedMetrics) getOrCreateHistogram(key string) metrics.Histogram { + g.lock.Lock() + defer g.lock.Unlock() + + if _, ok := g.histograms[key]; !ok { + sampler := metrics.NewUniformSample(2048) + g.histograms[key] = metrics.NewHistogram(sampler) + } + return g.histograms[key] +} + +func (g groupedMetrics) getOrCreateCounter(key string) metrics.Counter { + g.lock.Lock() + defer g.lock.Unlock() + + if _, ok := g.counters[key]; !ok { + g.counters[key] = metrics.NewCounter() + } + return g.counters[key] +} + type centralMetric struct { Subscription *models.ResourceReference `json:"subscription,omitempty"` App *models.ApplicationResourceReference `json:"app,omitempty"` diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index dba6875ef..2809dfe9a 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -272,14 +272,14 @@ func (c *collector) AddMetricDetail(metricDetail Detail) { c.AddMetric(metricDetail.APIDetails, metricDetail.StatusCode, metricDetail.Duration, metricDetail.Bytes, metricDetail.APIDetails.Name) c.createOrUpdateHistogram(metricDetail) // TODO remove this after testing - c.AddCustomMetricDetail(CustomMetricDetail{ - APIDetails: metricDetail.APIDetails, - AppDetails: metricDetail.AppDetails, - UnitDetails: models.Unit{ - Name: "x-custom-token", - }, - Count: 30, - }) + // c.AddCustomMetricDetail(CustomMetricDetail{ + // APIDetails: metricDetail.APIDetails, + // AppDetails: metricDetail.AppDetails, + // UnitDetails: models.Unit{ + // Name: "x-custom-token", + // }, + // Count: 30, + // }) } // AddAPIMetricDetail - add metric details for several response codes and transactions @@ -309,6 +309,10 @@ func (c *collector) AddCustomMetricDetail(detail CustomMetricDetail) { if !c.metricConfig.CanPublish() || c.usageConfig.IsOfflineMode() { return } + c.lock.Lock() + defer c.lock.Unlock() + c.batchLock.Lock() + defer c.batchLock.Unlock() logger := c.logger.WithField("handler", "customMetric"). WithField("apiID", detail.APIDetails.ID). @@ -948,21 +952,14 @@ func (c *collector) getOrRegisterGroupedCounter(name string) metrics.Counter { groupKey, countKey := splitMetricKey(name) groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) - if _, ok := groupedMetric.counters[countKey]; !ok { - groupedMetric.counters[countKey] = metrics.NewCounter() - } - return groupedMetric.counters[countKey] + return groupedMetric.getOrCreateCounter(countKey) } func (c *collector) getOrRegisterGroupedHistogram(name string) metrics.Histogram { groupKey, histoKey := splitMetricKey(name) groupedMetric := c.getOrRegisterGroupedMetrics(groupKey) - if _, ok := groupedMetric.histograms[histoKey]; !ok { - sampler := metrics.NewUniformSample(2048) - groupedMetric.histograms[histoKey] = metrics.NewHistogram(sampler) - } - return groupedMetric.histograms[histoKey] + return groupedMetric.getOrCreateHistogram(histoKey) } func (c *collector) publishEvents() { From 4408d1fa2f6e93fc36856a2d9cc017b1768d0653 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Mon, 28 Oct 2024 10:17:34 -0700 Subject: [PATCH 10/13] update getting proper quota ref for unit --- pkg/transaction/metric/metricscollector.go | 38 ++++++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 2809dfe9a..17db26c6b 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -430,7 +430,7 @@ func (c *collector) createMetric(detail transactionContext) *centralMetric { me.Units = &Units{ Transactions: &Transactions{ UnitCount: UnitCount{ - Quota: c.getQuota(accessRequest, ""), // TODO figure this out for transaction quota + Quota: c.getQuota(accessRequest, ""), }, Status: c.getStatusText(detail.Status), }, @@ -662,12 +662,44 @@ func (c *collector) getProductPlan(accessRequest *management.AccessRequest) *mod } } -func (c *collector) getQuota(accessRequest *management.AccessRequest, id string) *models.ResourceReference { +func (c *collector) getQuota(accessRequest *management.AccessRequest, unitName string) *models.ResourceReference { if accessRequest == nil { return nil } - quotaRef := accessRequest.GetReferenceByIDAndGVK(id, catalog.QuotaGVK()) + if unitName == "" && accessRequest.Spec.Quota == nil { + // no quota on transactions + return nil + } + + quotaName := "" + if unitName == "" && accessRequest.Spec.Quota != nil { + // get transactions quota + for _, r := range accessRequest.References { + rMap := r.(map[string]string) + if rMap["kind"] != catalog.QuotaGVK().Kind { + continue + } + if _, ok := rMap["unit"]; !ok { + // no unit is transactions + quotaName = strings.Split(rMap["name"], "/")[2] + } + } + } else { + // get custom unit quota + for _, r := range accessRequest.References { + rMap := r.(map[string]string) + if rMap["kind"] != catalog.QuotaGVK().Kind { + continue + } + if unit, ok := rMap["unit"]; ok && unitName == unit { + // no unit is transactions + quotaName = strings.Split(rMap["name"], "/")[2] + } + } + } + + quotaRef := accessRequest.GetReferenceByNameAndGVK(quotaName, catalog.QuotaGVK()) if quotaRef.ID == "" { return nil } From 86c7e98754362979b825f20c1c9e347f0ac53b45 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Mon, 28 Oct 2024 10:32:41 -0700 Subject: [PATCH 11/13] lookup api service id in cache --- pkg/transaction/metric/metricscollector.go | 7 ++++++- pkg/transaction/metric/util.go | 8 +++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 17db26c6b..0a30477bc 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -604,12 +604,17 @@ func (c *collector) createAppDetail(appRI *v1.ResourceInstance) *models.Applicat } func (c *collector) createAPIDetail(api models.APIDetails) *models.APIResourceReference { - return &models.APIResourceReference{ + ref := &models.APIResourceReference{ ResourceReference: models.ResourceReference{ ID: api.ID, }, Name: api.Name, } + svc, err := agent.GetAPICache().GetBySecondaryKey(strings.TrimPrefix(api.ID, transutil.SummaryEventProxyIDPrefix)) + if err == nil { + ref.APIServiceID = svc.(v1.ResourceInstance).Metadata.ID + } + return ref } func (c *collector) getAssetResource(accessRequest *management.AccessRequest) *models.ResourceReference { diff --git a/pkg/transaction/metric/util.go b/pkg/transaction/metric/util.go index cda225cac..4367809f1 100644 --- a/pkg/transaction/metric/util.go +++ b/pkg/transaction/metric/util.go @@ -3,7 +3,10 @@ package metric import ( "strings" + "github.com/Axway/agent-sdk/pkg/agent" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/transaction/models" + transutil "github.com/Axway/agent-sdk/pkg/transaction/util" ) func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { @@ -61,7 +64,10 @@ func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { ID: in.API.ID, }, Name: in.API.Name, - //TODO find api service ID + } + svc, err := agent.GetAPICache().GetBySecondaryKey(strings.TrimPrefix(in.API.ID, transutil.SummaryEventProxyIDPrefix)) + if err == nil { + out.API.APIServiceID = svc.(v1.ResourceInstance).Metadata.ID } } From 71472265acb42ad323a025a64834ec9813cc0cd4 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Mon, 28 Oct 2024 11:17:41 -0700 Subject: [PATCH 12/13] api service id lookup --- pkg/transaction/metric/cachestorage.go | 2 +- pkg/transaction/metric/metricscollector.go | 41 ++++++++++++---------- pkg/transaction/metric/util.go | 7 ++-- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index 65d37733b..56f3bdc18 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -168,7 +168,7 @@ func (c *cacheStorage) loadMetrics(storageCache cache.Cache) { apiDetails.Name = cm.API.Name } appDetails := models.AppDetails{} - if cm.API != nil { + if cm.App != nil { appDetails.ID = cm.App.ID appDetails.ConsumerOrgID = cm.App.ConsumerOrgID } diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 0a30477bc..fe78bb27b 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -272,14 +272,14 @@ func (c *collector) AddMetricDetail(metricDetail Detail) { c.AddMetric(metricDetail.APIDetails, metricDetail.StatusCode, metricDetail.Duration, metricDetail.Bytes, metricDetail.APIDetails.Name) c.createOrUpdateHistogram(metricDetail) // TODO remove this after testing - // c.AddCustomMetricDetail(CustomMetricDetail{ - // APIDetails: metricDetail.APIDetails, - // AppDetails: metricDetail.AppDetails, - // UnitDetails: models.Unit{ - // Name: "x-custom-token", - // }, - // Count: 30, - // }) + c.AddCustomMetricDetail(CustomMetricDetail{ + APIDetails: metricDetail.APIDetails, + AppDetails: metricDetail.AppDetails, + UnitDetails: models.Unit{ + Name: "x-ai-tokens", + }, + Count: 30, + }) } // AddAPIMetricDetail - add metric details for several response codes and transactions @@ -610,9 +610,9 @@ func (c *collector) createAPIDetail(api models.APIDetails) *models.APIResourceRe }, Name: api.Name, } - svc, err := agent.GetAPICache().GetBySecondaryKey(strings.TrimPrefix(api.ID, transutil.SummaryEventProxyIDPrefix)) - if err == nil { - ref.APIServiceID = svc.(v1.ResourceInstance).Metadata.ID + svc := agent.GetCacheManager().GetAPIServiceWithAPIID(strings.TrimPrefix(api.ID, transutil.SummaryEventProxyIDPrefix)) + if svc != nil { + ref.APIServiceID = svc.Metadata.ID } return ref } @@ -681,28 +681,33 @@ func (c *collector) getQuota(accessRequest *management.AccessRequest, unitName s if unitName == "" && accessRequest.Spec.Quota != nil { // get transactions quota for _, r := range accessRequest.References { - rMap := r.(map[string]string) - if rMap["kind"] != catalog.QuotaGVK().Kind { + rMap := r.(map[string]interface{}) + if rMap["kind"].(string) != catalog.QuotaGVK().Kind { continue } if _, ok := rMap["unit"]; !ok { // no unit is transactions - quotaName = strings.Split(rMap["name"], "/")[2] + quotaName = strings.Split(rMap["name"].(string), "/")[2] + break } } } else { // get custom unit quota for _, r := range accessRequest.References { - rMap := r.(map[string]string) - if rMap["kind"] != catalog.QuotaGVK().Kind { + rMap := r.(map[string]interface{}) + if rMap["kind"].(string) != catalog.QuotaGVK().Kind { continue } - if unit, ok := rMap["unit"]; ok && unitName == unit { + if unit, ok := rMap["unit"]; ok && unitName == unit.(string) { // no unit is transactions - quotaName = strings.Split(rMap["name"], "/")[2] + quotaName = strings.Split(rMap["name"].(string), "/")[2] + break } } } + if quotaName == "" { + return nil + } quotaRef := accessRequest.GetReferenceByNameAndGVK(quotaName, catalog.QuotaGVK()) if quotaRef.ID == "" { diff --git a/pkg/transaction/metric/util.go b/pkg/transaction/metric/util.go index 4367809f1..bedc2686a 100644 --- a/pkg/transaction/metric/util.go +++ b/pkg/transaction/metric/util.go @@ -4,7 +4,6 @@ import ( "strings" "github.com/Axway/agent-sdk/pkg/agent" - v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/transaction/models" transutil "github.com/Axway/agent-sdk/pkg/transaction/util" ) @@ -65,9 +64,9 @@ func centralMetricFromAPIMetric(in *APIMetric) *centralMetric { }, Name: in.API.Name, } - svc, err := agent.GetAPICache().GetBySecondaryKey(strings.TrimPrefix(in.API.ID, transutil.SummaryEventProxyIDPrefix)) - if err == nil { - out.API.APIServiceID = svc.(v1.ResourceInstance).Metadata.ID + svc := agent.GetCacheManager().GetAPIServiceWithAPIID(strings.TrimPrefix(in.API.ID, transutil.SummaryEventProxyIDPrefix)) + if svc != nil { + out.API.APIServiceID = svc.Metadata.ID } } From bfba16556483835cf0f8fcfd52ff2c42155beb38 Mon Sep 17 00:00:00 2001 From: Jason Collins Date: Mon, 28 Oct 2024 11:21:57 -0700 Subject: [PATCH 13/13] remove TODO from testing --- pkg/transaction/metric/metricscollector.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index fe78bb27b..9a87451f4 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -271,15 +271,6 @@ func (c *collector) AddMetric(apiDetails models.APIDetails, statusCode string, d func (c *collector) AddMetricDetail(metricDetail Detail) { c.AddMetric(metricDetail.APIDetails, metricDetail.StatusCode, metricDetail.Duration, metricDetail.Bytes, metricDetail.APIDetails.Name) c.createOrUpdateHistogram(metricDetail) - // TODO remove this after testing - c.AddCustomMetricDetail(CustomMetricDetail{ - APIDetails: metricDetail.APIDetails, - AppDetails: metricDetail.AppDetails, - UnitDetails: models.Unit{ - Name: "x-ai-tokens", - }, - Count: 30, - }) } // AddAPIMetricDetail - add metric details for several response codes and transactions