diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 1c5976685fb..a99851a87ea 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -20,6 +20,7 @@ import ( ) const ( + defaultTargetQueueLength = 5 targetQueueLengthDefault = 5 activationTargetQueueLengthDefault = 0 defaultScaleOnInFlight = true diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 6f642ec04bf..782352acb84 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -19,7 +19,6 @@ package scalers import ( "context" "fmt" - "strconv" "strings" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" @@ -34,37 +33,34 @@ import ( ) const ( - queueLengthMetricName = "queueLength" - activationQueueLengthMetricName = "activationQueueLength" - defaultTargetQueueLength = 5 - externalMetricType = "External" - QueueLengthStrategyAll string = "all" - QueueLengthStrategyVisibleOnly string = "visibleonly" + externalMetricType = "External" + queueLengthStrategyVisibleOnly = "visibleonly" ) -var ( - maxPeekMessages int32 = 32 -) +var maxPeekMessages int32 = 32 type azureQueueScaler struct { metricType v2.MetricTargetType - metadata *azureQueueMetadata + metadata azureQueueMetadata queueClient *azqueue.QueueClient logger logr.Logger } type azureQueueMetadata struct { - targetQueueLength int64 - activationTargetQueueLength int64 - queueName string - connection string - accountName string - endpointSuffix string - queueLengthStrategy string - triggerIndex int + ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"` + QueueName string `keda:"name=queueName, order=triggerMetadata"` + QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"` + Connection string `keda:"name=connection, order=authParams;triggerMetadata;resolvedEnv, optional"` + AccountName string `keda:"name=accountName, order=triggerMetadata, optional"` + EndpointSuffix string `keda:"name=endpointSuffix, order=triggerMetadata, optional"` + QueueLengthStrategy string `keda:"name=queueLengthStrategy, order=triggerMetadata, enum=all;visibleonly, default=all"` + TriggerIndex int +} + +func (m *azureQueueMetadata) Validate() error { + return nil } -// NewAzureQueueScaler creates a new scaler for queue func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { @@ -73,14 +69,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { logger := InitializeLogger(config, "azure_queue_scaler") - meta, podIdentity, err := parseAzureQueueMetadata(config, logger) + meta, podIdentity, err := parseAzureQueueMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing azure queue metadata: %w", err) } - queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout) + queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout) if err != nil { - return nil, fmt.Errorf("error creating azure blob client: %w", err) + return nil, fmt.Errorf("error creating azure queue client: %w", err) } return &azureQueueScaler{ @@ -91,56 +87,23 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { }, nil } -func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) { +func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) { meta := azureQueueMetadata{} - meta.targetQueueLength = defaultTargetQueueLength - - if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok { - queueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName) - return nil, kedav1alpha1.AuthPodIdentity{}, - fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err) - } - - meta.targetQueueLength = queueLength + err := config.TypedConfig(&meta) + if err != nil { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err) } - meta.activationTargetQueueLength = 0 - if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok { - activationQueueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName) - return nil, kedav1alpha1.AuthPodIdentity{}, - fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err) - } - - meta.activationTargetQueueLength = activationQueueLength + err = meta.Validate() + if err != nil { + return meta, kedav1alpha1.AuthPodIdentity{}, err } endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint) if err != nil { - return nil, kedav1alpha1.AuthPodIdentity{}, err - } - - meta.endpointSuffix = endpointSuffix - - if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" { - meta.queueName = val - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given") - } - - if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" { - strategy := strings.ToLower(val) - if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly { - meta.queueLengthStrategy = strategy - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val) - } - } else { - meta.queueLengthStrategy = QueueLengthStrategyAll + return meta, kedav1alpha1.AuthPodIdentity{}, err } + meta.EndpointSuffix = endpointSuffix // If the Use AAD Pod Identity is not present, or set to "none" // then check for connection string @@ -148,48 +111,39 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log case "", kedav1alpha1.PodIdentityProviderNone: // Azure Queue Scaler expects a "connection" parameter in the metadata // of the scaler or in a TriggerAuthentication object - if config.AuthParams["connection"] != "" { - // Found the connection in a parameter from TriggerAuthentication - meta.connection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } - - if len(meta.connection) == 0 { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given") + if meta.Connection == "" { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given") } case kedav1alpha1.PodIdentityProviderAzureWorkload: // If the Use AAD Pod Identity is present then check account name - if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" { - meta.accountName = val - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given") + if meta.AccountName == "" { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given") } default: - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider) + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider) } - meta.triggerIndex = config.TriggerIndex - - return &meta, config.PodIdentity, nil + meta.TriggerIndex = config.TriggerIndex + return meta, config.PodIdentity, nil } func (s *azureQueueScaler) Close(context.Context) error { return nil } +// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName)) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength), + Target: GetMetricTarget(s.metricType, s.metadata.QueueLength), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} } -// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { queuelen, err := s.getMessageCount(ctx) if err != nil { @@ -198,12 +152,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName } metric := GenerateMetricInMili(metricName, float64(queuelen)) - return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil + return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil } func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) { - strategy := strings.ToLower(s.metadata.queueLengthStrategy) - if strategy == QueueLengthStrategyVisibleOnly { + if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly { queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages}) if err != nil { return 0, err diff --git a/pkg/scalers/azure_queue_scaler_test.go b/pkg/scalers/azure_queue_scaler_test.go index a36da33123c..e2e9ab73634 100644 --- a/pkg/scalers/azure_queue_scaler_test.go +++ b/pkg/scalers/azure_queue_scaler_test.go @@ -21,6 +21,8 @@ import ( "testing" "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + v2 "k8s.io/api/autoscaling/v2" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" @@ -31,6 +33,7 @@ var testAzQueueResolvedEnv = map[string]string{ } type parseAzQueueMetadataTestData struct { + name string metadata map[string]string isError bool resolvedEnv map[string]string @@ -39,82 +42,281 @@ type parseAzQueueMetadataTestData struct { } type azQueueMetricIdentifier struct { + name string metadataTestData *parseAzQueueMetadataTestData triggerIndex int - name string + metricName string } var testAzQueueMetadata = []parseAzQueueMetadataTestData{ - // nothing passed - {map[string]string{}, true, testAzQueueResolvedEnv, map[string]string{}, ""}, - // properly formed - {map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "5"}, false, testAzQueueResolvedEnv, map[string]string{}, ""}, - // Empty queueName - {map[string]string{"connectionFromEnv": "CONNECTION", "queueName": ""}, true, testAzQueueResolvedEnv, map[string]string{}, ""}, - // improperly formed queueLength - {map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "AA"}, true, testAzQueueResolvedEnv, map[string]string{}, ""}, - // improperly formed activationQueueLength - {map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "1", "activationQueueLength": "AA"}, true, testAzQueueResolvedEnv, map[string]string{}, ""}, - // podIdentity = azure-workload with account name - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload without account name - {map[string]string{"accountName": "", "queueName": "sample_queue"}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload without queue name - {map[string]string{"accountName": "sample_acc", "queueName": ""}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload with cloud - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "AzurePublicCloud"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload with invalid cloud - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "InvalidCloud"}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload with private cloud and endpoint suffix - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": "queue.core.private.cloud"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload with private cloud and no endpoint suffix - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": ""}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // podIdentity = azure-workload with endpoint suffix and no cloud - {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "", "endpointSuffix": "ignored"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, - // connection from authParams - {map[string]string{"queueName": "sample", "queueLength": "5"}, false, testAzQueueResolvedEnv, map[string]string{"connection": "value"}, kedav1alpha1.PodIdentityProviderNone}, + { + name: "nothing passed", + metadata: map[string]string{}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "properly formed", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "5"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "empty queueName", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": ""}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "improperly formed queueLength", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "AA"}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "improperly formed activationQueueLength", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "1", "activationQueueLength": "AA"}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "podIdentity azure-workload with account name", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload without account name", + metadata: map[string]string{"accountName": "", "queueName": "sample_queue"}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload without queue name", + metadata: map[string]string{"accountName": "sample_acc", "queueName": ""}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload with cloud", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "AzurePublicCloud"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload with invalid cloud", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "InvalidCloud"}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload with private cloud and endpoint suffix", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": "queue.core.private.cloud"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload with private cloud and no endpoint suffix", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": ""}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "podIdentity azure-workload with endpoint suffix and no cloud", + metadata: map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "", "endpointSuffix": "ignored"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }, + { + name: "connection from authParams", + metadata: map[string]string{"queueName": "sample", "queueLength": "5"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{"connection": "value"}, + podIdentity: kedav1alpha1.PodIdentityProviderNone, + }, + { + name: "valid queueLengthStrategy all", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "5", "queueLengthStrategy": "all"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "valid queueLengthStrategy visibleonly", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "5", "queueLengthStrategy": "visibleonly"}, + isError: false, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, + { + name: "invalid queueLengthStrategy", + metadata: map[string]string{"connectionFromEnv": "CONNECTION", "queueName": "sample", "queueLength": "5", "queueLengthStrategy": "invalid"}, + isError: true, + resolvedEnv: testAzQueueResolvedEnv, + authParams: map[string]string{}, + podIdentity: "", + }, } var azQueueMetricIdentifiers = []azQueueMetricIdentifier{ - {&testAzQueueMetadata[1], 0, "s0-azure-queue-sample"}, - {&testAzQueueMetadata[5], 1, "s1-azure-queue-sample_queue"}, + { + name: "properly formed queue metric", + metadataTestData: &testAzQueueMetadata[1], + triggerIndex: 0, + metricName: "s0-azure-queue-sample", + }, + { + name: "azure-workload queue metric", + metadataTestData: &testAzQueueMetadata[5], + triggerIndex: 1, + metricName: "s1-azure-queue-sample_queue", + }, +} + +type mockAzureQueueClient struct { + peekMessageCount int + totalMessages int32 +} + +func (m *mockAzureQueueClient) getMessageCount(visibleOnly bool) int64 { + if visibleOnly { + if m.peekMessageCount >= 32 { + return int64(m.totalMessages) + } + return int64(m.peekMessageCount) + } + return int64(m.totalMessages) } func TestAzQueueParseMetadata(t *testing.T) { for _, testData := range testAzQueueMetadata { - _, podIdentity, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, - ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity}}, - logr.Discard()) - if err != nil && !testData.isError { - t.Error("Expected success but got error", err) - } - if testData.isError && err == nil { - t.Errorf("Expected error but got success. testData: %v", testData) - } - if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil { - t.Error("Expected success but got error: podIdentity value is not returned as expected") - } + t.Run(testData.name, func(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + ResolvedEnv: testData.resolvedEnv, + AuthParams: testData.authParams, + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity}, + } + + _, podIdentity, err := parseAzureQueueMetadata(config) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success. testData: %v", testData) + } + if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil { + t.Error("Expected success but got error: podIdentity value is not returned as expected") + } + }) } } func TestAzQueueGetMetricSpecForScaling(t *testing.T) { for _, testData := range azQueueMetricIdentifiers { - meta, _, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, - ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity}, TriggerIndex: testData.triggerIndex}, - logr.Discard()) - if err != nil { - t.Fatal("Could not parse metadata:", err) - } - mockAzQueueScaler := azureQueueScaler{ - metadata: meta, - } + t.Run(testData.name, func(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadataTestData.metadata, + ResolvedEnv: testData.metadataTestData.resolvedEnv, + AuthParams: testData.metadataTestData.authParams, + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity}, + TriggerIndex: testData.triggerIndex, + } - metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background()) - metricName := metricSpec[0].External.Metric.Name - if metricName != testData.name { - t.Error("Wrong External metric source name:", metricName) - } + meta, _, err := parseAzureQueueMetadata(config) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockAzQueueScaler := azureQueueScaler{ + metadata: meta, + logger: logr.Discard(), + metricType: v2.AverageValueMetricType, + } + + metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.metricName, metricName) + }) + } +} + +func TestAzQueueGetMessageCount(t *testing.T) { + testCases := []struct { + name string + strategy string + peekMessages int + totalMessages int32 + expectedCount int64 + }{ + { + name: "default strategy (all)", + strategy: "", + peekMessages: 10, + totalMessages: 100, + expectedCount: 100, + }, + { + name: "explicit all strategy", + strategy: "all", + peekMessages: 10, + totalMessages: 100, + expectedCount: 100, + }, + { + name: "visibleonly strategy with less than 32 messages", + strategy: "visibleonly", + peekMessages: 10, + totalMessages: 100, + expectedCount: 10, + }, + { + name: "visibleonly strategy with 32 or more messages", + strategy: "visibleonly", + peekMessages: 35, + totalMessages: 100, + expectedCount: 100, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockClient := &mockAzureQueueClient{ + peekMessageCount: tc.peekMessages, + totalMessages: tc.totalMessages, + } + + count := mockClient.getMessageCount(tc.strategy == "visibleonly") + assert.Equal(t, tc.expectedCount, count) + }) } }