Skip to content

Commit

Permalink
Slight refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeckhart committed May 16, 2024
1 parent 45af6a5 commit 384f779
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 225 deletions.
10 changes: 5 additions & 5 deletions pkg/job/cloudwatchrunner/discoveryjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import (
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type DiscoveryJob struct {
type Discovery struct {
Job model.DiscoveryJob
Resources []*model.TaggedResource
}

func (d DiscoveryJob) Namespace() string {
func (d Discovery) Namespace() string {
return d.Job.Type
}

func (d DiscoveryJob) CustomTags() []model.Tag {
func (d Discovery) CustomTags() []model.Tag {
return d.Job.CustomTags
}

func (d DiscoveryJob) listMetricsParams() listmetrics.ProcessingParams {
func (d Discovery) listMetricsParams() listmetrics.ProcessingParams {
return listmetrics.ProcessingParams{
Namespace: d.Job.Type,
Metrics: d.Job.Metrics,
Expand All @@ -28,6 +28,6 @@ func (d DiscoveryJob) listMetricsParams() listmetrics.ProcessingParams {
}
}

func (d DiscoveryJob) resourceEnrichment() ResourceEnrichment {
func (d Discovery) resourceEnrichment() ResourceEnrichment {
return resourcemetadata.NewResourceAssociation(d.Job.DimensionsRegexps, d.Job.ExportedTagsOnMetrics, nil)
}
20 changes: 6 additions & 14 deletions pkg/job/cloudwatchrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ type Params struct {
GetMetricDataMetricsPerQuery int
}

func New(logger logging.Logger, factory clients.Factory, params Params, job Job) *Runner {
func NewDefault(logger logging.Logger, factory clients.Factory, params Params, job Job) *Runner {
cloudwatchClient := factory.GetCloudwatchClient(params.Region, params.Role, params.CloudwatchConcurrency)
lmProcessor := listmetrics.NewDefaultProcessor(logger, cloudwatchClient)
gmdProcessor := getmetricdata.NewDefaultProcessor(logger, cloudwatchClient, params.GetMetricDataMetricsPerQuery, params.CloudwatchConcurrency.GetMetricData)
return internalNew(logger, lmProcessor, gmdProcessor, params, job)
return New(logger, lmProcessor, gmdProcessor, params, job)
}

type Runner struct {
Expand All @@ -56,8 +56,8 @@ type Runner struct {
params Params
}

// internalNew allows an injection point for interfaces
func internalNew(logger logging.Logger, listMetrics listMetricsProcessor, getMetricData getMetricDataProcessor, params Params, job Job) *Runner {
// New allows an injection point for interfaces
func New(logger logging.Logger, listMetrics listMetricsProcessor, getMetricData getMetricDataProcessor, params Params, job Job) *Runner {
return &Runner{
logger: logger,
job: job,
Expand All @@ -67,7 +67,7 @@ func internalNew(logger logging.Logger, listMetrics listMetricsProcessor, getMet
}
}

func (r *Runner) Run(ctx context.Context) (*model.CloudwatchMetricResult, error) {
func (r *Runner) Run(ctx context.Context) ([]*model.CloudwatchData, error) {
a := appender.New(r.job.resourceEnrichment().Create(r.logger))

err := r.listMetrics.Run(ctx, r.job.listMetricsParams(), a)
Expand All @@ -85,13 +85,5 @@ func (r *Runner) Run(ctx context.Context) (*model.CloudwatchMetricResult, error)
return nil, fmt.Errorf("failed to get metric data: %w", err)
}

result := &model.CloudwatchMetricResult{
Context: &model.ScrapeContext{
Region: r.params.Region,
AccountID: r.params.AccountID,
CustomTags: r.job.CustomTags(),
},
Data: metrics,
}
return result, nil
return metrics, nil
}
17 changes: 2 additions & 15 deletions pkg/job/resourcemetadata/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewProcessor(logger logging.Logger, client tagging.Client) *Processor {
}
}

func (p Processor) Run(ctx context.Context, region string, account string, job model.DiscoveryJob) (*model.TaggedResourceResult, error) {
func (p Processor) Run(ctx context.Context, region string, job model.DiscoveryJob) ([]*model.TaggedResource, error) {
resources, err := p.client.GetResources(ctx, job, region)
if err != nil {
if errors.Is(err, tagging.ErrExpectedToFindResources) {
Expand All @@ -31,18 +31,5 @@ func (p Processor) Run(ctx context.Context, region string, account string, job m
return nil, fmt.Errorf("failed to describe resources: %w", err)
}

if len(resources) == 0 {
p.logger.Debug("No tagged resources", "region", region, "namespace", job.Type)
}

result := &model.TaggedResourceResult{
Context: &model.ScrapeContext{
Region: region,
AccountID: account,
CustomTags: job.CustomTags,
},
Data: resources,
}

return result, nil
return resources, nil
}
60 changes: 33 additions & 27 deletions pkg/job/resourcemetadata/resourceassociation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,28 @@ func NewResourceAssociation(dimensionRegexps []model.DimensionsRegexp, tagsOnMet
func (ra Association) Create(logger logging.Logger) MetricResourceEnricher {
if len(ra.DimensionRegexps) > 0 && len(ra.Resources) > 0 {
maxDim := maxDimAdapter{wrapped: maxdimassociator.NewAssociator(logger, ra.DimensionRegexps, ra.Resources)}
return NewResourceAssociationEnricher(nil, maxDim)
return NewResourceAssociationEnricher(nil, maxDim, ra.TagsOnMetrics)
}

return NewResourceAssociationEnricher(globalResource, nil)
return NewResourceAssociationEnricher(globalResource, nil, nil)
}

type associator interface {
MetricToResource(cwMetric *model.Metric) *Resource
}

type ResourceAssociationEnricher struct {
associator associator
staticResource *Resource
associator associator
staticResource *Resource
resourceTagsOnMetrics []string
}

// NewResourceAssociationEnricher is an injectable function for testing purposes
func NewResourceAssociationEnricher(staticResource *Resource, associator associator) *ResourceAssociationEnricher {
func NewResourceAssociationEnricher(staticResource *Resource, associator associator, resourceTagsOnMetrics []string) *ResourceAssociationEnricher {
return &ResourceAssociationEnricher{
staticResource: staticResource,
associator: associator,
staticResource: staticResource,
associator: associator,
resourceTagsOnMetrics: resourceTagsOnMetrics,
}
}

Expand All @@ -64,26 +66,30 @@ func (rad *ResourceAssociationEnricher) Enrich(_ context.Context, metrics []*mod
if resource != nil {
metrics[outputI] = metric

// TODO tags on metrics should probably create a new Resource?
// var tags []model.Tag
// if len(c.resourceTagsOnMetrics) > 0 {
// tags = make([]model.Tag, 0, len(c.resourceTagsOnMetrics))
// for _, tagName := range c.resourceTagsOnMetrics {
// tag := model.Tag{
// Key: tagName,
// }
// for _, resourceTag := range resource.Tags {
// if resourceTag.Key == tagName {
// tag.Value = resourceTag.Value
// break
// }
// }
//
// // Always add the tag, even if it's empty, to ensure the same labels are present on all metrics for a single service
// tags = append(tags, tag)
// }
// }
associatedResources[outputI] = resource
var tags []model.Tag
if len(rad.resourceTagsOnMetrics) > 0 {
tags = make([]model.Tag, 0, len(rad.resourceTagsOnMetrics))
for _, tagName := range rad.resourceTagsOnMetrics {
tag := model.Tag{
Key: tagName,
}
for _, resourceTag := range resource.Tags {
if resourceTag.Key == tagName {
tag.Value = resourceTag.Value
break
}
}

// Always add the tag, even if it's empty, to ensure the same labels are present on all metrics for a single service
tags = append(tags, tag)
}
}

// TODO is it safe to modify the tags on the original resource pointer?
associatedResources[outputI] = &Resource{
Name: resource.Name,
Tags: tags,
}
outputI++
}
}
Expand Down
164 changes: 0 additions & 164 deletions pkg/job/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@ package job

import (
"context"
"fmt"
"sync"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/cloudwatchrunner"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/getmetricdata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)
Expand Down Expand Up @@ -152,164 +149,3 @@ func ScrapeAwsData(
wg.Wait()
return awsInfoData, cwData
}

type ScrapeRunner struct {
factory clients.Factory
jobsCfg model.JobsConfig
metricsPerQuery int
cloudwatchConcurrency cloudwatch.ConcurrencyConfig
taggingAPIConcurrency int

roleRegionToAccount map[model.Role]map[string]string
logger logging.Logger
}

func NewScrapeRunner(logger logging.Logger,
jobsCfg model.JobsConfig,
factory clients.Factory,
metricsPerQuery int,
cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
taggingAPIConcurrency int,
) *ScrapeRunner {
roleRegionToAccount := map[model.Role]map[string]string{}
jobConfigVisitor(jobsCfg, func(_ any, role model.Role, region string) {
if _, exists := roleRegionToAccount[role]; !exists {
roleRegionToAccount[role] = map[string]string{}
}
roleRegionToAccount[role] = map[string]string{region: ""}
})

return &ScrapeRunner{
factory: factory,
logger: logger,
jobsCfg: jobsCfg,
metricsPerQuery: metricsPerQuery,
cloudwatchConcurrency: cloudwatchConcurrency,
taggingAPIConcurrency: taggingAPIConcurrency,
roleRegionToAccount: roleRegionToAccount,
}
}

func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) {
sr.logger.Debug("Starting account initialization")
for role, regions := range sr.roleRegionToAccount {
for region := range regions {
accountID, err := sr.factory.GetAccountClient(region, role).GetAccount(ctx)
if err != nil {
sr.logger.Error(err, "Failed to get Account", "region", region, "role_arn", role.RoleArn)
} else {
sr.roleRegionToAccount[role][region] = accountID
}
}
}
sr.logger.Debug("Finished account initialization")

mux := &sync.Mutex{}
metricResults := make([]model.CloudwatchMetricResult, 0)
resourceResults := make([]model.TaggedResourceResult, 0)
var wg sync.WaitGroup

sr.logger.Debug("Starting job runs")
jobConfigVisitor(sr.jobsCfg, func(job any, role model.Role, region string) {
wg.Add(1)
go func() {
defer wg.Done()

var namespace string
jobAction(sr.logger, job, func(job model.DiscoveryJob) {
namespace = job.Type
}, func(job model.CustomNamespaceJob) {
namespace = job.Namespace
})
jobLogger := sr.logger.With("namespace", namespace, "region", region, "arn", role.RoleArn)
accountID := sr.roleRegionToAccount[role][region]
if accountID == "" {
jobLogger.Error(nil, "Account for job was not found see previous errors")
return
}

var jobToRun cloudwatchrunner.Job
jobAction(jobLogger, job,
func(job model.DiscoveryJob) {
taggingClient := sr.factory.GetTaggingClient(region, role, sr.taggingAPIConcurrency)
rmProcessor := resourcemetadata.NewProcessor(jobLogger, taggingClient)

jobLogger.Debug("Starting resource discovery")
resourceResult, err := rmProcessor.Run(ctx, region, accountID, job)
if err != nil {
jobLogger.Error(err, "Resource metadata processor failed")
return
}
if len(resourceResult.Data) > 0 {
mux.Lock()
resourceResults = append(resourceResults, *resourceResult)
mux.Unlock()
}
jobLogger.Debug("Resource discovery finished starting cloudwatch metrics runner", "discovered_resources", len(resourceResult.Data))

jobToRun = cloudwatchrunner.DiscoveryJob{Job: job, Resources: resourceResult.Data}
}, func(job model.CustomNamespaceJob) {
jobToRun = cloudwatchrunner.CustomNamespaceJob{Job: job}
},
)
jobLogger.Debug("Starting cloudwatch metrics runner")
runnerParams := cloudwatchrunner.Params{
Region: region,
Role: role,
CloudwatchConcurrency: sr.cloudwatchConcurrency,
GetMetricDataMetricsPerQuery: sr.metricsPerQuery,
}
runner := cloudwatchrunner.New(jobLogger, sr.factory, runnerParams, jobToRun)
metricResult, err := runner.Run(ctx)
if err != nil {
jobLogger.Error(err, "Failed to run job")
return
}

if metricResult == nil {
jobLogger.Info("No metrics data found")
}

jobLogger.Debug("Job run finished", "number_of_metrics", len(metricResult.Data))

mux.Lock()
defer mux.Unlock()
metricResults = append(metricResults, *metricResult)
}()
})
sr.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults))
wg.Wait()
return resourceResults, metricResults
}

// Walk through each custom namespace and discovery jobs and take an action
func jobConfigVisitor(jobsCfg model.JobsConfig, action func(job any, role model.Role, region string)) {
for _, job := range jobsCfg.DiscoveryJobs {
for _, role := range job.Roles {
for _, region := range job.Regions {
action(job, role, region)
}
}
}

for _, job := range jobsCfg.CustomNamespaceJobs {
for _, role := range job.Roles {
for _, region := range job.Regions {
action(job, role, region)
}
}
}
}

func jobAction(logger logging.Logger, job any, discovery func(job model.DiscoveryJob), custom func(job model.CustomNamespaceJob)) {
// Type switches are free https://stackoverflow.com/a/28027945
switch typedJob := job.(type) {
case model.DiscoveryJob:
discovery(typedJob)
case model.CustomNamespaceJob:
custom(typedJob)
default:
logger.Error(fmt.Errorf("config type of %T is not supported", typedJob), "Unexpected job type")
return
}
}
Loading

0 comments on commit 384f779

Please sign in to comment.