From 5f992f850b1c7f8c57cc236bea203efbefe034cd Mon Sep 17 00:00:00 2001 From: Alin Rosca <137875910+alrosca@users.noreply.github.com> Date: Mon, 30 Sep 2024 20:00:52 +0300 Subject: [PATCH] APIGOV-28410 usage reports on schedule (#826) * APIGOV-28410 usage reports on schedule * APIGOV-28410 fix deadlock * APIGOV-28410 updates * APIGOV-28410 fix tests * APIGOV-28410 change func name --- pkg/transaction/metric/cachestorage.go | 6 +-- .../metric/metricscollector_test.go | 14 ++++++ pkg/transaction/metric/reportcache.go | 50 +++++++++++++++++++ pkg/transaction/metric/usagepublisher.go | 13 +++-- 4 files changed, 76 insertions(+), 7 deletions(-) diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index cd31e63e3..1779c23ad 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -84,13 +84,13 @@ func (c *cacheStorage) initialize() { func (c *cacheStorage) loadUsage(storageCache cache.Cache) { // update the collector usage start time - usageStartTime, err := c.parseTimeFromCache(storageCache, usageStartTimeKey) + usageStartTime, err := parseTimeFromCache(storageCache, usageStartTimeKey) if err == nil && !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() { // do not load this start time when offline c.collector.usageStartTime = usageStartTime } // update the collector metric start time - metricStartTime, err := c.parseTimeFromCache(storageCache, metricStartTimeKey) + metricStartTime, err := parseTimeFromCache(storageCache, metricStartTimeKey) if err == nil && !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() { // do not load this start time when offline c.collector.metricStartTime = metricStartTime @@ -254,7 +254,7 @@ func (c *cacheStorage) storeCacheJob() { } } -func (c *cacheStorage) parseTimeFromCache(storage cache.Cache, key string) (time.Time, error) { +func parseTimeFromCache(storage cache.Cache, key string) (time.Time, error) { resultTime := now() item, err := storage.Get(key) if err != nil { diff --git a/pkg/transaction/metric/metricscollector_test.go b/pkg/transaction/metric/metricscollector_test.go index 6fe7ee76a..8aa891415 100644 --- a/pkg/transaction/metric/metricscollector_test.go +++ b/pkg/transaction/metric/metricscollector_test.go @@ -46,6 +46,10 @@ var ( traceStatus = healthcheck.OK ) +func getFutureTime() time.Time { + return time.Now().Add(10 * time.Minute) +} + func createCentralCfg(url, env string) *config.CentralConfiguration { cfg := config.NewCentralConfig(config.TraceabilityAgent).(*config.CentralConfiguration) @@ -64,6 +68,7 @@ func createCentralCfg(url, env string) *config.CentralConfiguration { usgCfg.Publish = true metricCfg := cfg.MetricReporting.(*config.MetricReportingConfiguration) metricCfg.Publish = true + // metricCfg.Schedule = "1 * * * * * *" return cfg } @@ -627,6 +632,9 @@ func TestMetricCollectorUsageAggregation(t *testing.T) { setupMockClient(0) myCollector := createMetricCollector() metricCollector := myCollector.(*collector) + metricCollector.usagePublisher.schedule = "* * * * *" + metricCollector.usagePublisher.report.currTimeFunc = getFutureTime + mockReports := generateMockReports(test.transactionsPerReport) b, _ := json.Marshal(mockReports) metricCollector.reports.reportCache.Set("lighthouse_events", string(b)) @@ -679,6 +687,8 @@ func TestMetricCollectorCache(t *testing.T) { traceability.SetDataDirPath(".") myCollector := createMetricCollector() metricCollector := myCollector.(*collector) + metricCollector.usagePublisher.schedule = "* * * * *" + metricCollector.usagePublisher.report.currTimeFunc = getFutureTime metricCollector.AddMetric(apiDetails1, "200", 5, 10, "") metricCollector.AddMetric(apiDetails1, "200", 10, 10, "") @@ -701,6 +711,8 @@ func TestMetricCollectorCache(t *testing.T) { // Recreate the collector that loads the stored metrics, so 3 transactions myCollector = createMetricCollector() metricCollector = myCollector.(*collector) + metricCollector.usagePublisher.schedule = "* * * * *" + metricCollector.usagePublisher.report.currTimeFunc = getFutureTime metricCollector.AddMetric(apiDetails1, "200", 5, 10, "") metricCollector.AddMetric(apiDetails1, "200", 10, 10, "") @@ -721,6 +733,8 @@ func TestMetricCollectorCache(t *testing.T) { // Recreate the collector that loads the stored metrics, 0 transactions myCollector = createMetricCollector() metricCollector = myCollector.(*collector) + metricCollector.usagePublisher.schedule = "* * * * *" + metricCollector.usagePublisher.report.currTimeFunc = getFutureTime metricCollector.Execute() // Validate only no usage report sent as no previous or new transactions diff --git a/pkg/transaction/metric/reportcache.go b/pkg/transaction/metric/reportcache.go index b1e04463e..95ae212bc 100644 --- a/pkg/transaction/metric/reportcache.go +++ b/pkg/transaction/metric/reportcache.go @@ -18,16 +18,20 @@ import ( "github.com/Axway/agent-sdk/pkg/jobs" "github.com/Axway/agent-sdk/pkg/traceability" "github.com/Axway/agent-sdk/pkg/util/log" + "github.com/gorhill/cronexpr" ) const ( eventsKey = "lighthouse_events" + lastPublishTimestampKey = "timestamp" offlineCacheFileName = "agent-report-working.json" offlineReportSuffix = "usage_report.json" offlineReportDateFormat = "2006_01_02" qaOfflineReportDateFormat = "2006_01_02_15_04" ) +type currentTimeFunc func() time.Time + type usageReportCache struct { jobs.Job logger log.FieldLogger @@ -36,6 +40,7 @@ type usageReportCache struct { reportCacheLock sync.Mutex isInitialized bool offlineReportDateFormat string + currTimeFunc currentTimeFunc } func newReportCache() *usageReportCache { @@ -46,6 +51,7 @@ func newReportCache() *usageReportCache { reportCache: cache.New(), isInitialized: false, offlineReportDateFormat: offlineReportDateFormat, + currTimeFunc: time.Now, } if agent.GetCentralConfig().GetUsageReportingConfig().UsingQAVars() { reportManager.offlineReportDateFormat = qaOfflineReportDateFormat @@ -110,6 +116,23 @@ func (c *usageReportCache) updateEvents(lighthouseEvent UsageEvent) { c.setEvents(lighthouseEvent) } +func (c *usageReportCache) setLastPublishTimestamp(lastPublishTimestamp time.Time) { + c.reportCache.Set(lastPublishTimestampKey, lastPublishTimestamp) + c.reportCache.Save(c.cacheFilePath) +} + +func (c *usageReportCache) getLastPublishTimestamp() time.Time { + c.reportCacheLock.Lock() + defer c.reportCacheLock.Unlock() + + lastPublishTime, err := parseTimeFromCache(c.reportCache, lastPublishTimestampKey) + if err != nil { + return time.Time{} + } + + return lastPublishTime +} + func (c *usageReportCache) generateReportPath(timestamp ISO8601Time, index int) string { format := "%s_%s" if index != 0 { @@ -233,7 +256,34 @@ func (c *usageReportCache) sendReport(publishFunc func(event UsageEvent) error) return nil } + // update the publish time + lastPublishTime := time.Now() + c.setLastPublishTimestamp(lastPublishTime) + savedEvents.Report = make(map[string]UsageReport) c.setEvents(savedEvents) return nil } + +func (c *usageReportCache) shouldPublish(schedule string) bool { + currentTime := c.currTimeFunc() + lastPublishTimestamp := c.getLastPublishTimestamp() + + // if the last publish was made more than a day ago, publish + elapsedTimeSinceLastPublish := currentTime.Sub(lastPublishTimestamp) + if lastPublishTimestamp.IsZero() || elapsedTimeSinceLastPublish >= 24*time.Hour { + return true + } + + cronSchedule, err := cronexpr.Parse(schedule) + if err != nil { + return false + } + // publish if last scheduled time is past + nextPublishTime := cronSchedule.Next(lastPublishTimestamp) + if nextPublishTime.Before(currentTime) { + return true + } + + return false +} diff --git a/pkg/transaction/metric/usagepublisher.go b/pkg/transaction/metric/usagepublisher.go index 67f686895..156e7e933 100644 --- a/pkg/transaction/metric/usagepublisher.go +++ b/pkg/transaction/metric/usagepublisher.go @@ -26,6 +26,7 @@ type usagePublisher struct { storage storageCache report *usageReportCache jobID string + schedule string ready bool offline bool logger log.FieldLogger @@ -197,10 +198,11 @@ func (c *usagePublisher) registerReportJob() { if agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() { schedule = agent.GetCentralConfig().GetUsageReportingConfig().GetReportSchedule() } + c.schedule = schedule // start the job according to the cron schedule var err error - c.jobID, err = jobs.RegisterScheduledJobWithName(c, schedule, "Usage Reporting") + c.jobID, err = jobs.RegisterScheduledJobWithName(c, c.schedule, "Usage Reporting") if err != nil { c.logger.WithError(err).Error("could not register usage report creation job") } @@ -233,8 +235,11 @@ func (c *usagePublisher) Ready() bool { // Execute - process the offline report generation func (c *usagePublisher) Execute() error { - if c.offline { - return c.report.saveReport() + if c.report.shouldPublish(c.schedule) { + if c.offline { + return c.report.saveReport() + } + return c.report.sendReport(c.publishToPlatformUsage) } - return c.report.sendReport(c.publishToPlatformUsage) + return nil }