Skip to content

Commit

Permalink
APIGOV-28410 usage reports on schedule (#826)
Browse files Browse the repository at this point in the history
* APIGOV-28410 usage reports on schedule

* APIGOV-28410 fix deadlock

* APIGOV-28410 updates

* APIGOV-28410 fix tests

* APIGOV-28410 change func name
  • Loading branch information
alrosca authored Sep 30, 2024
1 parent 10b0e52 commit 5f992f8
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
6 changes: 3 additions & 3 deletions pkg/transaction/metric/cachestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func (c *cacheStorage) initialize() {

func (c *cacheStorage) loadUsage(storageCache cache.Cache) {
// update the collector usage start time
usageStartTime, err := c.parseTimeFromCache(storageCache, usageStartTimeKey)
usageStartTime, err := parseTimeFromCache(storageCache, usageStartTimeKey)
if err == nil && !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
// do not load this start time when offline
c.collector.usageStartTime = usageStartTime
}
// update the collector metric start time
metricStartTime, err := c.parseTimeFromCache(storageCache, metricStartTimeKey)
metricStartTime, err := parseTimeFromCache(storageCache, metricStartTimeKey)
if err == nil && !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
// do not load this start time when offline
c.collector.metricStartTime = metricStartTime
Expand Down Expand Up @@ -254,7 +254,7 @@ func (c *cacheStorage) storeCacheJob() {
}
}

func (c *cacheStorage) parseTimeFromCache(storage cache.Cache, key string) (time.Time, error) {
func parseTimeFromCache(storage cache.Cache, key string) (time.Time, error) {
resultTime := now()
item, err := storage.Get(key)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/transaction/metric/metricscollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ var (
traceStatus = healthcheck.OK
)

func getFutureTime() time.Time {
return time.Now().Add(10 * time.Minute)
}

func createCentralCfg(url, env string) *config.CentralConfiguration {

cfg := config.NewCentralConfig(config.TraceabilityAgent).(*config.CentralConfiguration)
Expand All @@ -64,6 +68,7 @@ func createCentralCfg(url, env string) *config.CentralConfiguration {
usgCfg.Publish = true
metricCfg := cfg.MetricReporting.(*config.MetricReportingConfiguration)
metricCfg.Publish = true
// metricCfg.Schedule = "1 * * * * * *"
return cfg
}

Expand Down Expand Up @@ -627,6 +632,9 @@ func TestMetricCollectorUsageAggregation(t *testing.T) {
setupMockClient(0)
myCollector := createMetricCollector()
metricCollector := myCollector.(*collector)
metricCollector.usagePublisher.schedule = "* * * * *"
metricCollector.usagePublisher.report.currTimeFunc = getFutureTime

mockReports := generateMockReports(test.transactionsPerReport)
b, _ := json.Marshal(mockReports)
metricCollector.reports.reportCache.Set("lighthouse_events", string(b))
Expand Down Expand Up @@ -679,6 +687,8 @@ func TestMetricCollectorCache(t *testing.T) {
traceability.SetDataDirPath(".")
myCollector := createMetricCollector()
metricCollector := myCollector.(*collector)
metricCollector.usagePublisher.schedule = "* * * * *"
metricCollector.usagePublisher.report.currTimeFunc = getFutureTime

metricCollector.AddMetric(apiDetails1, "200", 5, 10, "")
metricCollector.AddMetric(apiDetails1, "200", 10, 10, "")
Expand All @@ -701,6 +711,8 @@ func TestMetricCollectorCache(t *testing.T) {
// Recreate the collector that loads the stored metrics, so 3 transactions
myCollector = createMetricCollector()
metricCollector = myCollector.(*collector)
metricCollector.usagePublisher.schedule = "* * * * *"
metricCollector.usagePublisher.report.currTimeFunc = getFutureTime

metricCollector.AddMetric(apiDetails1, "200", 5, 10, "")
metricCollector.AddMetric(apiDetails1, "200", 10, 10, "")
Expand All @@ -721,6 +733,8 @@ func TestMetricCollectorCache(t *testing.T) {
// Recreate the collector that loads the stored metrics, 0 transactions
myCollector = createMetricCollector()
metricCollector = myCollector.(*collector)
metricCollector.usagePublisher.schedule = "* * * * *"
metricCollector.usagePublisher.report.currTimeFunc = getFutureTime

metricCollector.Execute()
// Validate only no usage report sent as no previous or new transactions
Expand Down
50 changes: 50 additions & 0 deletions pkg/transaction/metric/reportcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ import (
"github.com/Axway/agent-sdk/pkg/jobs"
"github.com/Axway/agent-sdk/pkg/traceability"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/gorhill/cronexpr"
)

const (
eventsKey = "lighthouse_events"
lastPublishTimestampKey = "timestamp"
offlineCacheFileName = "agent-report-working.json"
offlineReportSuffix = "usage_report.json"
offlineReportDateFormat = "2006_01_02"
qaOfflineReportDateFormat = "2006_01_02_15_04"
)

type currentTimeFunc func() time.Time

type usageReportCache struct {
jobs.Job
logger log.FieldLogger
Expand All @@ -36,6 +40,7 @@ type usageReportCache struct {
reportCacheLock sync.Mutex
isInitialized bool
offlineReportDateFormat string
currTimeFunc currentTimeFunc
}

func newReportCache() *usageReportCache {
Expand All @@ -46,6 +51,7 @@ func newReportCache() *usageReportCache {
reportCache: cache.New(),
isInitialized: false,
offlineReportDateFormat: offlineReportDateFormat,
currTimeFunc: time.Now,
}
if agent.GetCentralConfig().GetUsageReportingConfig().UsingQAVars() {
reportManager.offlineReportDateFormat = qaOfflineReportDateFormat
Expand Down Expand Up @@ -110,6 +116,23 @@ func (c *usageReportCache) updateEvents(lighthouseEvent UsageEvent) {
c.setEvents(lighthouseEvent)
}

func (c *usageReportCache) setLastPublishTimestamp(lastPublishTimestamp time.Time) {
c.reportCache.Set(lastPublishTimestampKey, lastPublishTimestamp)
c.reportCache.Save(c.cacheFilePath)
}

func (c *usageReportCache) getLastPublishTimestamp() time.Time {
c.reportCacheLock.Lock()
defer c.reportCacheLock.Unlock()

lastPublishTime, err := parseTimeFromCache(c.reportCache, lastPublishTimestampKey)
if err != nil {
return time.Time{}
}

return lastPublishTime
}

func (c *usageReportCache) generateReportPath(timestamp ISO8601Time, index int) string {
format := "%s_%s"
if index != 0 {
Expand Down Expand Up @@ -233,7 +256,34 @@ func (c *usageReportCache) sendReport(publishFunc func(event UsageEvent) error)
return nil
}

// update the publish time
lastPublishTime := time.Now()
c.setLastPublishTimestamp(lastPublishTime)

savedEvents.Report = make(map[string]UsageReport)
c.setEvents(savedEvents)
return nil
}

func (c *usageReportCache) shouldPublish(schedule string) bool {
currentTime := c.currTimeFunc()
lastPublishTimestamp := c.getLastPublishTimestamp()

// if the last publish was made more than a day ago, publish
elapsedTimeSinceLastPublish := currentTime.Sub(lastPublishTimestamp)
if lastPublishTimestamp.IsZero() || elapsedTimeSinceLastPublish >= 24*time.Hour {
return true
}

cronSchedule, err := cronexpr.Parse(schedule)
if err != nil {
return false
}
// publish if last scheduled time is past
nextPublishTime := cronSchedule.Next(lastPublishTimestamp)
if nextPublishTime.Before(currentTime) {
return true
}

return false
}
13 changes: 9 additions & 4 deletions pkg/transaction/metric/usagepublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type usagePublisher struct {
storage storageCache
report *usageReportCache
jobID string
schedule string
ready bool
offline bool
logger log.FieldLogger
Expand Down Expand Up @@ -197,10 +198,11 @@ func (c *usagePublisher) registerReportJob() {
if agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
schedule = agent.GetCentralConfig().GetUsageReportingConfig().GetReportSchedule()
}
c.schedule = schedule

// start the job according to the cron schedule
var err error
c.jobID, err = jobs.RegisterScheduledJobWithName(c, schedule, "Usage Reporting")
c.jobID, err = jobs.RegisterScheduledJobWithName(c, c.schedule, "Usage Reporting")
if err != nil {
c.logger.WithError(err).Error("could not register usage report creation job")
}
Expand Down Expand Up @@ -233,8 +235,11 @@ func (c *usagePublisher) Ready() bool {

// Execute - process the offline report generation
func (c *usagePublisher) Execute() error {
if c.offline {
return c.report.saveReport()
if c.report.shouldPublish(c.schedule) {
if c.offline {
return c.report.saveReport()
}
return c.report.sendReport(c.publishToPlatformUsage)
}
return c.report.sendReport(c.publishToPlatformUsage)
return nil
}

0 comments on commit 5f992f8

Please sign in to comment.