Skip to content

Commit

Permalink
Refactor azure queue scaler
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer committed Oct 24, 2024
1 parent b2ce95d commit 9ba08c2
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 116 deletions.
1 change: 1 addition & 0 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const (
defaultTargetQueueLength = 5
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
Expand Down
142 changes: 54 additions & 88 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
Expand All @@ -34,37 +33,47 @@ import (
)

const (
queueLengthMetricName = "queueLength"
activationQueueLengthMetricName = "activationQueueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
QueueLengthStrategyAll string = "all"
QueueLengthStrategyVisibleOnly string = "visibleonly"
externalMetricType = "External"
queueLengthStrategyAll = "all"
queueLengthStrategyVisibleOnly = "visibleonly"
)

var (
maxPeekMessages int32 = 32
)
var maxPeekMessages int32 = 32

type azureQueueScaler struct {
metricType v2.MetricTargetType
metadata *azureQueueMetadata
metadata azureQueueMetadata
queueClient *azqueue.QueueClient
logger logr.Logger
}

type azureQueueMetadata struct {
targetQueueLength int64
activationTargetQueueLength int64
queueName string
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
ActivationQueueLength int64 `keda:"name=activationQueueLength,order=triggerMetadata,default=0"`
QueueName string `keda:"name=queueName,order=triggerMetadata"`
QueueLength int64 `keda:"name=queueLength,order=triggerMetadata,default=5"`
Connection string `keda:"name=connection,order=authParams;triggerMetadata;resolvedEnv,optional"`
AccountName string `keda:"name=accountName,order=triggerMetadata,optional"`
EndpointSuffix string `keda:"name=endpointSuffix,order=triggerMetadata,optional"`
QueueLengthStrategy string `keda:"name=queueLengthStrategy,default=all"`
TriggerIndex int
}

func (m *azureQueueMetadata) Validate() error {
if m.QueueName == "" {
return fmt.Errorf("no queueName given")
}

if m.QueueLengthStrategy != "" {
strategy := strings.ToLower(m.QueueLengthStrategy)
if strategy != queueLengthStrategyAll && strategy != queueLengthStrategyVisibleOnly {
return fmt.Errorf("invalid queueLengthStrategy %s given", m.QueueLengthStrategy)
}
m.QueueLengthStrategy = strategy
}

return nil
}

// NewAzureQueueScaler creates a new scaler for queue
func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -73,14 +82,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger := InitializeLogger(config, "azure_queue_scaler")

meta, podIdentity, err := parseAzureQueueMetadata(config, logger)
meta, podIdentity, err := parseAzureQueueMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout)
queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout)
if err != nil {
return nil, fmt.Errorf("error creating azure blob client: %w", err)
return nil, fmt.Errorf("error creating azure queue client: %w", err)
}

return &azureQueueScaler{
Expand All @@ -91,105 +100,63 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := azureQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok {
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err)
}

meta.targetQueueLength = queueLength
err := config.TypedConfig(&meta)
if err != nil {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

meta.activationTargetQueueLength = 0
if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok {
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err)
}

meta.activationTargetQueueLength = activationQueueLength
err = meta.Validate()
if err != nil {
return meta, kedav1alpha1.AuthPodIdentity{}, err
}

endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}

meta.endpointSuffix = endpointSuffix

if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" {
meta.queueName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = QueueLengthStrategyAll
return meta, kedav1alpha1.AuthPodIdentity{}, err
}
meta.EndpointSuffix = endpointSuffix

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
// Azure Queue Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
if config.AuthParams["connection"] != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.connection) == 0 {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
if meta.Connection == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
}
case kedav1alpha1.PodIdentityProviderAzureWorkload:
// If the Use AAD Pod Identity is present then check account name
if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
if meta.AccountName == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
}
default:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
}

meta.triggerIndex = config.TriggerIndex

return &meta, config.PodIdentity, nil
meta.TriggerIndex = config.TriggerIndex
return meta, config.PodIdentity, nil
}

func (s *azureQueueScaler) Close(context.Context) error {
return nil
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength),
Target: GetMetricTarget(s.metricType, s.metadata.QueueLength),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queuelen, err := s.getMessageCount(ctx)
if err != nil {
Expand All @@ -198,12 +165,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
}

metric := GenerateMetricInMili(metricName, float64(queuelen))
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil
}

func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) {
strategy := strings.ToLower(s.metadata.queueLengthStrategy)
if strategy == QueueLengthStrategyVisibleOnly {
if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly {
queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages})
if err != nil {
return 0, err
Expand Down
113 changes: 85 additions & 28 deletions pkg/scalers/azure_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package scalers

import (
"context"
"fmt"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
v2 "k8s.io/api/autoscaling/v2"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -81,40 +84,94 @@ var azQueueMetricIdentifiers = []azQueueMetricIdentifier{
}

func TestAzQueueParseMetadata(t *testing.T) {
for _, testData := range testAzQueueMetadata {
_, podIdentity, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata,
ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams,
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity}},
logr.Discard())
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Errorf("Expected error but got success. testData: %v", testData)
}
if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil {
t.Error("Expected success but got error: podIdentity value is not returned as expected")
for i, testData := range testAzQueueMetadata {
testName := fmt.Sprintf("test case %d", i)
switch i {
case 0:
testName = "nothing passed"
case 1:
testName = "properly formed"
case 2:
testName = "empty queueName"
case 3:
testName = "improperly formed queueLength"
case 4:
testName = "improperly formed activationQueueLength"
case 5:
testName = "podIdentity azure-workload with account name"
case 6:
testName = "podIdentity azure-workload without account name"
case 7:
testName = "podIdentity azure-workload without queue name"
case 8:
testName = "podIdentity azure-workload with cloud"
case 9:
testName = "podIdentity azure-workload with invalid cloud"
case 10:
testName = "podIdentity azure-workload with private cloud and endpoint suffix"
case 11:
testName = "podIdentity azure-workload with private cloud and no endpoint suffix"
case 12:
testName = "podIdentity azure-workload with endpoint suffix and no cloud"
case 13:
testName = "connection from authParams"
}

t.Run(testName, func(t *testing.T) {
config := &scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
ResolvedEnv: testData.resolvedEnv,
AuthParams: testData.authParams,
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity},
}

_, podIdentity, err := parseAzureQueueMetadata(config)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Errorf("Expected error but got success. testData: %v", testData)
}
if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil {
t.Error("Expected success but got error: podIdentity value is not returned as expected")
}
})
}
}

func TestAzQueueGetMetricSpecForScaling(t *testing.T) {
for _, testData := range azQueueMetricIdentifiers {
meta, _, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata,
ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams,
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity}, TriggerIndex: testData.triggerIndex},
logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockAzQueueScaler := azureQueueScaler{
metadata: meta,
for i, testData := range azQueueMetricIdentifiers {
testName := fmt.Sprintf("test case %d", i)
switch i {
case 0:
testName = "properly formed queue metric"
case 1:
testName = "azure-workload queue metric"
}

metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
t.Run(testName, func(t *testing.T) {
config := &scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadataTestData.metadata,
ResolvedEnv: testData.metadataTestData.resolvedEnv,
AuthParams: testData.metadataTestData.authParams,
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity},
TriggerIndex: testData.triggerIndex,
}

meta, _, err := parseAzureQueueMetadata(config)
if err != nil {
t.Fatal("Could not parse metadata:", err)
}

mockAzQueueScaler := azureQueueScaler{
metadata: meta,
logger: logr.Discard(),
metricType: v2.AverageValueMetricType,
}

metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
assert.Equal(t, testData.name, metricName)
})
}
}

0 comments on commit 9ba08c2

Please sign in to comment.