diff --git a/pkg/job/cloudwatchrunner/customnamespace.go b/pkg/job/cloudwatchrunner/customnamespace.go new file mode 100644 index 00000000..8208df89 --- /dev/null +++ b/pkg/job/cloudwatchrunner/customnamespace.go @@ -0,0 +1,32 @@ +package cloudwatchrunner + +import ( + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/listmetrics" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" +) + +type CustomNamespaceJob struct { + Job model.CustomNamespaceJob +} + +func (c CustomNamespaceJob) Namespace() string { + return c.Job.Namespace +} + +func (c CustomNamespaceJob) listMetricsParams() listmetrics.ProcessingParams { + return listmetrics.ProcessingParams{ + Namespace: c.Job.Namespace, + Metrics: c.Job.Metrics, + RecentlyActiveOnly: c.Job.RecentlyActiveOnly, + DimensionNameRequirements: c.Job.DimensionNameRequirements, + } +} + +func (c CustomNamespaceJob) resourceEnrichment() ResourceEnrichment { + return resourcemetadata.StaticResource{Name: c.Job.Name} +} + +func (c CustomNamespaceJob) CustomTags() []model.Tag { + return c.Job.CustomTags +} diff --git a/pkg/job/cloudwatchrunner/discoveryjob.go b/pkg/job/cloudwatchrunner/discoveryjob.go new file mode 100644 index 00000000..e5ca63e0 --- /dev/null +++ b/pkg/job/cloudwatchrunner/discoveryjob.go @@ -0,0 +1,33 @@ +package cloudwatchrunner + +import ( + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/listmetrics" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" +) + +type DiscoveryJob struct { + Job model.DiscoveryJob + Resources []*model.TaggedResource +} + +func (d DiscoveryJob) Namespace() string { + return d.Job.Type +} + +func (d DiscoveryJob) CustomTags() []model.Tag { + return d.Job.CustomTags +} + +func (d DiscoveryJob) listMetricsParams() listmetrics.ProcessingParams { + return listmetrics.ProcessingParams{ + Namespace: d.Job.Type, + Metrics: d.Job.Metrics, + RecentlyActiveOnly: d.Job.RecentlyActiveOnly, + DimensionNameRequirements: d.Job.DimensionNameRequirements, + } +} + +func (d DiscoveryJob) resourceEnrichment() ResourceEnrichment { + return resourcemetadata.NewResourceAssociation(d.Job.DimensionsRegexps, d.Job.ExportedTagsOnMetrics, nil) +} diff --git a/pkg/job/runner.go b/pkg/job/cloudwatchrunner/runner.go similarity index 94% rename from pkg/job/runner.go rename to pkg/job/cloudwatchrunner/runner.go index 70e2afba..3b11e169 100644 --- a/pkg/job/runner.go +++ b/pkg/job/cloudwatchrunner/runner.go @@ -1,4 +1,4 @@ -package job +package cloudwatchrunner import ( "context" @@ -41,11 +41,11 @@ type Params struct { GetMetricDataMetricsPerQuery int } -func NewRunner(logger logging.Logger, factory clients.Factory, params Params, config Job) *Runner { +func New(logger logging.Logger, factory clients.Factory, params Params, job Job) *Runner { cloudwatchClient := factory.GetCloudwatchClient(params.Region, params.Role, params.CloudwatchConcurrency) lmProcessor := listmetrics.NewDefaultProcessor(logger, cloudwatchClient) gmdProcessor := getmetricdata.NewDefaultProcessor(logger, cloudwatchClient, params.GetMetricDataMetricsPerQuery, params.CloudwatchConcurrency.GetMetricData) - return internalNew(logger, lmProcessor, gmdProcessor, params, config) + return internalNew(logger, lmProcessor, gmdProcessor, params, job) } type Runner struct { diff --git a/pkg/job/discovery.go b/pkg/job/discovery.go index c1634c04..da3b46d2 100644 --- a/pkg/job/discovery.go +++ b/pkg/job/discovery.go @@ -19,6 +19,10 @@ type resourceAssociator interface { AssociateMetricToResource(cwMetric *model.Metric) (*model.TaggedResource, bool) } +type getMetricDataProcessor interface { + Run(ctx context.Context, namespace string, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error) +} + func runDiscoveryJob( ctx context.Context, logger logging.Logger, diff --git a/pkg/job/scrape.go b/pkg/job/scrape.go index a2ecf392..06f5eb82 100644 --- a/pkg/job/scrape.go +++ b/pkg/job/scrape.go @@ -2,13 +2,14 @@ package job import ( "context" + "fmt" "sync" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/cloudwatchrunner" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/getmetricdata" - "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/listmetrics" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" @@ -24,7 +25,12 @@ func ScrapeAwsData( taggingAPIConcurrency int, ) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) { if config.FlagsFromCtx(ctx).IsFeatureEnabled(config.UnifiedJobRunner) { - return runWithUnifiedRunner(ctx, logger, jobsCfg, factory, metricsPerQuery, cloudwatchConcurrency, taggingAPIConcurrency) + if len(jobsCfg.StaticJobs) > 0 { + logger.Error(nil, "Static jobs are not supported by the unified job runner at this time") + return nil, nil + } + runner := NewScrapeRunner(logger, jobsCfg, factory, metricsPerQuery, cloudwatchConcurrency, taggingAPIConcurrency) + return runner.Run(ctx) } mux := &sync.Mutex{} @@ -32,9 +38,9 @@ func ScrapeAwsData( awsInfoData := make([]model.TaggedResourceResult, 0) var wg sync.WaitGroup - for _, discoveryJob := range jobsCfg.DiscoveryJobs { - for _, role := range discoveryJob.Roles { - for _, region := range discoveryJob.Regions { + for _, job := range jobsCfg.DiscoveryJobs { + for _, role := range job.Roles { + for _, region := range job.Regions { wg.Add(1) go func(discoveryJob model.DiscoveryJob, region string, role model.Role) { defer wg.Done() @@ -75,7 +81,7 @@ func ScrapeAwsData( cwData = append(cwData, metricResult) mux.Unlock() } - }(discoveryJob, region, role) + }(job, region, role) } } } @@ -111,9 +117,9 @@ func ScrapeAwsData( } } - for _, customNamespaceJob := range jobsCfg.CustomNamespaceJobs { - for _, role := range customNamespaceJob.Roles { - for _, region := range customNamespaceJob.Regions { + for _, job := range jobsCfg.CustomNamespaceJobs { + for _, role := range job.Roles { + for _, region := range job.Regions { wg.Add(1) go func(customNamespaceJob model.CustomNamespaceJob, region string, role model.Role) { defer wg.Done() @@ -139,7 +145,7 @@ func ScrapeAwsData( mux.Lock() cwData = append(cwData, metricResult) mux.Unlock() - }(customNamespaceJob, region, role) + }(job, region, role) } } } @@ -147,62 +153,91 @@ func ScrapeAwsData( return awsInfoData, cwData } -func runWithUnifiedRunner( - ctx context.Context, - logger logging.Logger, +type ScrapeRunner struct { + factory clients.Factory + jobsCfg model.JobsConfig + metricsPerQuery int + cloudwatchConcurrency cloudwatch.ConcurrencyConfig + taggingAPIConcurrency int + + roleRegionToAccount map[model.Role]map[string]string + logger logging.Logger +} + +func NewScrapeRunner(logger logging.Logger, jobsCfg model.JobsConfig, factory clients.Factory, metricsPerQuery int, cloudwatchConcurrency cloudwatch.ConcurrencyConfig, taggingAPIConcurrency int, -) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) { - if len(jobsCfg.StaticJobs) > 0 { - logger.Error(nil, "Static jobs are not supported by the unified job runner at this time") +) *ScrapeRunner { + roleRegionToAccount := map[model.Role]map[string]string{} + jobConfigVisitor(jobsCfg, func(_ any, role model.Role, region string) { + if _, exists := roleRegionToAccount[role]; !exists { + roleRegionToAccount[role] = map[string]string{} + } + roleRegionToAccount[role] = map[string]string{region: ""} + }) + + return &ScrapeRunner{ + factory: factory, + logger: logger, + jobsCfg: jobsCfg, + metricsPerQuery: metricsPerQuery, + cloudwatchConcurrency: cloudwatchConcurrency, + taggingAPIConcurrency: taggingAPIConcurrency, + roleRegionToAccount: roleRegionToAccount, + } +} + +func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) { + sr.logger.Debug("Starting account initialization") + for role, regions := range sr.roleRegionToAccount { + for region := range regions { + accountID, err := sr.factory.GetAccountClient(region, role).GetAccount(ctx) + if err != nil { + sr.logger.Error(err, "Failed to get Account", "region", region, "role_arn", role.RoleArn) + } else { + sr.roleRegionToAccount[role][region] = accountID + } + } } + sr.logger.Debug("Finished account initialization") mux := &sync.Mutex{} metricResults := make([]model.CloudwatchMetricResult, 0) resourceResults := make([]model.TaggedResourceResult, 0) var wg sync.WaitGroup - roleToRegionToAccount := map[model.Role]map[string]string{} - for _, job := range jobsCfg.DiscoveryJobs { - for _, role := range job.Roles { - if _, exists := roleToRegionToAccount[role]; !exists { - roleToRegionToAccount[role] = map[string]string{} + + sr.logger.Debug("Starting job runs") + jobConfigVisitor(sr.jobsCfg, func(job any, role model.Role, region string) { + wg.Add(1) + go func() { + defer wg.Done() + + var namespace string + jobAction(sr.logger, job, func(job model.DiscoveryJob) { + namespace = job.Type + }, func(job model.CustomNamespaceJob) { + namespace = job.Namespace + }) + jobLogger := sr.logger.With("namespace", namespace, "region", region, "arn", role.RoleArn) + accountID := sr.roleRegionToAccount[role][region] + if accountID == "" { + jobLogger.Error(nil, "Account for job was not found see previous errors") + return } - for _, region := range job.Regions { - jobLogger := logger.With("namespace", job.Type, "region", region, "arn", role.RoleArn) - var accountID string - if _, exists := roleToRegionToAccount[role][region]; !exists { - var err error - accountID, err = factory.GetAccountClient(region, role).GetAccount(ctx) - if err != nil { - logger.Error(err, "Couldn't get account Id") - continue - } - roleToRegionToAccount[role][region] = accountID - } else { - accountID = roleToRegionToAccount[role][region] - } - jobLogger = jobLogger.With("account", accountID) - // Capture important loop variables before starting goroutine - runnerParams := Params{ - Region: region, - Role: role, - CloudwatchConcurrency: cloudwatchConcurrency, - GetMetricDataMetricsPerQuery: metricsPerQuery, - } - wg.Add(1) - go func() { - defer wg.Done() - taggingClient := factory.GetTaggingClient(runnerParams.Region, runnerParams.Role, taggingAPIConcurrency) + var jobToRun cloudwatchrunner.Job + jobAction(jobLogger, job, + func(job model.DiscoveryJob) { + taggingClient := sr.factory.GetTaggingClient(region, role, sr.taggingAPIConcurrency) rmProcessor := resourcemetadata.NewProcessor(jobLogger, taggingClient) jobLogger.Debug("Starting resource discovery") - resourceResult, err := rmProcessor.Run(ctx, runnerParams.Region, runnerParams.AccountID, job) + resourceResult, err := rmProcessor.Run(ctx, region, accountID, job) if err != nil { - logger.Error(err, "Resource metadata processor failed") + jobLogger.Error(err, "Resource metadata processor failed") return } if len(resourceResult.Data) > 0 { @@ -210,133 +245,71 @@ func runWithUnifiedRunner( resourceResults = append(resourceResults, *resourceResult) mux.Unlock() } - jobLogger.Debug("Resource discovery finished starting cloudwatch metrics runner", "discovered_resources", len(resourceResult.Data)) - runner := NewRunner(jobLogger, factory, runnerParams, discoveryJob{job: job, resources: resourceResult.Data}) - metricResult, err := runner.Run(ctx) - if err != nil { - jobLogger.Error(err, "Failed to run job") - return - } + jobToRun = cloudwatchrunner.DiscoveryJob{Job: job, Resources: resourceResult.Data} + }, func(job model.CustomNamespaceJob) { + jobToRun = cloudwatchrunner.CustomNamespaceJob{Job: job} + }, + ) + jobLogger.Debug("Starting cloudwatch metrics runner") + runnerParams := cloudwatchrunner.Params{ + Region: region, + Role: role, + CloudwatchConcurrency: sr.cloudwatchConcurrency, + GetMetricDataMetricsPerQuery: sr.metricsPerQuery, + } + runner := cloudwatchrunner.New(jobLogger, sr.factory, runnerParams, jobToRun) + metricResult, err := runner.Run(ctx) + if err != nil { + jobLogger.Error(err, "Failed to run job") + return + } - if metricResult == nil { - jobLogger.Info("No metrics data found") - } + if metricResult == nil { + jobLogger.Info("No metrics data found") + } - jobLogger.Debug("Job run finished", "number_of_metrics", len(metricResult.Data)) + jobLogger.Debug("Job run finished", "number_of_metrics", len(metricResult.Data)) - mux.Lock() - defer mux.Unlock() - metricResults = append(metricResults, *metricResult) - }() + mux.Lock() + defer mux.Unlock() + metricResults = append(metricResults, *metricResult) + }() + }) + sr.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults)) + wg.Wait() + return resourceResults, metricResults +} + +// Walk through each custom namespace and discovery jobs and take an action +func jobConfigVisitor(jobsCfg model.JobsConfig, action func(job any, role model.Role, region string)) { + for _, job := range jobsCfg.DiscoveryJobs { + for _, role := range job.Roles { + for _, region := range job.Regions { + action(job, role, region) } } } for _, job := range jobsCfg.CustomNamespaceJobs { for _, role := range job.Roles { - if _, exists := roleToRegionToAccount[role]; !exists { - roleToRegionToAccount[role] = map[string]string{} - } for _, region := range job.Regions { - jobLogger := logger.With("namespace", job.Namespace, "region", region, "arn", role.RoleArn) - var accountID string - if _, exists := roleToRegionToAccount[role][region]; !exists { - var err error - accountID, err = factory.GetAccountClient(region, role).GetAccount(ctx) - if err != nil { - logger.Error(err, "Couldn't get account Id") - continue - } - roleToRegionToAccount[role][region] = accountID - } else { - accountID = roleToRegionToAccount[role][region] - } - jobLogger = jobLogger.With("account", accountID) - - runnerParams := Params{ - Region: region, - Role: role, - CloudwatchConcurrency: cloudwatchConcurrency, - GetMetricDataMetricsPerQuery: metricsPerQuery, - } - - wg.Add(1) - go func() { - defer wg.Done() - jobLogger.Debug("Starting cloudwatch metrics runner") - - runner := NewRunner(jobLogger, factory, runnerParams, customNamespaceJob{job: job}) - metricResult, err := runner.Run(ctx) - if err != nil { - jobLogger.Error(err, "Failed to run job") - return - } - if metricResult == nil { - jobLogger.Info("No metrics data found") - } - - jobLogger.Debug("Job run finished", "number_of_metrics", len(metricResult.Data)) - mux.Lock() - defer mux.Unlock() - metricResults = append(metricResults, *metricResult) - }() + action(job, role, region) } } } - - wg.Wait() - return resourceResults, metricResults } -type discoveryJob struct { - job model.DiscoveryJob - resources []*model.TaggedResource -} - -func (d discoveryJob) Namespace() string { - return d.job.Type -} - -func (d discoveryJob) CustomTags() []model.Tag { - return d.job.CustomTags -} - -func (d discoveryJob) listMetricsParams() listmetrics.ProcessingParams { - return listmetrics.ProcessingParams{ - Namespace: d.job.Type, - Metrics: d.job.Metrics, - RecentlyActiveOnly: d.job.RecentlyActiveOnly, - DimensionNameRequirements: d.job.DimensionNameRequirements, +func jobAction(logger logging.Logger, job any, discovery func(job model.DiscoveryJob), custom func(job model.CustomNamespaceJob)) { + // Type switches are free https://stackoverflow.com/a/28027945 + switch typedJob := job.(type) { + case model.DiscoveryJob: + discovery(typedJob) + case model.CustomNamespaceJob: + custom(typedJob) + default: + logger.Error(fmt.Errorf("config type of %T is not supported", typedJob), "Unexpected job type") + return } } - -func (d discoveryJob) resourceEnrichment() ResourceEnrichment { - return resourcemetadata.NewResourceAssociation(d.job.DimensionsRegexps, d.job.ExportedTagsOnMetrics, nil) -} - -type customNamespaceJob struct { - job model.CustomNamespaceJob -} - -func (c customNamespaceJob) Namespace() string { - return c.job.Namespace -} - -func (c customNamespaceJob) listMetricsParams() listmetrics.ProcessingParams { - return listmetrics.ProcessingParams{ - Namespace: c.job.Namespace, - Metrics: c.job.Metrics, - RecentlyActiveOnly: c.job.RecentlyActiveOnly, - DimensionNameRequirements: c.job.DimensionNameRequirements, - } -} - -func (c customNamespaceJob) resourceEnrichment() ResourceEnrichment { - return resourcemetadata.StaticResource{Name: c.job.Name} -} - -func (c customNamespaceJob) CustomTags() []model.Tag { - return c.job.CustomTags -}