From 535d7f72f0899c7250cbfcf325ffb5b21d77c556 Mon Sep 17 00:00:00 2001 From: Dao Thanh Tung Date: Tue, 4 Jun 2024 12:42:26 +0100 Subject: [PATCH] refactor aws cloudwatch scaler (#5852) Signed-off-by: dttung2905 --- pkg/scalers/aws_cloudwatch_scaler.go | 263 +++++++--------------- pkg/scalers/aws_cloudwatch_scaler_test.go | 122 +++++----- 2 files changed, 142 insertions(+), 243 deletions(-) diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index 10eb10b260e..cd12a91976d 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -2,9 +2,8 @@ package scalers import ( "context" + "errors" "fmt" - "strconv" - "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -18,13 +17,6 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) -const ( - defaultMetricCollectionTime = 300 - defaultMetricStat = "Average" - defaultMetricStatPeriod = 300 - defaultMetricEndTimeOffset = 0 -) - type awsCloudwatchScaler struct { metricType v2.MetricTargetType metadata *awsCloudwatchMetadata @@ -33,28 +25,68 @@ type awsCloudwatchScaler struct { } type awsCloudwatchMetadata struct { - namespace string - metricsName string - dimensionName []string - dimensionValue []string - expression string + awsAuthorization awsutils.AuthorizationMetadata - targetMetricValue float64 - activationTargetMetricValue float64 - minMetricValue float64 + triggerIndex int + Namespace string `keda:"name=namespace, order=triggerMetadata, optional"` + MetricsName string `keda:"name=metricName, order=triggerMetadata, optional"` + DimensionName []string `keda:"name=dimensionName, order=triggerMetadata, optional"` + DimensionValue []string `keda:"name=dimensionValue, order=triggerMetadata, optional"` + Expression string `keda:"name=expression, order=triggerMetadata, optional"` + + TargetMetricValue float64 `keda:"name=targetMetricValue, order=triggerMetadata"` + ActivationTargetMetricValue float64 `keda:"name=activationTargetMetricValue, order=triggerMetadata, optional"` + MinMetricValue float64 `keda:"name=minMetricValue, order=triggerMetadata"` + + MetricCollectionTime int64 `keda:"name=metricCollectionTime, order=triggerMetadata, optional, default=300"` + MetricStat string `keda:"name=metricStat, order=triggerMetadata, optional, default=Average"` + MetricUnit string `keda:"name=metricUnit, order=triggerMetadata, optional"` // Need to check the metric unit + MetricStatPeriod int64 `keda:"name=metricStatPeriod, order=triggerMetadata, optional, default=300"` + MetricEndTimeOffset int64 `keda:"name=metricEndTimeOffset, order=triggerMetadata, optional, default=0"` + + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` +} - metricCollectionTime int64 - metricStat string - metricUnit string - metricStatPeriod int64 - metricEndTimeOffset int64 +func (a *awsCloudwatchMetadata) Validate() error { + var err error + if a.Expression == "" { + if a.Namespace == "" { + return errors.New("namespace not given") + } - awsRegion string - awsEndpoint string + if a.MetricsName == "" { + return errors.New("metric name not given") + } - awsAuthorization awsutils.AuthorizationMetadata + if a.DimensionName == nil { + return errors.New("dimension name not given") + } + + if a.DimensionValue == nil { + return errors.New("dimension value not given") + } - triggerIndex int + if len(a.DimensionName) != len(a.DimensionValue) { + return errors.New("dimensionName and dimensionValue are not matching in size") + } + + if err = checkMetricUnit(a.MetricUnit); err != nil { + return err + } + } + + if err = checkMetricStat(a.MetricStat); err != nil { + return err + } + if err = checkMetricStatPeriod(a.MetricStatPeriod); err != nil { + return err + } + if a.MetricCollectionTime < 0 || a.MetricCollectionTime%a.MetricStatPeriod != 0 { + return fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", a.MetricStatPeriod, a.MetricCollectionTime) + } + + return nil } // NewAwsCloudwatchScaler creates a new awsCloudwatchScaler @@ -81,156 +113,23 @@ func NewAwsCloudwatchScaler(ctx context.Context, config *scalersconfig.ScalerCon }, nil } -func getIntMetadataValue(metadata map[string]string, key string, required bool, defaultValue int64) (int64, error) { - if val, ok := metadata[key]; ok && val != "" { - value, err := strconv.Atoi(val) - if err != nil { - return 0, fmt.Errorf("error parsing %s metadata: %w", key, err) - } - return int64(value), nil - } - - if required { - return 0, fmt.Errorf("metadata %s not given", key) - } - - return defaultValue, nil -} - -func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) { - if val, ok := metadata[key]; ok && val != "" { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return 0, fmt.Errorf("error parsing %s metadata: %w", key, err) - } - return value, nil - } - - if required { - return 0, fmt.Errorf("metadata %s not given", key) - } - - return defaultValue, nil -} - func createCloudwatchClient(ctx context.Context, metadata *awsCloudwatchMetadata) (*cloudwatch.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, err } return cloudwatch.NewFromConfig(*cfg, func(options *cloudwatch.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }), nil } func parseAwsCloudwatchMetadata(config *scalersconfig.ScalerConfig) (*awsCloudwatchMetadata, error) { - var err error - meta := awsCloudwatchMetadata{} - - if config.TriggerMetadata["expression"] != "" { - if val, ok := config.TriggerMetadata["expression"]; ok && val != "" { - meta.expression = val - } else { - return nil, fmt.Errorf("expression not given") - } - } else { - if val, ok := config.TriggerMetadata["namespace"]; ok && val != "" { - meta.namespace = val - } else { - return nil, fmt.Errorf("namespace not given") - } - - if val, ok := config.TriggerMetadata["metricName"]; ok && val != "" { - meta.metricsName = val - } else { - return nil, fmt.Errorf("metric name not given") - } - - if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" { - meta.dimensionName = strings.Split(val, ";") - } else { - return nil, fmt.Errorf("dimension name not given") - } - - if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" { - meta.dimensionValue = strings.Split(val, ";") - } else { - return nil, fmt.Errorf("dimension value not given") - } - - if len(meta.dimensionName) != len(meta.dimensionValue) { - return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size") - } - - meta.metricUnit = config.TriggerMetadata["metricUnit"] - if err = checkMetricUnit(meta.metricUnit); err != nil { - return nil, err - } - } - - targetMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0) - if err != nil { - return nil, err - } - meta.targetMetricValue = targetMetricValue - - activationTargetMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "activationTargetMetricValue", false, 0) - if err != nil { - return nil, err - } - meta.activationTargetMetricValue = activationTargetMetricValue - - minMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0) - if err != nil { - return nil, err - } - meta.minMetricValue = minMetricValue - - meta.metricStat = defaultMetricStat - if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" { - meta.metricStat = val - } - if err = checkMetricStat(meta.metricStat); err != nil { - return nil, err - } - - metricStatPeriod, err := getIntMetadataValue(config.TriggerMetadata, "metricStatPeriod", false, defaultMetricStatPeriod) - if err != nil { - return nil, err - } - meta.metricStatPeriod = metricStatPeriod - - if err = checkMetricStatPeriod(meta.metricStatPeriod); err != nil { - return nil, err - } - - metricCollectionTime, err := getIntMetadataValue(config.TriggerMetadata, "metricCollectionTime", false, defaultMetricCollectionTime) - if err != nil { - return nil, err - } - meta.metricCollectionTime = metricCollectionTime - - if meta.metricCollectionTime < 0 || meta.metricCollectionTime%meta.metricStatPeriod != 0 { - return nil, fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", meta.metricStatPeriod, meta.metricCollectionTime) - } - - metricEndTimeOffset, err := getIntMetadataValue(config.TriggerMetadata, "metricEndTimeOffset", false, defaultMetricEndTimeOffset) - if err != nil { - return nil, err - } - meta.metricEndTimeOffset = metricEndTimeOffset - - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val + meta := &awsCloudwatchMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } awsAuthorization, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) @@ -241,7 +140,7 @@ func parseAwsCloudwatchMetadata(config *scalersconfig.ScalerConfig) (*awsCloudwa meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func checkMetricStat(stat string) error { @@ -300,7 +199,7 @@ func (s *awsCloudwatchScaler) GetMetricsAndActivity(ctx context.Context, metricN metric := GenerateMetricInMili(metricName, metricValue) - return []external_metrics.ExternalMetricValue{metric}, metricValue > s.metadata.activationTargetMetricValue, nil + return []external_metrics.ExternalMetricValue{metric}, metricValue > s.metadata.ActivationTargetMetricValue, nil } func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { @@ -308,7 +207,7 @@ func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2.Metr Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "aws-cloudwatch"), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue), + Target: GetMetricTargetMili(s.metricType, s.metadata.TargetMetricValue), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -322,33 +221,33 @@ func (s *awsCloudwatchScaler) Close(context.Context) error { func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64, error) { var input cloudwatch.GetMetricDataInput - startTime, endTime := computeQueryWindow(time.Now(), s.metadata.metricStatPeriod, s.metadata.metricEndTimeOffset, s.metadata.metricCollectionTime) + startTime, endTime := computeQueryWindow(time.Now(), s.metadata.MetricStatPeriod, s.metadata.MetricEndTimeOffset, s.metadata.MetricCollectionTime) - if s.metadata.expression != "" { + if s.metadata.Expression != "" { input = cloudwatch.GetMetricDataInput{ StartTime: aws.Time(startTime), EndTime: aws.Time(endTime), ScanBy: types.ScanByTimestampDescending, MetricDataQueries: []types.MetricDataQuery{ { - Expression: aws.String(s.metadata.expression), + Expression: aws.String(s.metadata.Expression), Id: aws.String("q1"), - Period: aws.Int32(int32(s.metadata.metricStatPeriod)), + Period: aws.Int32(int32(s.metadata.MetricStatPeriod)), }, }, } } else { var dimensions []types.Dimension - for i := range s.metadata.dimensionName { + for i := range s.metadata.DimensionName { dimensions = append(dimensions, types.Dimension{ - Name: &s.metadata.dimensionName[i], - Value: &s.metadata.dimensionValue[i], + Name: &s.metadata.DimensionName[i], + Value: &s.metadata.DimensionValue[i], }) } var metricUnit string - if s.metadata.metricUnit != "" { - metricUnit = s.metadata.metricUnit + if s.metadata.MetricUnit != "" { + metricUnit = s.metadata.MetricUnit } input = cloudwatch.GetMetricDataInput{ @@ -360,12 +259,12 @@ func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64 Id: aws.String("c1"), MetricStat: &types.MetricStat{ Metric: &types.Metric{ - Namespace: aws.String(s.metadata.namespace), + Namespace: aws.String(s.metadata.Namespace), Dimensions: dimensions, - MetricName: aws.String(s.metadata.metricsName), + MetricName: aws.String(s.metadata.MetricsName), }, - Period: aws.Int32(int32(s.metadata.metricStatPeriod)), - Stat: aws.String(s.metadata.metricStat), + Period: aws.Int32(int32(s.metadata.MetricStatPeriod)), + Stat: aws.String(s.metadata.MetricStat), Unit: types.StandardUnit(metricUnit), }, ReturnData: aws.Bool(true), @@ -387,7 +286,7 @@ func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64 metricValue = output.MetricDataResults[0].Values[0] } else { s.logger.Info("empty metric data received, returning minMetricValue") - metricValue = s.metadata.minMetricValue + metricValue = s.metadata.MinMetricValue } return metricValue, nil diff --git a/pkg/scalers/aws_cloudwatch_scaler_test.go b/pkg/scalers/aws_cloudwatch_scaler_test.go index 0564f205b88..35290154e80 100644 --- a/pkg/scalers/aws_cloudwatch_scaler_test.go +++ b/pkg/scalers/aws_cloudwatch_scaler_test.go @@ -366,81 +366,81 @@ var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{ var awsCloudwatchGetMetricTestData = []awsCloudwatchMetadata{ { - namespace: "Custom", - metricsName: "HasData", - dimensionName: []string{"DIM"}, - dimensionValue: []string{"DIM_VALUE"}, - targetMetricValue: 100, - minMetricValue: 0, - metricCollectionTime: 60, - metricStat: "Average", - metricUnit: "SampleCount", - metricStatPeriod: 60, - metricEndTimeOffset: 60, - awsRegion: "us-west-2", + Namespace: "Custom", + MetricsName: "HasData", + DimensionName: []string{"DIM"}, + DimensionValue: []string{"DIM_VALUE"}, + TargetMetricValue: 100, + MinMetricValue: 0, + MetricCollectionTime: 60, + MetricStat: "Average", + MetricUnit: "SampleCount", + MetricStatPeriod: 60, + MetricEndTimeOffset: 60, + AwsRegion: "us-west-2", awsAuthorization: awsutils.AuthorizationMetadata{PodIdentityOwner: false}, triggerIndex: 0, }, { - namespace: "Custom", - metricsName: "HasDataNoUnit", - dimensionName: []string{"DIM"}, - dimensionValue: []string{"DIM_VALUE"}, - targetMetricValue: 100, - minMetricValue: 0, - metricCollectionTime: 60, - metricStat: "Average", - metricUnit: "", - metricStatPeriod: 60, - metricEndTimeOffset: 60, - awsRegion: "us-west-2", + Namespace: "Custom", + MetricsName: "HasDataNoUnit", + DimensionName: []string{"DIM"}, + DimensionValue: []string{"DIM_VALUE"}, + TargetMetricValue: 100, + MinMetricValue: 0, + MetricCollectionTime: 60, + MetricStat: "Average", + MetricUnit: "", + MetricStatPeriod: 60, + MetricEndTimeOffset: 60, + AwsRegion: "us-west-2", awsAuthorization: awsutils.AuthorizationMetadata{PodIdentityOwner: false}, triggerIndex: 0, }, { - namespace: "Custom", - metricsName: testAWSCloudwatchErrorMetric, - dimensionName: []string{"DIM"}, - dimensionValue: []string{"DIM_VALUE"}, - targetMetricValue: 100, - minMetricValue: 0, - metricCollectionTime: 60, - metricStat: "Average", - metricUnit: "", - metricStatPeriod: 60, - metricEndTimeOffset: 60, - awsRegion: "us-west-2", + Namespace: "Custom", + MetricsName: testAWSCloudwatchErrorMetric, + DimensionName: []string{"DIM"}, + DimensionValue: []string{"DIM_VALUE"}, + TargetMetricValue: 100, + MinMetricValue: 0, + MetricCollectionTime: 60, + MetricStat: "Average", + MetricUnit: "", + MetricStatPeriod: 60, + MetricEndTimeOffset: 60, + AwsRegion: "us-west-2", awsAuthorization: awsutils.AuthorizationMetadata{PodIdentityOwner: false}, triggerIndex: 0, }, { - namespace: "Custom", - metricsName: testAWSCloudwatchNoValueMetric, - dimensionName: []string{"DIM"}, - dimensionValue: []string{"DIM_VALUE"}, - targetMetricValue: 100, - minMetricValue: 0, - metricCollectionTime: 60, - metricStat: "Average", - metricUnit: "", - metricStatPeriod: 60, - metricEndTimeOffset: 60, - awsRegion: "us-west-2", + Namespace: "Custom", + MetricsName: testAWSCloudwatchNoValueMetric, + DimensionName: []string{"DIM"}, + DimensionValue: []string{"DIM_VALUE"}, + TargetMetricValue: 100, + MinMetricValue: 0, + MetricCollectionTime: 60, + MetricStat: "Average", + MetricUnit: "", + MetricStatPeriod: 60, + MetricEndTimeOffset: 60, + AwsRegion: "us-west-2", awsAuthorization: awsutils.AuthorizationMetadata{PodIdentityOwner: false}, triggerIndex: 0, }, { - namespace: "Custom", - metricsName: "HasDataFromExpression", - expression: "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'", - targetMetricValue: 100, - minMetricValue: 0, - metricCollectionTime: 60, - metricStat: "Average", - metricUnit: "SampleCount", - metricStatPeriod: 60, - metricEndTimeOffset: 60, - awsRegion: "us-west-2", + Namespace: "Custom", + MetricsName: "HasDataFromExpression", + Expression: "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'", + TargetMetricValue: 100, + MinMetricValue: 0, + MetricCollectionTime: 60, + MetricStat: "Average", + MetricUnit: "SampleCount", + MetricStatPeriod: 60, + MetricEndTimeOffset: 60, + AwsRegion: "us-west-2", awsAuthorization: awsutils.AuthorizationMetadata{PodIdentityOwner: false}, triggerIndex: 0, }, @@ -502,8 +502,8 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) { func TestAWSCloudwatchScalerGetMetrics(t *testing.T) { for _, meta := range awsCloudwatchGetMetricTestData { mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}, logr.Discard()} - value, _, err := mockAWSCloudwatchScaler.GetMetricsAndActivity(context.Background(), meta.metricsName) - switch meta.metricsName { + value, _, err := mockAWSCloudwatchScaler.GetMetricsAndActivity(context.Background(), meta.MetricsName) + switch meta.MetricsName { case testAWSCloudwatchErrorMetric: assert.Error(t, err, "expect error because of cloudwatch api error") case testAWSCloudwatchNoValueMetric: