From a390a91d5433bfe52bdd68026174310d1be9b1ab Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Mon, 21 Oct 2024 14:07:24 +0200 Subject: [PATCH 1/3] Support Query at the Elasticsearch scaler (#6217) * Add elasticsearch query at e2e test Signed-off-by: Rick Brouwer * Support query at the Elasticsearch scaler Signed-off-by: rickbrouwer * Support query at the Elasticsearch scaler test Signed-off-by: rickbrouwer * Update changelog Signed-off-by: Rick Brouwer --------- Signed-off-by: Rick Brouwer Signed-off-by: rickbrouwer --- CHANGELOG.md | 1 + pkg/scalers/elasticsearch_scaler.go | 48 +++++++--- pkg/scalers/elasticsearch_scaler_test.go | 56 +++++++++++- .../elasticsearch/elasticsearch_test.go | 91 +++++++++++++++---- 4 files changed, 164 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 449f61bef8e..c583cd2b8a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) +- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216)) - **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 44a27e8e463..70e2030d9bf 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/go-logr/logr" "github.com/tidwall/gjson" v2 "k8s.io/api/autoscaling/v2" @@ -34,7 +35,8 @@ type elasticsearchMetadata struct { CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"` APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"` Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"` - SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"` + SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"` + Query string `keda:"name=query, order=authParams;triggerMetadata, optional"` Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"` ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"` TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"` @@ -57,6 +59,13 @@ func (m *elasticsearchMetadata) Validate() error { if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") { return fmt.Errorf("both username and password must be provided when addresses is used") } + if m.SearchTemplateName == "" && m.Query == "" { + return fmt.Errorf("either searchTemplateName or query must be provided") + } + if m.SearchTemplateName != "" && m.Query != "" { + return fmt.Errorf("cannot provide both searchTemplateName and query") + } + return nil } @@ -93,7 +102,12 @@ func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsear return meta, err } - meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + if meta.SearchTemplateName != "" { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + } else { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "elasticsearch-query") + } + meta.TriggerIndex = config.TriggerIndex return meta, nil @@ -137,17 +151,29 @@ func (s *elasticsearchScaler) Close(_ context.Context) error { // getQueryResult returns result of the scaler query func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) { // Build the request body. - var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { - s.logger.Error(err, "Error encoding query: %s", err) + var res *esapi.Response + var err error + + if s.metadata.SearchTemplateName != "" { + // Using SearchTemplateName + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { + s.logger.Error(err, "Error encoding query: %s", err) + } + res, err = s.esClient.SearchTemplate( + &body, + s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), + s.esClient.SearchTemplate.WithContext(ctx), + ) + } else { + // Using Query + res, err = s.esClient.Search( + s.esClient.Search.WithIndex(s.metadata.Index...), + s.esClient.Search.WithBody(strings.NewReader(s.metadata.Query)), + s.esClient.Search.WithContext(ctx), + ) } - // Run the templated search - res, err := s.esClient.SearchTemplate( - &body, - s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), - s.esClient.SearchTemplate.WithContext(ctx), - ) if err != nil { s.logger.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) return 0, err diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 95725065703..2e6966f0422 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -73,13 +73,34 @@ var testCases = []parseElasticsearchMetadataTestData{ expectedError: fmt.Errorf("missing required parameter \"index\""), }, { - name: "no searchTemplateName given", + name: "query and searchTemplateName provided", metadata: map[string]string{ - "addresses": "http://localhost:9200", - "index": "index1", + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "searchTemplateName": "myTemplate", + "valueLocation": "hits.total.value", + "targetValue": "12", }, - authParams: map[string]string{"username": "admin"}, - expectedError: fmt.Errorf("missing required parameter \"searchTemplateName\""), + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("cannot provide both searchTemplateName and query"), + }, + { + name: "neither query nor searchTemplateName provided", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("either searchTemplateName or query must be provided"), }, { name: "no valueLocation given", @@ -306,6 +327,31 @@ var testCases = []parseElasticsearchMetadataTestData{ }, expectedError: nil, }, + { + name: "valid query parameter", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + Addresses: []string{"http://localhost:9200"}, + Index: []string{"index1"}, + Username: "admin", + Password: "password", + Query: `{"match": {"field": "value"}}`, + ValueLocation: "hits.total.value", + TargetValue: 12, + MetricName: "s0-elasticsearch-query", + }, + expectedError: nil, + }, } func TestParseElasticsearchMetadata(t *testing.T) { diff --git a/tests/scalers/elasticsearch/elasticsearch_test.go b/tests/scalers/elasticsearch/elasticsearch_test.go index de2d997066c..ea83f315298 100644 --- a/tests/scalers/elasticsearch/elasticsearch_test.go +++ b/tests/scalers/elasticsearch/elasticsearch_test.go @@ -202,7 +202,7 @@ spec: name: elasticsearch ` - scaledObjectTemplate = ` + scaledObjectTemplateSearchTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -232,6 +232,54 @@ spec: name: keda-trigger-auth-elasticsearch-secret ` + scaledObjectTemplateQuery = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: 0 + maxReplicaCount: 2 + pollingInterval: 3 + cooldownPeriod: 5 + triggers: + - type: elasticsearch + metadata: + addresses: "http://{{.DeploymentName}}.{{.TestNamespace}}.svc:9200" + username: "elastic" + index: {{.IndexName}} + query: | + { + "query": { + "bool": { + "must": [ + { + "range": { + "@timestamp": { + "gte": "now-1m", + "lte": "now" + } + } + }, + { + "match_all": {} + } + ] + } + } + } + valueLocation: "hits.total.value" + targetValue: "1" + activationTargetValue: "4" + authenticationRef: + name: keda-trigger-auth-elasticsearch-secret +` + elasticsearchCreateIndex = ` { "mappings": { @@ -297,9 +345,6 @@ spec: func TestElasticsearchScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - t.Cleanup(func() { - DeleteKubernetesResources(t, testNamespace, data, templates) - }) // Create kubernetes resources CreateKubernetesResources(t, kc, testNamespace, data, templates) @@ -307,13 +352,32 @@ func TestElasticsearchScaler(t *testing.T) { // setup elastic setupElasticsearch(t, kc) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), - "replica count should be %d after 3 minutes", minReplicaCount) + t.Run("test with searchTemplateName", func(t *testing.T) { + t.Log("--- testing with searchTemplateName ---") + + // Create ScaledObject with searchTemplateName + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) - // test scaling - testActivation(t, kc) - testScaleOut(t, kc) - testScaleIn(t, kc) + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) + }) + + t.Run("test with query", func(t *testing.T) { + t.Log("--- testing with query ---") + + // Create ScaledObject with query + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + }) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) } func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { @@ -326,22 +390,18 @@ func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { require.NoErrorf(t, err, "cannot execute command - %s", err) } -func testActivation(t *testing.T, kc *kubernetes.Clientset) { +func testElasticsearchScaler(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing activation ---") addElements(t, 3) AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) -} -func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale out ---") addElements(t, 5) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), "replica count should be %d after 3 minutes", maxReplicaCount) -} -func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale in ---") assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), @@ -383,6 +443,5 @@ func getTemplateData() (templateData, []Template) { {Name: "serviceTemplate", Config: serviceTemplate}, {Name: "elasticsearchDeploymentTemplate", Config: elasticsearchDeploymentTemplate}, {Name: "deploymentTemplate", Config: deploymentTemplate}, - {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, } } From d849c75069191c79c8a8f35f2dcaa1e49b9272e9 Mon Sep 17 00:00:00 2001 From: Rushen Wang <45029442+dovics@users.noreply.github.com> Date: Tue, 22 Oct 2024 18:50:54 +0800 Subject: [PATCH 2/3] Prevent multiple ScaledObjects managing one HPA (#6198) * Prevent multiple ScaledObjects managing one HPA Signed-off-by: wangrushen * fix: e2e test and mv changelog to improvements Signed-off-by: wangrushen * check nil for spec.advanced.horizontalPodAutoscalerConfig Signed-off-by: wangrushen --------- Signed-off-by: wangrushen --- CHANGELOG.md | 1 + apis/keda/v1alpha1/scaledobject_webhook.go | 16 +++++++ .../scaled_object_validation_test.go | 42 +++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c583cd2b8a2..32f6c393aae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: ### Improvements +- **General**: Prevent multiple ScaledObjects managing one HPA ([#6130](https://github.com/kedacore/keda/issues/6130)) - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) - **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216)) - **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199)) diff --git a/apis/keda/v1alpha1/scaledobject_webhook.go b/apis/keda/v1alpha1/scaledobject_webhook.go index b96c984445b..41bd0355596 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook.go +++ b/apis/keda/v1alpha1/scaledobject_webhook.go @@ -295,6 +295,7 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error return err } + incomingSoHpaName := getHpaName(*incomingSo) for _, so := range soList.Items { if so.Name == incomingSo.Name { continue @@ -315,6 +316,13 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object") return err } + + if getHpaName(so) == incomingSoHpaName { + err = fmt.Errorf("the HPA '%s' is already managed by the ScaledObject '%s'", so.Spec.Advanced.HorizontalPodAutoscalerConfig.Name, so.Name) + scaledobjectlog.Error(err, "validation error") + metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object-hpa") + return err + } } // verify ScalingModifiers structure if defined in ScaledObject @@ -572,3 +580,11 @@ func isContainerResourceLimitSet(ctx context.Context, namespace string, triggerT return false } + +func getHpaName(so ScaledObject) string { + if so.Spec.Advanced == nil || so.Spec.Advanced.HorizontalPodAutoscalerConfig == nil || so.Spec.Advanced.HorizontalPodAutoscalerConfig.Name == "" { + return fmt.Sprintf("keda-hpa-%s", so.Name) + } + + return so.Spec.Advanced.HorizontalPodAutoscalerConfig.Name +} diff --git a/tests/internals/scaled_object_validation/scaled_object_validation_test.go b/tests/internals/scaled_object_validation/scaled_object_validation_test.go index 9cdaff34515..2af7f6b81d8 100644 --- a/tests/internals/scaled_object_validation/scaled_object_validation_test.go +++ b/tests/internals/scaled_object_validation/scaled_object_validation_test.go @@ -131,6 +131,27 @@ spec: desiredReplicas: '1' ` + customHpaScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + name: {{.HpaName}} + triggers: + - type: cron + metadata: + timezone: Etc/UTC + start: 0 * * * * + end: 1 * * * * + desiredReplicas: '1' + ` + hpaTemplate = ` apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler @@ -179,6 +200,8 @@ func TestScaledObjectValidations(t *testing.T) { testScaledWorkloadByOtherScaledObject(t, data) + testManagedHpaByOtherScaledObject(t, data) + testScaledWorkloadByOtherHpa(t, data) testScaledWorkloadByOtherHpaWithOwnershipTransfer(t, data) @@ -220,6 +243,25 @@ func testScaledWorkloadByOtherScaledObject(t *testing.T, data templateData) { KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) } +func testManagedHpaByOtherScaledObject(t *testing.T, data templateData) { + t.Log("--- already managed hpa by other scaledobject---") + + data.HpaName = hpaName + + data.ScaledObjectName = scaledObject1Name + err := KubectlApplyWithErrors(t, data, "scaledObjectTemplate", customHpaScaledObjectTemplate) + assert.NoErrorf(t, err, "cannot deploy the scaledObject - %s", err) + + data.ScaledObjectName = scaledObject2Name + data.DeploymentName = fmt.Sprintf("%s-other-deployment", testName) + err = KubectlApplyWithErrors(t, data, "scaledObjectTemplate", customHpaScaledObjectTemplate) + assert.Errorf(t, err, "can deploy the scaledObject - %s", err) + assert.Contains(t, err.Error(), fmt.Sprintf("the HPA '%s' is already managed by the ScaledObject '%s", hpaName, scaledObject1Name)) + + data.ScaledObjectName = scaledObject1Name + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) +} + func testScaledWorkloadByOtherHpa(t *testing.T, data templateData) { t.Log("--- already scaled workload by other hpa---") From 2cf3c4ce21324c0ea632b8f541099198e72b373a Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Wed, 23 Oct 2024 21:48:56 +0200 Subject: [PATCH 3/3] Refactor Loki Scaler (#6264) Signed-off-by: rickbrouwer --- pkg/scalers/loki_scaler.go | 213 +++++++++++-------------------- pkg/scalers/loki_scaler_test.go | 29 ++--- pkg/scalers/prometheus_scaler.go | 12 -- 3 files changed, 85 insertions(+), 169 deletions(-) diff --git a/pkg/scalers/loki_scaler.go b/pkg/scalers/loki_scaler.go index 11a43e5384c..dff08107f02 100644 --- a/pkg/scalers/loki_scaler.go +++ b/pkg/scalers/loki_scaler.go @@ -19,37 +19,27 @@ import ( ) const ( - lokiServerAddress = "serverAddress" - lokiQuery = "query" - lokiThreshold = "threshold" - lokiActivationThreshold = "activationThreshold" - lokiNamespace = "namespace" - tenantName = "tenantName" + defaultIgnoreNullValues = true tenantNameHeaderKey = "X-Scope-OrgID" - lokiIgnoreNullValues = "ignoreNullValues" -) - -var ( - lokiDefaultIgnoreNullValues = true ) type lokiScaler struct { metricType v2.MetricTargetType - metadata *lokiMetadata + metadata lokiMetadata httpClient *http.Client logger logr.Logger } type lokiMetadata struct { - serverAddress string - query string - threshold float64 - activationThreshold float64 - lokiAuth *authentication.AuthMeta - triggerIndex int - tenantName string - ignoreNullValues bool - unsafeSsl bool + ServerAddress string `keda:"name=serverAddress,order=triggerMetadata"` + Query string `keda:"name=query,order=triggerMetadata"` + Threshold float64 `keda:"name=threshold,order=triggerMetadata"` + ActivationThreshold float64 `keda:"name=activationThreshold,order=triggerMetadata,default=0"` + TenantName string `keda:"name=tenantName,order=triggerMetadata,optional"` + IgnoreNullValues bool `keda:"name=ignoreNullValues,order=triggerMetadata,default=true"` + UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"` + TriggerIndex int + Auth *authentication.AuthMeta } type lokiQueryResult struct { @@ -57,113 +47,54 @@ type lokiQueryResult struct { Data struct { ResultType string `json:"resultType"` Result []struct { - Metric struct { - } `json:"metric"` - Value []interface{} `json:"value"` + Metric struct{} `json:"metric"` + Value []interface{} `json:"value"` } `json:"result"` } `json:"data"` } -// NewLokiScaler returns a new lokiScaler func NewLokiScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting scaler metric type: %w", err) } - logger := InitializeLogger(config, "loki_scaler") - meta, err := parseLokiMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing loki metadata: %w", err) } - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl) + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl) return &lokiScaler{ metricType: metricType, metadata: meta, httpClient: httpClient, - logger: logger, + logger: InitializeLogger(config, "loki_scaler"), }, nil } -func parseLokiMetadata(config *scalersconfig.ScalerConfig) (meta *lokiMetadata, err error) { - meta = &lokiMetadata{} - - if val, ok := config.TriggerMetadata[lokiServerAddress]; ok && val != "" { - meta.serverAddress = val - } else { - return nil, fmt.Errorf("no %s given", lokiServerAddress) - } - - if val, ok := config.TriggerMetadata[lokiQuery]; ok && val != "" { - meta.query = val - } else { - return nil, fmt.Errorf("no %s given", lokiQuery) - } - - if val, ok := config.TriggerMetadata[lokiThreshold]; ok && val != "" { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", lokiThreshold, err) - } - - meta.threshold = t - } else { - if config.AsMetricSource { - meta.threshold = 0 - } else { - return nil, fmt.Errorf("no %s given", lokiThreshold) - } - } - - meta.activationThreshold = 0 - if val, ok := config.TriggerMetadata[lokiActivationThreshold]; ok { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThreshold parsing error %w", err) - } - - meta.activationThreshold = t - } - - if val, ok := config.TriggerMetadata[tenantName]; ok && val != "" { - meta.tenantName = val +func parseLokiMetadata(config *scalersconfig.ScalerConfig) (lokiMetadata, error) { + meta := lokiMetadata{} + err := config.TypedConfig(&meta) + if err != nil { + return meta, fmt.Errorf("error parsing loki metadata: %w", err) } - meta.ignoreNullValues = lokiDefaultIgnoreNullValues - if val, ok := config.TriggerMetadata[lokiIgnoreNullValues]; ok && val != "" { - ignoreNullValues, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s please use true or false", val) - } - meta.ignoreNullValues = ignoreNullValues + if config.AsMetricSource { + meta.Threshold = 0 } - meta.unsafeSsl = false - if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" { - unsafeSslValue, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err) - } - - meta.unsafeSsl = unsafeSslValue - } - - meta.triggerIndex = config.TriggerIndex - - // parse auth configs from ScalerConfig auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) if err != nil { - return nil, err + return meta, err } - meta.lokiAuth = auth + meta.Auth = auth + meta.TriggerIndex = config.TriggerIndex return meta, nil } -// Close returns a nil error func (s *lokiScaler) Close(context.Context) error { if s.httpClient != nil { s.httpClient.CloseIdleConnections() @@ -171,100 +102,101 @@ func (s *lokiScaler) Close(context.Context) error { return nil } -// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "loki"), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, "loki"), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.threshold), - } - metricSpec := v2.MetricSpec{ - External: externalMetric, Type: externalMetricType, + Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), } + metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} } -// ExecuteLokiQuery returns the result of the LogQL query execution func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) { - u, err := url.ParseRequestURI(s.metadata.serverAddress) + u, err := url.ParseRequestURI(s.metadata.ServerAddress) if err != nil { return -1, err } u.Path = "/loki/api/v1/query" - - u.RawQuery = url.Values{ - "query": []string{s.metadata.query}, - }.Encode() + u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode() req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil { return -1, err } - if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth { - req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth)) - } else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth { - req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password) + if s.metadata.Auth != nil { + if s.metadata.Auth.EnableBearerAuth { + req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.Auth)) + } else if s.metadata.Auth.EnableBasicAuth { + req.SetBasicAuth(s.metadata.Auth.Username, s.metadata.Auth.Password) + } } - if s.metadata.tenantName != "" { - req.Header.Add(tenantNameHeaderKey, s.metadata.tenantName) + if s.metadata.TenantName != "" { + req.Header.Add(tenantNameHeaderKey, s.metadata.TenantName) } - r, err := s.httpClient.Do(req) + resp, err := s.httpClient.Do(req) if err != nil { return -1, err } - defer r.Body.Close() + defer resp.Body.Close() - b, err := io.ReadAll(r.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return -1, err } - if !(r.StatusCode >= 200 && r.StatusCode <= 299) { - err := fmt.Errorf("loki query api returned error. status: %d response: %s", r.StatusCode, string(b)) - s.logger.Error(err, "loki query api returned error") - return -1, err + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return -1, fmt.Errorf("loki query api returned error. status: %d response: %s", resp.StatusCode, string(body)) } var result lokiQueryResult - err = json.Unmarshal(b, &result) - if err != nil { + if err := json.Unmarshal(body, &result); err != nil { return -1, err } - var v float64 = -1 + return s.parseQueryResult(result) +} - // allow for zero element or single element result sets +func (s *lokiScaler) parseQueryResult(result lokiQueryResult) (float64, error) { if len(result.Data.Result) == 0 { - if s.metadata.ignoreNullValues { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("loki metrics may be lost, the result is empty") - } else if len(result.Data.Result) > 1 { - return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.query) } - valueLen := len(result.Data.Result[0].Value) - if valueLen == 0 { - if s.metadata.ignoreNullValues { + if len(result.Data.Result) > 1 { + return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.Query) + } + + values := result.Data.Result[0].Value + if len(values) == 0 { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("loki metrics may be lost, the value list is empty") - } else if valueLen < 2 { - return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.query) } - val := result.Data.Result[0].Value[1] - if val != nil { - str := val.(string) - v, err = strconv.ParseFloat(str, 64) - if err != nil { - s.logger.Error(err, "Error converting loki value", "loki_value", str) - return -1, err - } + if len(values) < 2 { + return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.Query) + } + + if values[1] == nil { + return 0, nil + } + + str, ok := values[1].(string) + if !ok { + return -1, fmt.Errorf("failed to parse loki value as string") + } + + v, err := strconv.ParseFloat(str, 64) + if err != nil { + return -1, fmt.Errorf("error converting loki value %s: %w", str, err) } return v, nil @@ -279,6 +211,5 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } metric := GenerateMetricInMili(metricName, val) - - return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil } diff --git a/pkg/scalers/loki_scaler_test.go b/pkg/scalers/loki_scaler_test.go index 06f95f46419..e5f8082269d 100644 --- a/pkg/scalers/loki_scaler_test.go +++ b/pkg/scalers/loki_scaler_test.go @@ -38,7 +38,7 @@ var testLokiMetadata = []parseLokiMetadataTestData{ {map[string]string{"serverAddress": "http://localhost:3100", "threshold": "1", "query": ""}, true}, // ignoreNullValues with wrong value {map[string]string{"serverAddress": "http://localhost:3100", "threshold": "1", "query": "sum(rate({filename=\"/var/log/syslog\"}[1m])) by (level)", "ignoreNullValues": "xxxx"}, true}, - + // with unsafeSsl {map[string]string{"serverAddress": "https://localhost:3100", "threshold": "1", "query": "sum(rate({filename=\"/var/log/syslog\"}[1m])) by (level)", "unsafeSsl": "true"}, false}, } @@ -83,14 +83,14 @@ func TestLokiScalerAuthParams(t *testing.T) { } if err == nil { - if meta.lokiAuth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic") { + if meta.Auth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic") { t.Error("wrong auth mode detected") } } } } -type lokiQromQueryResultTestData struct { +type lokiQueryResultTestData struct { name string bodyStr string responseStatus int @@ -100,7 +100,7 @@ type lokiQromQueryResultTestData struct { unsafeSsl bool } -var testLokiQueryResult = []lokiQromQueryResultTestData{ +var testLokiQueryResult = []lokiQueryResultTestData{ { name: "no results", bodyStr: `{}`, @@ -189,17 +189,16 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { t.Run(testData.name, func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(testData.responseStatus) - if _, err := writer.Write([]byte(testData.bodyStr)); err != nil { t.Fatal(err) } })) scaler := lokiScaler{ - metadata: &lokiMetadata{ - serverAddress: server.URL, - ignoreNullValues: testData.ignoreNullValues, - unsafeSsl: testData.unsafeSsl, + metadata: lokiMetadata{ + ServerAddress: server.URL, + IgnoreNullValues: testData.ignoreNullValues, + UnsafeSsl: testData.unsafeSsl, }, httpClient: http.DefaultClient, logger: logr.Discard(), @@ -208,7 +207,6 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { value, err := scaler.ExecuteLokiQuery(context.TODO()) assert.Equal(t, testData.expectedValue, value) - if testData.isError { assert.Error(t, err) } else { @@ -219,7 +217,7 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { } func TestLokiScalerTenantHeader(t *testing.T) { - testData := lokiQromQueryResultTestData{ + testData := lokiQueryResultTestData{ name: "no values", bodyStr: `{"data":{"result":[]}}`, responseStatus: http.StatusOK, @@ -238,15 +236,14 @@ func TestLokiScalerTenantHeader(t *testing.T) { })) scaler := lokiScaler{ - metadata: &lokiMetadata{ - serverAddress: server.URL, - tenantName: tenantName, - ignoreNullValues: testData.ignoreNullValues, + metadata: lokiMetadata{ + ServerAddress: server.URL, + TenantName: tenantName, + IgnoreNullValues: testData.ignoreNullValues, }, httpClient: http.DefaultClient, } _, err := scaler.ExecuteLokiQuery(context.TODO()) - assert.NoError(t, err) } diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 5a0516f42a0..521d5693442 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -25,18 +25,6 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) -const ( - promServerAddress = "serverAddress" - promQuery = "query" - promQueryParameters = "queryParameters" - promThreshold = "threshold" - promActivationThreshold = "activationThreshold" - promNamespace = "namespace" - promCustomHeaders = "customHeaders" - ignoreNullValues = "ignoreNullValues" - unsafeSsl = "unsafeSsl" -) - type prometheusScaler struct { metricType v2.MetricTargetType metadata *prometheusMetadata