Skip to content

Commit

Permalink
feature: add cloud tasks scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Maria Alvarez <[email protected]>
  • Loading branch information
Jose Maria Alvarez committed Jul 31, 2023
1 parent be05e22 commit 4f737e4
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- **General:**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General:**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **Cloud Tasks Scaler**: Add functionality to scale based on the queue length ([#3613](https://github.com/kedacore/keda/issues/3613))

### Fixes

Expand Down
2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 185 additions & 0 deletions pkg/scalers/gcp_cloud_tasks_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package scalers

import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth"

cloudTaskDefaultValue = 100
)

type cloudTasksScaler struct {
client *StackDriverClient
metricType v2.MetricTargetType
metadata *cloudTaskMetadata
logger logr.Logger
}

type cloudTaskMetadata struct {
value float64
activationValue float64

queueName string
projectId string
gcpAuthorization *gcpAuthorizationMetadata
scalerIndex int
}

// NewCloudTaskScaler creates a new cloudTaskScaler
func NewCloudTasksScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "gcp_cloud_tasks_scaler")

meta, err := parseCloudTasksMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err)
}

return &cloudTasksScaler{
metricType: metricType,
metadata: meta,
logger: logger,
}, nil
}

func parseCloudTasksMetadata(config *ScalerConfig, logger logr.Logger) (*cloudTaskMetadata, error) {

meta := cloudTaskMetadata{}

value, valuePresent := config.TriggerMetadata["value"]

if valuePresent {
triggerValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %w", err)
}
meta.value = triggerValue
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
if val == "" {
return nil, fmt.Errorf("no queue name given")
}

meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %w", err)
}
meta.activationValue = activationValue
}

if val, ok := config.TriggerMetadata["projectId"]; ok {
if val == "" {
return nil, fmt.Errorf("no project id given")
}

meta.projectId = val
} else {
return nil, fmt.Errorf("no project id given")
}

auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
meta.gcpAuthorization = auth
meta.scalerIndex = config.ScalerIndex
return &meta, nil
}

func (s *cloudTasksScaler) Close(context.Context) error {
if s.client != nil {
err := s.client.metricsClient.Close()
s.client = nil
if err != nil {
s.logger.Error(err, "error closing StackDriver client")
}
}

return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
}

// Create the metric spec for the HPA
metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task
func (s *cloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
metricType := cloudTasksStackDriverQueueSize

value, err := s.getMetrics(ctx, metricType)
if err != nil {
s.logger.Error(err, "error getting metric", "metricType", metricType)
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil
}

func (s *cloudTasksScaler) setStackdriverClient(ctx context.Context) error {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
return err
}
s.client = client
return nil
}

// getMetrics gets metric type value from stackdriver api
func (s *cloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) {
if s.client == nil {
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
}
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"`

// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks
return s.client.GetMetrics(ctx, filter, s.metadata.projectId, nil)
}
79 changes: 79 additions & 0 deletions pkg/scalers/gcp_cloud_tasks_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package scalers

import (
"context"
"testing"

"github.com/go-logr/logr"
)

var testCloudTasksResolvedEnv = map[string]string{
"SAMPLE_CREDS": "{}",
}

type parseCloudTasksMetadataTestData struct {
authParams map[string]string
metadata map[string]string
isError bool
}

type gcpCloudTasksMetricIdentifier struct {
metadataTestData *parseCloudTasksMetadataTestData
scalerIndex int
name string
}

var testCloudTasksMetadata = []parseCloudTasksMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectId": "myproject", "activationValue": "5"}, false},
// missing subscriptionName
{nil, map[string]string{"queueName": "", "value": "7", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectId": "myproject", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectId": "myproject", "activationValue": "AA"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectId": "myproject"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectId": "myproject"}, true},
// properly formed float value and activationTargetValue
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectId": "myproject"}, false},
}

var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{
{&testCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"},
{&testCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"},
}

func TestCloudTasksParseMetadata(t *testing.T) {
for _, testData := range testCloudTasksMetadata {
_, err := parseCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testCloudTasksResolvedEnv}, logr.Discard())
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) {
for _, testData := range gcpCloudTasksMetricIdentifiers {
meta, err := parseCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpCloudTasksScaler := cloudTasksScaler{nil, "", meta, logr.Discard()}

metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
22 changes: 11 additions & 11 deletions pkg/scalers/liiklus/LiiklusService.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4f737e4

Please sign in to comment.