diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f3c5f251fb..ff3af5147d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General:**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796)) - **General:**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726)) +- **Cloud Tasks Scaler**: Add functionality to scale based on the queue length ([#3613](https://github.com/kedacore/keda/issues/3613)) ### Fixes diff --git a/pkg/metricsservice/api/metrics.pb.go b/pkg/metricsservice/api/metrics.pb.go index e38e5e355a1..dabfd91be3b 100644 --- a/pkg/metricsservice/api/metrics.pb.go +++ b/pkg/metricsservice/api/metrics.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.30.0 -// protoc v4.23.2 +// protoc v3.6.1 // source: metrics.proto package api diff --git a/pkg/metricsservice/api/metrics_grpc.pb.go b/pkg/metricsservice/api/metrics_grpc.pb.go index 9eae639dc04..09e1282a285 100644 --- a/pkg/metricsservice/api/metrics_grpc.pb.go +++ b/pkg/metricsservice/api/metrics_grpc.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v3.6.1 // source: metrics.proto package api diff --git a/pkg/scalers/externalscaler/externalscaler.pb.go b/pkg/scalers/externalscaler/externalscaler.pb.go index 71293580816..22a4904b49b 100644 --- a/pkg/scalers/externalscaler/externalscaler.pb.go +++ b/pkg/scalers/externalscaler/externalscaler.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.30.0 -// protoc v4.23.2 +// protoc v3.6.1 // source: externalscaler.proto package externalscaler diff --git a/pkg/scalers/externalscaler/externalscaler_grpc.pb.go b/pkg/scalers/externalscaler/externalscaler_grpc.pb.go index 5489ae58ac4..cdc64377d55 100644 --- a/pkg/scalers/externalscaler/externalscaler_grpc.pb.go +++ b/pkg/scalers/externalscaler/externalscaler_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v3.6.1 // source: externalscaler.proto package externalscaler diff --git a/pkg/scalers/gcp_cloud_tasks_scaler.go b/pkg/scalers/gcp_cloud_tasks_scaler.go new file mode 100644 index 00000000000..2606a12756d --- /dev/null +++ b/pkg/scalers/gcp_cloud_tasks_scaler.go @@ -0,0 +1,185 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth" + + cloudTaskDefaultValue = 100 +) + +type cloudTasksScaler struct { + client *StackDriverClient + metricType v2.MetricTargetType + metadata *cloudTaskMetadata + logger logr.Logger +} + +type cloudTaskMetadata struct { + value float64 + activationValue float64 + + queueName string + projectId string + gcpAuthorization *gcpAuthorizationMetadata + scalerIndex int +} + +// NewCloudTaskScaler creates a new cloudTaskScaler +func NewCloudTasksScaler(config *ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + logger := InitializeLogger(config, "gcp_cloud_tasks_scaler") + + meta, err := parseCloudTasksMetadata(config, logger) + if err != nil { + return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err) + } + + return &cloudTasksScaler{ + metricType: metricType, + metadata: meta, + logger: logger, + }, nil +} + +func parseCloudTasksMetadata(config *ScalerConfig, logger logr.Logger) (*cloudTaskMetadata, error) { + + meta := cloudTaskMetadata{} + + value, valuePresent := config.TriggerMetadata["value"] + + if valuePresent { + triggerValue, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("value parsing error %w", err) + } + meta.value = triggerValue + } + + if val, ok := config.TriggerMetadata["queueName"]; ok { + if val == "" { + return nil, fmt.Errorf("no queue name given") + } + + meta.queueName = val + } else { + return nil, fmt.Errorf("no queue name given") + } + + meta.activationValue = 0 + if val, ok := config.TriggerMetadata["activationValue"]; ok { + activationValue, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil, fmt.Errorf("activationValue parsing error %w", err) + } + meta.activationValue = activationValue + } + + if val, ok := config.TriggerMetadata["projectId"]; ok { + if val == "" { + return nil, fmt.Errorf("no project id given") + } + + meta.projectId = val + } else { + return nil, fmt.Errorf("no project id given") + } + + auth, err := getGCPAuthorization(config) + if err != nil { + return nil, err + } + meta.gcpAuthorization = auth + meta.scalerIndex = config.ScalerIndex + return &meta, nil +} + +func (s *cloudTasksScaler) Close(context.Context) error { + if s.client != nil { + err := s.client.metricsClient.Close() + s.client = nil + if err != nil { + s.logger.Error(err, "error closing StackDriver client") + } + } + + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *cloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.value), + } + + // Create the metric spec for the HPA + metricSpec := v2.MetricSpec{ + External: externalMetric, + Type: externalMetricType, + } + + return []v2.MetricSpec{metricSpec} +} + +// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task +func (s *cloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + metricType := cloudTasksStackDriverQueueSize + + value, err := s.getMetrics(ctx, metricType) + if err != nil { + s.logger.Error(err, "error getting metric", "metricType", metricType) + return []external_metrics.ExternalMetricValue{}, false, err + } + + metric := GenerateMetricInMili(metricName, value) + + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil +} + +func (s *cloudTasksScaler) setStackdriverClient(ctx context.Context) error { + var client *StackDriverClient + var err error + if s.metadata.gcpAuthorization.podIdentityProviderEnabled { + client, err = NewStackDriverClientPodIdentity(ctx) + } else { + client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) + } + + if err != nil { + return err + } + s.client = client + return nil +} + +// getMetrics gets metric type value from stackdriver api +func (s *cloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) { + if s.client == nil { + err := s.setStackdriverClient(ctx) + if err != nil { + return -1, err + } + } + filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"` + + // Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them. + // See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks + return s.client.GetMetrics(ctx, filter, s.metadata.projectId, nil) +} diff --git a/pkg/scalers/gcp_cloud_tasks_scaler_test.go b/pkg/scalers/gcp_cloud_tasks_scaler_test.go new file mode 100644 index 00000000000..68977def13d --- /dev/null +++ b/pkg/scalers/gcp_cloud_tasks_scaler_test.go @@ -0,0 +1,79 @@ +package scalers + +import ( + "context" + "testing" + + "github.com/go-logr/logr" +) + +var testCloudTasksResolvedEnv = map[string]string{ + "SAMPLE_CREDS": "{}", +} + +type parseCloudTasksMetadataTestData struct { + authParams map[string]string + metadata map[string]string + isError bool +} + +type gcpCloudTasksMetricIdentifier struct { + metadataTestData *parseCloudTasksMetadataTestData + scalerIndex int + name string +} + +var testCloudTasksMetadata = []parseCloudTasksMetadataTestData{ + {map[string]string{}, map[string]string{}, true}, + // all properly formed + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectId": "myproject", "activationValue": "5"}, false}, + // missing subscriptionName + {nil, map[string]string{"queueName": "", "value": "7", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // missing credentials + {nil, map[string]string{"queueName": "myQueue", "value": "7", "projectId": "myproject", "credentialsFromEnv": ""}, true}, + // malformed subscriptionSize + {nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // malformed mode + {nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectId": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // malformed activationTargetValue + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectId": "myproject", "activationValue": "AA"}, true}, + // Credentials from AuthParams + {map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectId": "myproject"}, false}, + // Credentials from AuthParams with empty creds + {map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectId": "myproject"}, true}, + // properly formed float value and activationTargetValue + {nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectId": "myproject"}, false}, +} + +var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ + {&testCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"}, + {&testCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"}, +} + +func TestCloudTasksParseMetadata(t *testing.T) { + for _, testData := range testCloudTasksMetadata { + _, err := parseCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testCloudTasksResolvedEnv}, logr.Discard()) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) { + for _, testData := range gcpCloudTasksMetricIdentifiers { + meta, err := parseCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex}, logr.Discard()) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcpCloudTasksScaler := cloudTasksScaler{nil, "", meta, logr.Discard()} + + metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scalers/liiklus/LiiklusService.pb.go b/pkg/scalers/liiklus/LiiklusService.pb.go index da287d165bd..67e91a5e059 100644 --- a/pkg/scalers/liiklus/LiiklusService.pb.go +++ b/pkg/scalers/liiklus/LiiklusService.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.30.0 -// protoc v4.23.2 +// protoc v3.6.1 // source: LiiklusService.proto package liiklus import ( + empty "github.com/golang/protobuf/ptypes/empty" + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -807,11 +807,11 @@ type ReceiveReply_Record struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Replay bool `protobuf:"varint,5,opt,name=replay,proto3" json:"replay,omitempty"` + Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Timestamp *timestamp.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Replay bool `protobuf:"varint,5,opt,name=replay,proto3" json:"replay,omitempty"` } func (x *ReceiveReply_Record) Reset() { @@ -867,7 +867,7 @@ func (x *ReceiveReply_Record) GetValue() []byte { return nil } -func (x *ReceiveReply_Record) GetTimestamp() *timestamppb.Timestamp { +func (x *ReceiveReply_Record) GetTimestamp() *timestamp.Timestamp { if x != nil { return x.Timestamp } @@ -1073,8 +1073,8 @@ var file_LiiklusService_proto_goTypes = []interface{}{ (*ReceiveReply_Record)(nil), // 13: com.github.bsideup.liiklus.ReceiveReply.Record nil, // 14: com.github.bsideup.liiklus.GetOffsetsReply.OffsetsEntry nil, // 15: com.github.bsideup.liiklus.GetEndOffsetsReply.OffsetsEntry - (*timestamppb.Timestamp)(nil), // 16: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 17: google.protobuf.Empty + (*timestamp.Timestamp)(nil), // 16: google.protobuf.Timestamp + (*empty.Empty)(nil), // 17: google.protobuf.Empty } var file_LiiklusService_proto_depIdxs = []int32{ 0, // 0: com.github.bsideup.liiklus.SubscribeRequest.autoOffsetReset:type_name -> com.github.bsideup.liiklus.SubscribeRequest.AutoOffsetReset diff --git a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go index 51480c39dae..5e4c6069751 100644 --- a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go +++ b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go @@ -1,17 +1,17 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.2 +// - protoc v3.6.1 // source: LiiklusService.proto package liiklus import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -35,7 +35,7 @@ type LiiklusServiceClient interface { Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishReply, error) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (LiiklusService_SubscribeClient, error) Receive(ctx context.Context, in *ReceiveRequest, opts ...grpc.CallOption) (LiiklusService_ReceiveClient, error) - Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*empty.Empty, error) GetOffsets(ctx context.Context, in *GetOffsetsRequest, opts ...grpc.CallOption) (*GetOffsetsReply, error) GetEndOffsets(ctx context.Context, in *GetEndOffsetsRequest, opts ...grpc.CallOption) (*GetEndOffsetsReply, error) } @@ -121,8 +121,8 @@ func (x *liiklusServiceReceiveClient) Recv() (*ReceiveReply, error) { return m, nil } -func (c *liiklusServiceClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *liiklusServiceClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, LiiklusService_Ack_FullMethodName, in, out, opts...) if err != nil { return nil, err @@ -155,7 +155,7 @@ type LiiklusServiceServer interface { Publish(context.Context, *PublishRequest) (*PublishReply, error) Subscribe(*SubscribeRequest, LiiklusService_SubscribeServer) error Receive(*ReceiveRequest, LiiklusService_ReceiveServer) error - Ack(context.Context, *AckRequest) (*emptypb.Empty, error) + Ack(context.Context, *AckRequest) (*empty.Empty, error) GetOffsets(context.Context, *GetOffsetsRequest) (*GetOffsetsReply, error) GetEndOffsets(context.Context, *GetEndOffsetsRequest) (*GetEndOffsetsReply, error) mustEmbedUnimplementedLiiklusServiceServer() @@ -174,7 +174,7 @@ func (UnimplementedLiiklusServiceServer) Subscribe(*SubscribeRequest, LiiklusSer func (UnimplementedLiiklusServiceServer) Receive(*ReceiveRequest, LiiklusService_ReceiveServer) error { return status.Errorf(codes.Unimplemented, "method Receive not implemented") } -func (UnimplementedLiiklusServiceServer) Ack(context.Context, *AckRequest) (*emptypb.Empty, error) { +func (UnimplementedLiiklusServiceServer) Ack(context.Context, *AckRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Ack not implemented") } func (UnimplementedLiiklusServiceServer) GetOffsets(context.Context, *GetOffsetsRequest) (*GetOffsetsReply, error) { diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 7a4a53b24e5..9ab413e47d3 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -157,6 +157,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewExternalMockScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) + case "gcp-cloudtasks": + return scalers.NewCloudTasksScaler(config) case "gcp-pubsub": return scalers.NewPubSubScaler(config) case "gcp-stackdriver":