Skip to content

Commit

Permalink
Support Query at the Elasticsearch scaler (#6217)
Browse files Browse the repository at this point in the history
* Add elasticsearch query at e2e test

Signed-off-by: Rick Brouwer <[email protected]>

* Support query at the Elasticsearch scaler

Signed-off-by: rickbrouwer <[email protected]>

* Support query at the Elasticsearch scaler test

Signed-off-by: rickbrouwer <[email protected]>

* Update changelog

Signed-off-by: Rick Brouwer <[email protected]>

---------

Signed-off-by: Rick Brouwer <[email protected]>
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer authored Oct 21, 2024
1 parent 1d52295 commit a390a91
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216))
- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
Expand Down
48 changes: 37 additions & 11 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/go-logr/logr"
"github.com/tidwall/gjson"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -34,7 +35,8 @@ type elasticsearchMetadata struct {
CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`
Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"`
Query string `keda:"name=query, order=authParams;triggerMetadata, optional"`
Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"`
ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"`
Expand All @@ -57,6 +59,13 @@ func (m *elasticsearchMetadata) Validate() error {
if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") {
return fmt.Errorf("both username and password must be provided when addresses is used")
}
if m.SearchTemplateName == "" && m.Query == "" {
return fmt.Errorf("either searchTemplateName or query must be provided")
}
if m.SearchTemplateName != "" && m.Query != "" {
return fmt.Errorf("cannot provide both searchTemplateName and query")
}

return nil
}

Expand Down Expand Up @@ -93,7 +102,12 @@ func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsear
return meta, err
}

meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
if meta.SearchTemplateName != "" {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
} else {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "elasticsearch-query")
}

meta.TriggerIndex = config.TriggerIndex

return meta, nil
Expand Down Expand Up @@ -137,17 +151,29 @@ func (s *elasticsearchScaler) Close(_ context.Context) error {
// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
var res *esapi.Response
var err error

if s.metadata.SearchTemplateName != "" {
// Using SearchTemplateName
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
}
res, err = s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
} else {
// Using Query
res, err = s.esClient.Search(
s.esClient.Search.WithIndex(s.metadata.Index...),
s.esClient.Search.WithBody(strings.NewReader(s.metadata.Query)),
s.esClient.Search.WithContext(ctx),
)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
s.logger.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
Expand Down
56 changes: 51 additions & 5 deletions pkg/scalers/elasticsearch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,34 @@ var testCases = []parseElasticsearchMetadataTestData{
expectedError: fmt.Errorf("missing required parameter \"index\""),
},
{
name: "no searchTemplateName given",
name: "query and searchTemplateName provided",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"addresses": "http://localhost:9200",
"index": "index1",
"query": `{"match": {"field": "value"}}`,
"searchTemplateName": "myTemplate",
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{"username": "admin"},
expectedError: fmt.Errorf("missing required parameter \"searchTemplateName\""),
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedError: fmt.Errorf("cannot provide both searchTemplateName and query"),
},
{
name: "neither query nor searchTemplateName provided",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedError: fmt.Errorf("either searchTemplateName or query must be provided"),
},
{
name: "no valueLocation given",
Expand Down Expand Up @@ -306,6 +327,31 @@ var testCases = []parseElasticsearchMetadataTestData{
},
expectedError: nil,
},
{
name: "valid query parameter",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"query": `{"match": {"field": "value"}}`,
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedMetadata: &elasticsearchMetadata{
Addresses: []string{"http://localhost:9200"},
Index: []string{"index1"},
Username: "admin",
Password: "password",
Query: `{"match": {"field": "value"}}`,
ValueLocation: "hits.total.value",
TargetValue: 12,
MetricName: "s0-elasticsearch-query",
},
expectedError: nil,
},
}

func TestParseElasticsearchMetadata(t *testing.T) {
Expand Down
91 changes: 75 additions & 16 deletions tests/scalers/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ spec:
name: elasticsearch
`

scaledObjectTemplate = `
scaledObjectTemplateSearchTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand Down Expand Up @@ -232,6 +232,54 @@ spec:
name: keda-trigger-auth-elasticsearch-secret
`

scaledObjectTemplateQuery = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
minReplicaCount: 0
maxReplicaCount: 2
pollingInterval: 3
cooldownPeriod: 5
triggers:
- type: elasticsearch
metadata:
addresses: "http://{{.DeploymentName}}.{{.TestNamespace}}.svc:9200"
username: "elastic"
index: {{.IndexName}}
query: |
{
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "now-1m",
"lte": "now"
}
}
},
{
"match_all": {}
}
]
}
}
}
valueLocation: "hits.total.value"
targetValue: "1"
activationTargetValue: "4"
authenticationRef:
name: keda-trigger-auth-elasticsearch-secret
`

elasticsearchCreateIndex = `
{
"mappings": {
Expand Down Expand Up @@ -297,23 +345,39 @@ spec:
func TestElasticsearchScaler(t *testing.T) {
kc := GetKubernetesClient(t)
data, templates := getTemplateData()
t.Cleanup(func() {
DeleteKubernetesResources(t, testNamespace, data, templates)
})

// Create kubernetes resources
CreateKubernetesResources(t, kc, testNamespace, data, templates)

// setup elastic
setupElasticsearch(t, kc)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)
t.Run("test with searchTemplateName", func(t *testing.T) {
t.Log("--- testing with searchTemplateName ---")

// Create ScaledObject with searchTemplateName
KubectlApplyWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate)

// test scaling
testActivation(t, kc)
testScaleOut(t, kc)
testScaleIn(t, kc)
testElasticsearchScaler(t, kc)

// Delete ScaledObject
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate)
})

t.Run("test with query", func(t *testing.T) {
t.Log("--- testing with query ---")

// Create ScaledObject with query
KubectlApplyWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery)

testElasticsearchScaler(t, kc)

// Delete ScaledObject
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery)
})

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) {
Expand All @@ -326,22 +390,18 @@ func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) {
require.NoErrorf(t, err, "cannot execute command - %s", err)
}

func testActivation(t *testing.T, kc *kubernetes.Clientset) {
func testElasticsearchScaler(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing activation ---")
addElements(t, 3)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
}

func testScaleOut(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scale out ---")
addElements(t, 5)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", maxReplicaCount)
}

func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scale in ---")

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
Expand Down Expand Up @@ -383,6 +443,5 @@ func getTemplateData() (templateData, []Template) {
{Name: "serviceTemplate", Config: serviceTemplate},
{Name: "elasticsearchDeploymentTemplate", Config: elasticsearchDeploymentTemplate},
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
}
}

0 comments on commit a390a91

Please sign in to comment.