Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix-5570
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Akudovich <[email protected]>
  • Loading branch information
yorik committed Oct 21, 2024
2 parents e78aa81 + a390a91 commit 571837d
Show file tree
Hide file tree
Showing 23 changed files with 1,310 additions and 388 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524))
- **Github Action**: Fix panic when env for runnerScopeFromEnv or ownerFromEnv is empty ([#6156](https://github.com/kedacore/keda/issues/6156))
- **RabbitMQ Scaler**: provide separate paremeters for user and password ([#2513](https://github.com/kedacore/keda/issues/2513))

#### Experimental

Expand All @@ -71,9 +73,12 @@ Here is an overview of all new **experimental** features:
### Improvements

- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216))
- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
- **Kafka**: Allow disabling FAST negotation when using Kerberos ([#6188](https://github.com/kedacore/keda/issues/6188))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144))
Expand Down Expand Up @@ -102,7 +107,7 @@ New deprecation(s):

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Cron scaler**: Simplify cron scaler code ([#6056](https://github.com/kedacore/keda/issues/6056))

## v2.15.1

Expand Down
92 changes: 38 additions & 54 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
)

const (
defaultDesiredReplicas = 1
cronMetricType = "External"
cronMetricType = "External"
)

type cronScaler struct {
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
startSchedule cron.Schedule
endSchedule cron.Schedule
}

type cronMetadata struct {
Expand All @@ -35,21 +36,11 @@ type cronMetadata struct {
}

func (m *cronMetadata) Validate() error {
if m.Timezone == "" {
return fmt.Errorf("no timezone specified")
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if m.Start == "" {
return fmt.Errorf("no start schedule specified")
}
if _, err := parser.Parse(m.Start); err != nil {
return fmt.Errorf("error parsing start schedule: %w", err)
}

if m.End == "" {
return fmt.Errorf("no end schedule specified")
}
if _, err := parser.Parse(m.End); err != nil {
return fmt.Errorf("error parsing end schedule: %w", err)
}
Expand All @@ -65,7 +56,6 @@ func (m *cronMetadata) Validate() error {
return nil
}

// NewCronScaler creates a new cronScaler
func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -77,25 +67,23 @@ func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing cron metadata: %w", err)
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

return &cronScaler{
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
startSchedule: startSchedule,
endSchedule: endSchedule,
}, nil
}

func getCronTime(location *time.Location, spec string) (int64, error) {
c := cron.New(cron.WithLocation(location))
_, err := c.AddFunc(spec, func() { _ = fmt.Sprintf("Cron initialized for location %s", location.String()) })
if err != nil {
return 0, err
}

c.Start()
cronTime := c.Entries()[0].Next.Unix()
c.Stop()

return cronTime, nil
func getCronTime(location *time.Location, schedule cron.Schedule) time.Time {
// Use the pre-parsed cron schedule directly to get the next time
return schedule.Next(time.Now().In(location))
}

func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) {
Expand Down Expand Up @@ -131,37 +119,33 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.Timezone)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()
currentTime := time.Now().In(location)

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}
// Use the pre-parsed schedules to get the next start and end times
nextStartTime := getCronTime(location, s.startSchedule)
nextEndTime := getCronTime(location, s.endSchedule)

isWithinInterval := false

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
if nextStartTime.Before(nextEndTime) {
// Interval within the same day
isWithinInterval = currentTime.After(nextStartTime) && currentTime.Before(nextEndTime)
} else {
// Interval spans midnight
isWithinInterval = currentTime.After(nextStartTime) || currentTime.Before(nextEndTime)
}

switch {
case nextStartTime < nextEndTime && currentTime < nextStartTime:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
metricValue := float64(1)
if isWithinInterval {
metricValue = float64(s.metadata.DesiredReplicas)
}

metric := GenerateMetricInMili(metricName, metricValue)
return []external_metrics.ExternalMetricValue{metric}, isWithinInterval, nil
}
14 changes: 13 additions & 1 deletion pkg/scalers/cron_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -121,7 +122,18 @@ func TestCronGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockCronScaler := cronScaler{"", meta, logr.Discard()}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

mockCronScaler := cronScaler{
metricType: "",
metadata: meta,
logger: logr.Discard(),
startSchedule: startSchedule,
endSchedule: endSchedule,
}

metricSpec := mockCronScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
Expand Down
48 changes: 37 additions & 11 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/go-logr/logr"
"github.com/tidwall/gjson"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -34,7 +35,8 @@ type elasticsearchMetadata struct {
CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`
Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"`
Query string `keda:"name=query, order=authParams;triggerMetadata, optional"`
Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"`
ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"`
Expand All @@ -57,6 +59,13 @@ func (m *elasticsearchMetadata) Validate() error {
if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") {
return fmt.Errorf("both username and password must be provided when addresses is used")
}
if m.SearchTemplateName == "" && m.Query == "" {
return fmt.Errorf("either searchTemplateName or query must be provided")
}
if m.SearchTemplateName != "" && m.Query != "" {
return fmt.Errorf("cannot provide both searchTemplateName and query")
}

return nil
}

Expand Down Expand Up @@ -93,7 +102,12 @@ func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsear
return meta, err
}

meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
if meta.SearchTemplateName != "" {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
} else {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "elasticsearch-query")
}

meta.TriggerIndex = config.TriggerIndex

return meta, nil
Expand Down Expand Up @@ -137,17 +151,29 @@ func (s *elasticsearchScaler) Close(_ context.Context) error {
// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
var res *esapi.Response
var err error

if s.metadata.SearchTemplateName != "" {
// Using SearchTemplateName
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
}
res, err = s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
} else {
// Using Query
res, err = s.esClient.Search(
s.esClient.Search.WithIndex(s.metadata.Index...),
s.esClient.Search.WithBody(strings.NewReader(s.metadata.Query)),
s.esClient.Search.WithContext(ctx),
)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
s.logger.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
Expand Down
56 changes: 51 additions & 5 deletions pkg/scalers/elasticsearch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,34 @@ var testCases = []parseElasticsearchMetadataTestData{
expectedError: fmt.Errorf("missing required parameter \"index\""),
},
{
name: "no searchTemplateName given",
name: "query and searchTemplateName provided",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"addresses": "http://localhost:9200",
"index": "index1",
"query": `{"match": {"field": "value"}}`,
"searchTemplateName": "myTemplate",
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{"username": "admin"},
expectedError: fmt.Errorf("missing required parameter \"searchTemplateName\""),
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedError: fmt.Errorf("cannot provide both searchTemplateName and query"),
},
{
name: "neither query nor searchTemplateName provided",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedError: fmt.Errorf("either searchTemplateName or query must be provided"),
},
{
name: "no valueLocation given",
Expand Down Expand Up @@ -306,6 +327,31 @@ var testCases = []parseElasticsearchMetadataTestData{
},
expectedError: nil,
},
{
name: "valid query parameter",
metadata: map[string]string{
"addresses": "http://localhost:9200",
"index": "index1",
"query": `{"match": {"field": "value"}}`,
"valueLocation": "hits.total.value",
"targetValue": "12",
},
authParams: map[string]string{
"username": "admin",
"password": "password",
},
expectedMetadata: &elasticsearchMetadata{
Addresses: []string{"http://localhost:9200"},
Index: []string{"index1"},
Username: "admin",
Password: "password",
Query: `{"match": {"field": "value"}}`,
ValueLocation: "hits.total.value",
TargetValue: 12,
MetricName: "s0-elasticsearch-query",
},
expectedError: nil,
},
}

func TestParseElasticsearchMetadata(t *testing.T) {
Expand Down
Loading

0 comments on commit 571837d

Please sign in to comment.