forked from kedacore/keda
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add scaler for Google Cloud Tasks (kedacore#4834)
* feature: add cloud tasks scaler Signed-off-by: Jose Maria Alvarez <[email protected]> * Add cloud tasks e2e test Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: Return files to the original state Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: static checks Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: more static checks fixed Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: add location for queue Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: add correct command to create test tasks in queue Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: add fixes to test and add pod identity test Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: specify location also in tasks Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: correct indentation and location for purge Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: correct naming, add package correctly to identity tests Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: change test name Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: add gcp as prefix for naming for clarity Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: correct problem in test when changing name in struct Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: order in changelog Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: another order change in changelog Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: more renaming Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: put it in a new section Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: try a new order for the changelog Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: delete unneeded line Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: into the new Changelog section Signed-off-by: Jose Maria Alvarez <[email protected]> * fix: another order fix Signed-off-by: Jose Maria Alvarez <[email protected]> --------- Signed-off-by: Jose Maria Alvarez <[email protected]> Signed-off-by: Jose Maria Alvarez Fernandez <[email protected]> Signed-off-by: anton.lysina <[email protected]>
- Loading branch information
1 parent
d868ef0
commit a262450
Showing
6 changed files
with
854 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/go-logr/logr" | ||
v2 "k8s.io/api/autoscaling/v2" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
|
||
kedautil "github.com/kedacore/keda/v2/pkg/util" | ||
) | ||
|
||
const ( | ||
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth" | ||
|
||
cloudTaskDefaultValue = 100 | ||
) | ||
|
||
type gcpCloudTasksScaler struct { | ||
client *StackDriverClient | ||
metricType v2.MetricTargetType | ||
metadata *gcpCloudTaskMetadata | ||
logger logr.Logger | ||
} | ||
|
||
type gcpCloudTaskMetadata struct { | ||
value float64 | ||
activationValue float64 | ||
|
||
queueName string | ||
projectID string | ||
gcpAuthorization *gcpAuthorizationMetadata | ||
scalerIndex int | ||
} | ||
|
||
// NewCloudTaskScaler creates a new cloudTaskScaler | ||
func NewGcpCloudTasksScaler(config *ScalerConfig) (Scaler, error) { | ||
metricType, err := GetMetricTargetType(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error getting scaler metric type: %w", err) | ||
} | ||
|
||
logger := InitializeLogger(config, "gcp_cloud_tasks_scaler") | ||
|
||
meta, err := parseGcpCloudTasksMetadata(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err) | ||
} | ||
|
||
return &gcpCloudTasksScaler{ | ||
metricType: metricType, | ||
metadata: meta, | ||
logger: logger, | ||
}, nil | ||
} | ||
|
||
func parseGcpCloudTasksMetadata(config *ScalerConfig) (*gcpCloudTaskMetadata, error) { | ||
meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue} | ||
|
||
value, valuePresent := config.TriggerMetadata["value"] | ||
|
||
if valuePresent { | ||
triggerValue, err := strconv.ParseFloat(value, 64) | ||
if err != nil { | ||
return nil, fmt.Errorf("value parsing error %w", err) | ||
} | ||
meta.value = triggerValue | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["queueName"]; ok { | ||
if val == "" { | ||
return nil, fmt.Errorf("no queue name given") | ||
} | ||
|
||
meta.queueName = val | ||
} else { | ||
return nil, fmt.Errorf("no queue name given") | ||
} | ||
|
||
meta.activationValue = 0 | ||
if val, ok := config.TriggerMetadata["activationValue"]; ok { | ||
activationValue, err := strconv.ParseFloat(val, 64) | ||
if err != nil { | ||
return nil, fmt.Errorf("activationValue parsing error %w", err) | ||
} | ||
meta.activationValue = activationValue | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["projectID"]; ok { | ||
if val == "" { | ||
return nil, fmt.Errorf("no project id given") | ||
} | ||
|
||
meta.projectID = val | ||
} else { | ||
return nil, fmt.Errorf("no project id given") | ||
} | ||
|
||
auth, err := getGCPAuthorization(config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
meta.gcpAuthorization = auth | ||
meta.scalerIndex = config.ScalerIndex | ||
return &meta, nil | ||
} | ||
|
||
func (s *gcpCloudTasksScaler) Close(context.Context) error { | ||
if s.client != nil { | ||
err := s.client.metricsClient.Close() | ||
s.client = nil | ||
if err != nil { | ||
s.logger.Error(err, "error closing StackDriver client") | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// GetMetricSpecForScaling returns the metric spec for the HPA | ||
func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||
externalMetric := &v2.ExternalMetricSource{ | ||
Metric: v2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))), | ||
}, | ||
Target: GetMetricTargetMili(s.metricType, s.metadata.value), | ||
} | ||
|
||
// Create the metric spec for the HPA | ||
metricSpec := v2.MetricSpec{ | ||
External: externalMetric, | ||
Type: externalMetricType, | ||
} | ||
|
||
return []v2.MetricSpec{metricSpec} | ||
} | ||
|
||
// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task | ||
func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||
metricType := cloudTasksStackDriverQueueSize | ||
|
||
value, err := s.getMetrics(ctx, metricType) | ||
if err != nil { | ||
s.logger.Error(err, "error getting metric", "metricType", metricType) | ||
return []external_metrics.ExternalMetricValue{}, false, err | ||
} | ||
|
||
metric := GenerateMetricInMili(metricName, value) | ||
|
||
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil | ||
} | ||
|
||
func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error { | ||
var client *StackDriverClient | ||
var err error | ||
if s.metadata.gcpAuthorization.podIdentityProviderEnabled { | ||
client, err = NewStackDriverClientPodIdentity(ctx) | ||
} else { | ||
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) | ||
} | ||
|
||
if err != nil { | ||
return err | ||
} | ||
s.client = client | ||
return nil | ||
} | ||
|
||
// getMetrics gets metric type value from stackdriver api | ||
func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) { | ||
if s.client == nil { | ||
err := s.setStackdriverClient(ctx) | ||
if err != nil { | ||
return -1, err | ||
} | ||
} | ||
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"` | ||
|
||
// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them. | ||
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks | ||
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/go-logr/logr" | ||
) | ||
|
||
var testGcpCloudTasksResolvedEnv = map[string]string{ | ||
"SAMPLE_CREDS": "{}", | ||
} | ||
|
||
type parseGcpCloudTasksMetadataTestData struct { | ||
authParams map[string]string | ||
metadata map[string]string | ||
isError bool | ||
} | ||
|
||
type gcpCloudTasksMetricIdentifier struct { | ||
metadataTestData *parseGcpCloudTasksMetadataTestData | ||
scalerIndex int | ||
name string | ||
} | ||
|
||
var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{ | ||
{map[string]string{}, map[string]string{}, true}, | ||
// all properly formed | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false}, | ||
// missing subscriptionName | ||
{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// missing credentials | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true}, | ||
// malformed subscriptionSize | ||
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// malformed mode | ||
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// malformed activationTargetValue | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true}, | ||
// Credentials from AuthParams | ||
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false}, | ||
// Credentials from AuthParams with empty creds | ||
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true}, | ||
// properly formed float value and activationTargetValue | ||
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false}, | ||
} | ||
|
||
var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ | ||
{&testGcpCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"}, | ||
{&testGcpCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"}, | ||
} | ||
|
||
func TestGcpCloudTasksParseMetadata(t *testing.T) { | ||
for _, testData := range testGcpCloudTasksMetadata { | ||
_, err := parseGcpCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
} | ||
if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
} | ||
} | ||
} | ||
|
||
func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range gcpCloudTasksMetricIdentifiers { | ||
meta, err := parseGcpCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex}) | ||
if err != nil { | ||
t.Fatal("Could not parse metadata:", err) | ||
} | ||
mockGcpCloudTasksScaler := gcpCloudTasksScaler{nil, "", meta, logr.Discard()} | ||
|
||
metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background()) | ||
metricName := metricSpec[0].External.Metric.Name | ||
if metricName != testData.name { | ||
t.Error("Wrong External metric source name:", metricName) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.