diff --git a/CHANGELOG.md b/CHANGELOG.md index ae231f28b90..78ed7354e6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524)) +- **GitHub Action**: Fix panic when env for runnerScopeFromEnv or ownerFromEnv is empty ([#6156](https://github.com/kedacore/keda/issues/6156)) +- **RabbitMQ Scaler**: Provide separate parameters for user and password ([#2513](https://github.com/kedacore/keda/issues/2513)) #### Experimental @@ -71,9 +73,12 @@ Here is an overview of all new **experimental** features: ### Improvements - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) +- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216)) +- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214)) +- **Kafka**: Allow disabling FAST negotiation when using Kerberos ([#6188](https://github.com/kedacore/keda/issues/6188)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) - **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144)) @@ -102,7 +107,7 @@ New deprecation(s): ### Other -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Cron scaler**: Simplify cron scaler code ([#6056](https://github.com/kedacore/keda/issues/6056)) ## v2.15.1 diff --git a/pkg/scalers/cron_scaler.go b/pkg/scalers/cron_scaler.go index bd9c5a22a7d..8cf4c35ba1a 100644 --- a/pkg/scalers/cron_scaler.go +++ b/pkg/scalers/cron_scaler.go @@ -16,14 +16,15 @@ import ( ) const ( - defaultDesiredReplicas = 1 - cronMetricType = "External" + cronMetricType = "External" ) type cronScaler struct { - metricType v2.MetricTargetType - metadata cronMetadata - logger logr.Logger + metricType v2.MetricTargetType + metadata cronMetadata + logger logr.Logger + startSchedule cron.Schedule + endSchedule cron.Schedule } type cronMetadata struct { @@ -35,21 +36,11 @@ type cronMetadata struct { } func (m *cronMetadata) Validate() error { - if m.Timezone == "" { - return fmt.Errorf("no timezone specified") - } - parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) - if m.Start == "" { - return fmt.Errorf("no start schedule specified") - } if _, err := parser.Parse(m.Start); err != nil { return fmt.Errorf("error parsing start schedule: %w", err) }
- if m.End == "" { - return fmt.Errorf("no end schedule specified") - } if _, err := parser.Parse(m.End); err != nil { return fmt.Errorf("error parsing end schedule: %w", err) } @@ -65,7 +56,6 @@ func (m *cronMetadata) Validate() error { return nil } -// NewCronScaler creates a new cronScaler func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { @@ -77,25 +67,23 @@ func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("error parsing cron metadata: %w", err) } + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + + startSchedule, _ := parser.Parse(meta.Start) + endSchedule, _ := parser.Parse(meta.End) + return &cronScaler{ - metricType: metricType, - metadata: meta, - logger: InitializeLogger(config, "cron_scaler"), + metricType: metricType, + metadata: meta, + logger: InitializeLogger(config, "cron_scaler"), + startSchedule: startSchedule, + endSchedule: endSchedule, }, nil } -func getCronTime(location *time.Location, spec string) (int64, error) { - c := cron.New(cron.WithLocation(location)) - _, err := c.AddFunc(spec, func() { _ = fmt.Sprintf("Cron initialized for location %s", location.String()) }) - if err != nil { - return 0, err - } - - c.Start() - cronTime := c.Entries()[0].Next.Unix() - c.Stop() - - return cronTime, nil +func getCronTime(location *time.Location, schedule cron.Schedule) time.Time { + // Use the pre-parsed cron schedule directly to get the next time + return schedule.Next(time.Now().In(location)) } func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) { @@ -131,37 +119,33 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { return []v2.MetricSpec{metricSpec} } -// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - var defaultDesiredReplicas = int64(defaultDesiredReplicas) - location, err := time.LoadLocation(s.metadata.Timezone) if err != nil { - return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err) + return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone: %w", err) } - // Since we are considering the timestamp here and not the exact time, timezone does matter. 
- currentTime := time.Now().Unix() + currentTime := time.Now().In(location) - nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start) - if startTimecronErr != nil { - return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr) - } + // Use the pre-parsed schedules to get the next start and end times + nextStartTime := getCronTime(location, s.startSchedule) + nextEndTime := getCronTime(location, s.endSchedule) + + isWithinInterval := false - nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End) - if endTimecronErr != nil { - return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr) + if nextStartTime.Before(nextEndTime) { + // Interval within the same day + isWithinInterval = currentTime.After(nextStartTime) && currentTime.Before(nextEndTime) + } else { + // Interval spans midnight + isWithinInterval = currentTime.After(nextStartTime) || currentTime.Before(nextEndTime) } - switch { - case nextStartTime < nextEndTime && currentTime < nextStartTime: - metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas)) - return []external_metrics.ExternalMetricValue{metric}, false, nil - case currentTime <= nextEndTime: - metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas)) - return []external_metrics.ExternalMetricValue{metric}, true, nil - default: - metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas)) - return []external_metrics.ExternalMetricValue{metric}, false, nil + metricValue := float64(1) + if isWithinInterval { + metricValue = float64(s.metadata.DesiredReplicas) } + + metric := GenerateMetricInMili(metricName, metricValue) + return []external_metrics.ExternalMetricValue{metric}, isWithinInterval, nil } diff --git a/pkg/scalers/cron_scaler_test.go b/pkg/scalers/cron_scaler_test.go index 2b9f5b810f3..d0b67ec2bed 100644 --- a/pkg/scalers/cron_scaler_test.go +++ b/pkg/scalers/cron_scaler_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" @@ -121,7 +122,18 @@ func TestCronGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - mockCronScaler := cronScaler{"", meta, logr.Discard()} + + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + startSchedule, _ := parser.Parse(meta.Start) + endSchedule, _ := parser.Parse(meta.End) + + mockCronScaler := cronScaler{ + metricType: "", + metadata: meta, + logger: logr.Discard(), + startSchedule: startSchedule, + endSchedule: endSchedule, + } metricSpec := mockCronScaler.GetMetricSpecForScaling(context.Background()) metricName := metricSpec[0].External.Metric.Name diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 44a27e8e463..70e2030d9bf 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/go-logr/logr" "github.com/tidwall/gjson" v2 "k8s.io/api/autoscaling/v2" @@ -34,7 +35,8 @@ type elasticsearchMetadata struct { CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"` APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"` Index []string `keda:"name=index, 
order=authParams;triggerMetadata, separator=;"` - SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"` + SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"` + Query string `keda:"name=query, order=authParams;triggerMetadata, optional"` Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"` ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"` TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"` @@ -57,6 +59,13 @@ func (m *elasticsearchMetadata) Validate() error { if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") { return fmt.Errorf("both username and password must be provided when addresses is used") } + if m.SearchTemplateName == "" && m.Query == "" { + return fmt.Errorf("either searchTemplateName or query must be provided") + } + if m.SearchTemplateName != "" && m.Query != "" { + return fmt.Errorf("cannot provide both searchTemplateName and query") + } + return nil } @@ -93,7 +102,12 @@ func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsear return meta, err } - meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + if meta.SearchTemplateName != "" { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + } else { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "elasticsearch-query") + } + meta.TriggerIndex = config.TriggerIndex return meta, nil @@ -137,17 +151,29 @@ func (s *elasticsearchScaler) Close(_ context.Context) error { // getQueryResult returns result of the scaler query func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) { // Build the request body. 
- var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { - s.logger.Error(err, "Error encoding query: %s", err) + var res *esapi.Response + var err error + + if s.metadata.SearchTemplateName != "" { + // Using SearchTemplateName + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { + s.logger.Error(err, "Error encoding query: %s", err) + } + res, err = s.esClient.SearchTemplate( + &body, + s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), + s.esClient.SearchTemplate.WithContext(ctx), + ) + } else { + // Using Query + res, err = s.esClient.Search( + s.esClient.Search.WithIndex(s.metadata.Index...), + s.esClient.Search.WithBody(strings.NewReader(s.metadata.Query)), + s.esClient.Search.WithContext(ctx), + ) } - // Run the templated search - res, err := s.esClient.SearchTemplate( - &body, - s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), - s.esClient.SearchTemplate.WithContext(ctx), - ) if err != nil { s.logger.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) return 0, err diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 95725065703..2e6966f0422 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -73,13 +73,34 @@ var testCases = []parseElasticsearchMetadataTestData{ expectedError: fmt.Errorf("missing required parameter \"index\""), }, { - name: "no searchTemplateName given", + name: "query and searchTemplateName provided", metadata: map[string]string{ - "addresses": "http://localhost:9200", - "index": "index1", + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "searchTemplateName": "myTemplate", + "valueLocation": "hits.total.value", + "targetValue": "12", }, - authParams: map[string]string{"username": "admin"}, - expectedError: fmt.Errorf("missing required parameter \"searchTemplateName\""), + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("cannot provide both searchTemplateName and query"), + }, + { + name: "neither query nor searchTemplateName provided", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("either searchTemplateName or query must be provided"), }, { name: "no valueLocation given", @@ -306,6 +327,31 @@ var testCases = []parseElasticsearchMetadataTestData{ }, expectedError: nil, }, + { + name: "valid query parameter", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + Addresses: []string{"http://localhost:9200"}, + Index: []string{"index1"}, + Username: "admin", + Password: "password", + Query: `{"match": {"field": "value"}}`, + ValueLocation: "hits.total.value", + TargetValue: 12, + MetricName: "s0-elasticsearch-query", + }, + expectedError: nil, + }, } func TestParseElasticsearchMetadata(t *testing.T) { diff --git a/pkg/scalers/etcd_scaler.go b/pkg/scalers/etcd_scaler.go index 83835bed93e..eed57a3a119 100644 --- 
a/pkg/scalers/etcd_scaler.go +++ b/pkg/scalers/etcd_scaler.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strconv" - "strings" "time" "github.com/go-logr/logr" @@ -38,18 +37,49 @@ type etcdScaler struct { } type etcdMetadata struct { - endpoints []string - watchKey string - value float64 - activationValue float64 - watchProgressNotifyInterval int - triggerIndex int + triggerIndex int + + Endpoints []string `keda:"name=endpoints, order=triggerMetadata"` + WatchKey string `keda:"name=watchKey, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"` + WatchProgressNotifyInterval int `keda:"name=watchProgressNotifyInterval, order=triggerMetadata, optional, default=600"` + + Username string `keda:"name=username,order=authParams;resolvedEnv, optional"` + Password string `keda:"name=password,order=authParams;resolvedEnv, optional"` + // TLS - enableTLS bool - cert string - key string - keyPassword string - ca string + EnableTLS string `keda:"name=tls, order=authParams, optional, default=disable"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Ca string `keda:"name=ca, order=authParams, optional"` +} + +func (meta *etcdMetadata) Validate() error { + if meta.WatchProgressNotifyInterval <= 0 { + return errors.New("watchProgressNotifyInterval must be greater than 0") + } + + if meta.EnableTLS == etcdTLSEnable { + if meta.Cert == "" && meta.Key != "" { + return errors.New("cert must be provided with key") + } + if meta.Key == "" && meta.Cert != "" { + return errors.New("key must be provided with cert") + } + } else if meta.EnableTLS != etcdTLSDisable { + return fmt.Errorf("incorrect value for TLS given: %s", meta.EnableTLS) + } + + if meta.Password != "" && meta.Username == "" { + return errors.New("username must be provided with password") + } + if meta.Username != "" && meta.Password == "" { + return errors.New("password must be provided with username") + } + + return nil } // NewEtcdScaler creates a new etcdScaler @@ -76,75 +106,11 @@ func NewEtcdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { }, nil } -func parseEtcdAuthParams(config *scalersconfig.ScalerConfig, meta *etcdMetadata) error { - meta.enableTLS = false - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if val == etcdTLSEnable { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" - } - meta.enableTLS = true - } else if val != etcdTLSDisable { - return fmt.Errorf("err incorrect value for TLS given: %s", val) - } - } - - return nil -} - func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error) { meta := &etcdMetadata{} - var err error - meta.endpoints = strings.Split(config.TriggerMetadata[endpoints], ",") - if len(meta.endpoints) == 0 || meta.endpoints[0] == "" { - return nil, fmt.Errorf("endpoints required") - } - - meta.watchKey = 
config.TriggerMetadata[watchKey] - if len(meta.watchKey) == 0 { - return nil, fmt.Errorf("watchKey required") - } - - value, err := strconv.ParseFloat(config.TriggerMetadata[value], 64) - if err != nil || value <= 0 { - return nil, fmt.Errorf("value must be a float greater than 0") - } - meta.value = value - - meta.activationValue = 0 - if val, ok := config.TriggerMetadata[activationValue]; ok { - activationValue, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationValue must be a float") - } - meta.activationValue = activationValue - } - - meta.watchProgressNotifyInterval = defaultWatchProgressNotifyInterval - if val, ok := config.TriggerMetadata[watchProgressNotifyInterval]; ok { - interval, err := strconv.Atoi(val) - if err != nil || interval <= 0 { - return nil, fmt.Errorf("watchProgressNotifyInterval must be a int greater than 0") - } - meta.watchProgressNotifyInterval = interval - } - if err = parseEtcdAuthParams(config, meta); err != nil { - return meta, err + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing etcd metadata: %w", err) } meta.triggerIndex = config.TriggerIndex @@ -154,17 +120,19 @@ func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error func getEtcdClients(metadata *etcdMetadata) (*clientv3.Client, error) { var tlsConfig *tls.Config var err error - if metadata.enableTLS { - tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false) + if metadata.EnableTLS == etcdTLSEnable { + tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.Ca, false) if err != nil { return nil, err } } cli, err := clientv3.New(clientv3.Config{ - Endpoints: metadata.endpoints, + Endpoints: metadata.Endpoints, DialTimeout: 5 * time.Second, TLS: tlsConfig, + Username: metadata.Username, + Password: metadata.Password, }) if err != nil { return nil, fmt.Errorf("error connecting to etcd server: %w", err) } @@ -189,16 +157,16 @@ func (s *etcdScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } metric := GenerateMetricInMili(metricName, v) - return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.activationValue, nil + return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.ActivationValue, nil } // GetMetricSpecForScaling returns the metric spec for the HPA.
func (s *etcdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.watchKey))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.WatchKey))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } metricSpec := v2.MetricSpec{External: externalMetric, Type: etcdMetricType} return []v2.MetricSpec{metricSpec} @@ -209,16 +177,16 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // It's possible for the watch to get terminated anytime, we need to run this in a retry loop runWithWatch := func() { - s.logger.Info("run watch", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("run watch", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) subCtx, cancel := context.WithCancel(ctx) subCtx = clientv3.WithRequireLeader(subCtx) - rch := s.client.Watch(subCtx, s.metadata.watchKey, clientv3.WithProgressNotify()) + rch := s.client.Watch(subCtx, s.metadata.WatchKey, clientv3.WithProgressNotify()) // rewatch to another etcd server when the network is isolated from the current etcd server. progress := make(chan bool) defer close(progress) go func() { - delayDuration := time.Duration(s.metadata.watchProgressNotifyInterval) * 2 * time.Second + delayDuration := time.Duration(s.metadata.WatchProgressNotifyInterval) * 2 * time.Second delay := time.NewTimer(delayDuration) defer delay.Stop() for { @@ -228,7 +196,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { case <-subCtx.Done(): return case <-delay.C: - s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -240,7 +208,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // rewatch to another etcd server when there is an error form the current etcd server, such as 'no leader','required revision has been compacted' if wresp.Err() != nil { - s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -251,7 +219,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { s.logger.Error(err, "etcdValue invalid will be treated as 0") v = 0 } - active <- v > s.metadata.activationValue + active <- v > s.metadata.ActivationValue } } } @@ -288,12 +256,12 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { func (s *etcdScaler) getMetricValue(ctx context.Context) (float64, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - resp, err := s.client.Get(ctx, s.metadata.watchKey) + resp, err := s.client.Get(ctx, s.metadata.WatchKey) if err != nil { return 0, err } if resp.Kvs == nil { - return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.watchKey) + return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.WatchKey) } v, err := strconv.ParseFloat(string(resp.Kvs[0].Value), 64) if err 
!= nil { diff --git a/pkg/scalers/etcd_scaler_test.go b/pkg/scalers/etcd_scaler_test.go index 9ecdb5b602a..70f16e2fec8 100644 --- a/pkg/scalers/etcd_scaler_test.go +++ b/pkg/scalers/etcd_scaler_test.go @@ -19,7 +19,7 @@ type parseEtcdMetadataTestData struct { type parseEtcdAuthParamsTestData struct { authParams map[string]string isError bool - enableTLS bool + enableTLS string } type etcdMetricIdentifier struct { @@ -56,19 +56,25 @@ var parseEtcdMetadataTestDataset = []parseEtcdMetadataTestData{ var parseEtcdAuthParamsTestDataset = []parseEtcdAuthParamsTestData{ // success, TLS only - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key + key password and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, etcdTLSEnable}, // success, TLS CA only - {map[string]string{"tls": "enable", "ca": "caaa"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa"}, false, etcdTLSEnable}, // failure, TLS missing cert - {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, etcdTLSDisable}, // failure, TLS missing key - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, etcdTLSDisable}, // failure, TLS invalid - {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, etcdTLSDisable}, + // success, username and password + {map[string]string{"username": "root", "password": "admin"}, false, etcdTLSDisable}, + // failure, missing password + {map[string]string{"username": "root"}, true, etcdTLSDisable}, + // failure, missing username + {map[string]string{"password": "admin"}, true, etcdTLSDisable}, } var etcdMetricIdentifiers = []etcdMetricIdentifier{ @@ -83,10 +89,10 @@ func TestParseEtcdMetadata(t *testing.T) { t.Error("Expected success but got error", err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Errorf("Expected error but got success %v", testData) } - if err == nil && !reflect.DeepEqual(meta.endpoints, testData.endpoints) { - t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.endpoints) + if err == nil && !reflect.DeepEqual(meta.Endpoints, testData.endpoints) { + t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.Endpoints) } } } @@ -101,21 +107,21 @@ func TestParseEtcdAuthParams(t *testing.T) { if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + if meta != nil && meta.EnableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.EnableTLS) } - if meta.enableTLS { - if meta.ca != 
testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS) + if meta != nil && meta.EnableTLS == etcdTLSEnable { + if meta.Ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.EnableTLS) } - if meta.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert) + if meta.Cert != testData.authParams["cert"] { + t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.Cert) } - if meta.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key) + if meta.Key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.Key) } - if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key) + if meta.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.Key) } } } diff --git a/pkg/scalers/github_runner_scaler.go b/pkg/scalers/github_runner_scaler.go index d53cd21bd2e..c9c93c501b1 100644 --- a/pkg/scalers/github_runner_scaler.go +++ b/pkg/scalers/github_runner_scaler.go @@ -362,8 +362,12 @@ func getValueFromMetaOrEnv(key string, metadata map[string]string, env map[strin if val, ok := metadata[key]; ok && val != "" { return val, nil } else if val, ok := metadata[key+"FromEnv"]; ok && val != "" { - return env[val], nil + if envVal, ok := env[val]; ok && envVal != "" { + return envVal, nil + } + return "", fmt.Errorf("%s %s env variable value is empty", key, val) } + return "", fmt.Errorf("no %s given", key) } @@ -444,12 +448,14 @@ func setupGitHubApp(config *scalersconfig.ScalerConfig) (*int64, *int64, *string var instID *int64 var appKey *string - if val, err := getInt64ValueFromMetaOrEnv("applicationID", config); err == nil && val != -1 { - appID = &val + appIDVal, appIDErr := getInt64ValueFromMetaOrEnv("applicationID", config) + if appIDErr == nil && appIDVal != -1 { + appID = &appIDVal } - if val, err := getInt64ValueFromMetaOrEnv("installationID", config); err == nil && val != -1 { - instID = &val + instIDVal, instIDErr := getInt64ValueFromMetaOrEnv("installationID", config) + if instIDErr == nil && instIDVal != -1 { + instID = &instIDVal } if val, ok := config.AuthParams["appKey"]; ok && val != "" { @@ -458,7 +464,15 @@ func setupGitHubApp(config *scalersconfig.ScalerConfig) (*int64, *int64, *string if (appID != nil || instID != nil || appKey != nil) && (appID == nil || instID == nil || appKey == nil) { - return nil, nil, nil, fmt.Errorf("applicationID, installationID and applicationKey must be given") + if appIDErr != nil { + return nil, nil, nil, appIDErr + } + + if instIDErr != nil { + return nil, nil, nil, instIDErr + } + + return nil, nil, nil, fmt.Errorf("no applicationKey given") } return appID, instID, appKey, nil diff --git a/pkg/scalers/github_runner_scaler_test.go b/pkg/scalers/github_runner_scaler_test.go index fc1babdddc2..808ac78562c 100644 --- a/pkg/scalers/github_runner_scaler_test.go +++ b/pkg/scalers/github_runner_scaler_test.go @@ -73,9 +73,9 @@ var testGitHubRunnerMetadata = []parseGitHubRunnerMetadataTestData{ // empty token {"empty targetWorkflowQueueLength", map[string]string{"githubApiURL": 
"https://api.github.com", "runnerScope": REPO, "owner": "ownername", "repos": "reponame"}, true, false, ""}, // missing installationID From Env - {"missing installationID Env", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationIDFromEnv": "APP_ID"}, true, true, "applicationID, installationID and applicationKey must be given"}, + {"missing installationID Env", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationIDFromEnv": "APP_ID"}, true, true, "error parsing installationID: no installationID given"}, // missing applicationID From Env - {"missing applicationId Env", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "installationIDFromEnv": "INST_ID"}, true, true, "applicationID, installationID and applicationKey must be given"}, + {"missing applicationID Env", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "installationIDFromEnv": "INST_ID"}, true, true, "error parsing applicationID: no applicationID given"}, // nothing passed {"empty, no envs", map[string]string{}, false, true, "no runnerScope given"}, // empty githubApiURL @@ -105,11 +105,15 @@ var testGitHubRunnerMetadata = []parseGitHubRunnerMetadataTestData{ // empty repos, no envs {"empty repos, no envs", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "labels": "golang", "repos": "", "targetWorkflowQueueLength": "1"}, false, false, ""}, // missing installationID - {"missing installationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "1"}, true, true, "applicationID, installationID and applicationKey must be given"}, + {"missing installationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "1"}, true, true, "error parsing installationID: no installationID given"}, // missing applicationID - {"missing applicationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "installationID": "1"}, true, true, "applicationID, installationID and applicationKey must be given"}, + {"missing applicationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "installationID": "1"}, true, true, "error parsing applicationID: no applicationID given"}, // all good - {"missing applicationKey", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "1", "installationID": "1"}, true, true, "applicationID, 
installationID and applicationKey must be given"}, + {"missing applicationKey", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "1", "installationID": "1"}, true, true, "no applicationKey given"}, + {"missing runnerScope Env", map[string]string{"githubApiURL": "https://api.github.com", "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "runnerScopeFromEnv": "EMPTY"}, true, true, "runnerScope EMPTY env variable value is empty"}, + {"missing owner Env", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "ownerFromEnv": "EMPTY"}, true, true, "owner EMPTY env variable value is empty"}, + {"wrong applicationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "id", "installationID": "1"}, true, true, "error parsing applicationID: strconv.ParseInt: parsing \"id\": invalid syntax"}, + {"wrong installationID", map[string]string{"githubApiURL": "https://api.github.com", "runnerScope": ORG, "owner": "ownername", "repos": "reponame,otherrepo", "labels": "golang", "targetWorkflowQueueLength": "1", "applicationID": "1", "installationID": "id"}, true, true, "error parsing installationID: strconv.ParseInt: parsing \"id\": invalid syntax"}, } func TestGitHubRunnerParseMetadata(t *testing.T) { diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index 4f552999ed8..fba5c841a7c 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/go-logr/logr" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -23,14 +22,15 @@ type influxDBScaler struct { } type influxDBMetadata struct { - authToken string - organizationName string - query string - serverURL string - unsafeSsl bool - thresholdValue float64 - activationThresholdValue float64 - triggerIndex int + AuthToken string `keda:"name=authToken, order=triggerMetadata;resolvedEnv;authParams"` + OrganizationName string `keda:"name=organizationName, order=triggerMetadata;resolvedEnv;authParams"` + Query string `keda:"name=query, order=triggerMetadata"` + ServerURL string `keda:"name=serverURL, order=triggerMetadata;authParams"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"` + ThresholdValue float64 `keda:"name=thresholdValue, order=triggerMetadata, optional"` + ActivationThresholdValue float64 `keda:"name=activationThresholdValue, order=triggerMetadata, optional"` + + triggerIndex int } // NewInfluxDBScaler creates a new influx db scaler @@ -49,9 +49,9 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { logger.Info("starting up influxdb client") client := influxdb2.NewClientWithOptions( - meta.serverURL, - meta.authToken, - influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.unsafeSsl))) + meta.ServerURL, + meta.AuthToken, + influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.UnsafeSsl))) return &influxDBScaler{ client: client, @@ -63,100 +63,17 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { // parseInfluxDBMetadata parses the metadata 
passed in from the ScaledObject config func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadata, error) { - var authToken string - var organizationName string - var query string - var serverURL string - var unsafeSsl bool - var thresholdValue float64 - var activationThresholdValue float64 - - val, ok := config.TriggerMetadata["authToken"] - switch { - case ok && val != "": - authToken = val - case config.TriggerMetadata["authTokenFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["authTokenFromEnv"]]; ok { - authToken = val - } else { - return nil, fmt.Errorf("no auth token given") - } - case config.AuthParams["authToken"] != "": - authToken = config.AuthParams["authToken"] - default: - return nil, fmt.Errorf("no auth token given") - } - - val, ok = config.TriggerMetadata["organizationName"] - switch { - case ok && val != "": - organizationName = val - case config.TriggerMetadata["organizationNameFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok { - organizationName = val - } else { - return nil, fmt.Errorf("no organization name given") - } - case config.AuthParams["organizationName"] != "": - organizationName = config.AuthParams["organizationName"] - default: - return nil, fmt.Errorf("no organization name given") - } - - if val, ok := config.TriggerMetadata["query"]; ok { - query = val - } else { - return nil, fmt.Errorf("no query provided") - } - - if val, ok := config.TriggerMetadata["serverURL"]; ok { - serverURL = val - } else if val, ok := config.AuthParams["serverURL"]; ok { - serverURL = val - } else { - return nil, fmt.Errorf("no server url given") - } - - if val, ok := config.TriggerMetadata["activationThresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThresholdValue: failed to parse activationThresholdValue %w", err) - } - activationThresholdValue = value + meta := &influxDBMetadata{} + meta.triggerIndex = config.TriggerIndex + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing influxdb metadata: %w", err) } - if val, ok := config.TriggerMetadata["thresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("thresholdValue: failed to parse thresholdValue length %w", err) - } - thresholdValue = value - } else { - if config.AsMetricSource { - thresholdValue = 0 - } else { - return nil, fmt.Errorf("no threshold value given") - } - } - unsafeSsl = false - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - parsedVal, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing unsafeSsl: %w", err) - } - unsafeSsl = parsedVal + if meta.ThresholdValue == 0 && !config.AsMetricSource { + return nil, fmt.Errorf("no threshold value given") } - return &influxDBMetadata{ - authToken: authToken, - organizationName: organizationName, - query: query, - serverURL: serverURL, - thresholdValue: thresholdValue, - activationThresholdValue: activationThresholdValue, - unsafeSsl: unsafeSsl, - triggerIndex: config.TriggerIndex, - }, nil + return meta, nil } // Close closes the connection of the client to the server @@ -192,25 +109,25 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl // GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) 
([]external_metrics.ExternalMetricValue, bool, error) { // Grab QueryAPI to make queries to influxdb instance - queryAPI := s.client.QueryAPI(s.metadata.organizationName) + queryAPI := s.client.QueryAPI(s.metadata.OrganizationName) - value, err := queryInfluxDB(ctx, queryAPI, s.metadata.query) + value, err := queryInfluxDB(ctx, queryAPI, s.metadata.Query) if err != nil { return []external_metrics.ExternalMetricValue{}, false, err } metric := GenerateMetricInMili(metricName, value) - return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationThresholdValue, nil } // GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.organizationName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.OrganizationName))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue), + Target: GetMetricTargetMili(s.metricType, s.metadata.ThresholdValue), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, diff --git a/pkg/scalers/influxdb_scaler_test.go b/pkg/scalers/influxdb_scaler_test.go index d238a222a54..6cf23680cd0 100644 --- a/pkg/scalers/influxdb_scaler_test.go +++ b/pkg/scalers/influxdb_scaler_test.go @@ -46,7 +46,7 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{ {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}}, // 9 authToken, organizationName, and serverURL are defined in authParams {map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}}, - // 10 no sunsafeSsl value passed + // 10 no unsafeSsl value passed {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}}, // 11 wrong activationThreshold valuequeryInfluxDB {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "activationThresholdValue": "aa", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}}, diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 48d6b3c9069..b353c1313b4 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -81,6 +81,7 @@ type kafkaMetadata struct { realm string kerberosConfigPath string kerberosServiceName string + kerberosDisableFAST bool // OAUTHBEARER tokenProvider kafkaSaslOAuthTokenProvider @@ -409,6 +410,15 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata meta.kerberosServiceName = strings.TrimSpace(config.AuthParams["kerberosServiceName"]) } + meta.kerberosDisableFAST = false + if val, ok := config.AuthParams["kerberosDisableFAST"]; ok { + t, err := 
strconv.ParseBool(val) + if err != nil { + return fmt.Errorf("error parsing kerberosDisableFAST: %w", err) + } + meta.kerberosDisableFAST = t + } + meta.saslType = mode return nil } @@ -688,7 +698,12 @@ func getKafkaClientConfig(ctx context.Context, metadata kafkaMetadata) (*sarama. config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH config.Net.SASL.GSSAPI.Password = metadata.password } + + if metadata.kerberosDisableFAST { + config.Net.SASL.GSSAPI.DisablePAFXFAST = true + } } + return config, nil } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 57a3f95eba9..fe42e28995c 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -209,6 +209,10 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ {map[string]string{"sasl": "gssapi", "username": "admin", "password": "admin", "kerberosConfig": "", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, // failure, SASL GSSAPI/keytab + TLS missing username {map[string]string{"sasl": "gssapi", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // success, SASL GSSAPI/disableFast + {map[string]string{"sasl": "gssapi", "username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com", "kerberosDisableFAST": "true"}, false, false}, + // failure, SASL GSSAPI/disableFast incorrect + {map[string]string{"sasl": "gssapi", "username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com", "kerberosDisableFAST": "notabool"}, true, false}, } var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{ // success, SASL plaintext diff --git a/pkg/scalers/kubernetes_workload_scaler.go b/pkg/scalers/kubernetes_workload_scaler.go index 2994e8daff2..a3ceeaa0405 100644 --- a/pkg/scalers/kubernetes_workload_scaler.go +++ b/pkg/scalers/kubernetes_workload_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -18,16 +17,13 @@ import ( type kubernetesWorkloadScaler struct { metricType v2.MetricTargetType - metadata *kubernetesWorkloadMetadata + metadata kubernetesWorkloadMetadata kubeClient client.Client logger logr.Logger } const ( kubernetesWorkloadMetricType = "External" - podSelectorKey = "podSelector" - valueKey = "value" - activationValueKey = "activationValue" ) var phasesCountedAsTerminated = []corev1.PodPhase{ @@ -36,11 +32,22 @@ var phasesCountedAsTerminated = []corev1.PodPhase{ } type kubernetesWorkloadMetadata struct { - podSelector labels.Selector - namespace string - value float64 - activationValue float64 - triggerIndex int + PodSelector string `keda:"name=podSelector, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, default=0"` + + namespace string + triggerIndex int + podSelector labels.Selector + asMetricSource bool +} + +func (m *kubernetesWorkloadMetadata) Validate() error { + if m.Value <= 0 && !m.asMetricSource { + return fmt.Errorf("value must be a float greater than 0") + } + + return nil } // NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler @@ -50,9 +57,9 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig return nil, fmt.Errorf("error getting scaler metric type: %w", err) } - meta, parseErr := parseWorkloadMetadata(config) - 
if parseErr != nil { - return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", parseErr) + meta, err := parseKubernetesWorkloadMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", err) } return &kubernetesWorkloadScaler{ @@ -63,50 +70,46 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig }, nil } -func parseWorkloadMetadata(config *scalersconfig.ScalerConfig) (*kubernetesWorkloadMetadata, error) { - meta := &kubernetesWorkloadMetadata{} - var err error +func parseKubernetesWorkloadMetadata(config *scalersconfig.ScalerConfig) (kubernetesWorkloadMetadata, error) { + meta := kubernetesWorkloadMetadata{} + err := config.TypedConfig(&meta) + if err != nil { + return meta, fmt.Errorf("error parsing kubernetes workload metadata: %w", err) + } + meta.namespace = config.ScalableObjectNamespace - podSelector, err := labels.Parse(config.TriggerMetadata[podSelectorKey]) - if err != nil || podSelector.String() == "" { - return nil, fmt.Errorf("invalid pod selector") + meta.triggerIndex = config.TriggerIndex + meta.asMetricSource = config.AsMetricSource + + if meta.asMetricSource { + meta.Value = 0 } - meta.podSelector = podSelector - value, err := strconv.ParseFloat(config.TriggerMetadata[valueKey], 64) - if err != nil || value == 0 { - if config.AsMetricSource { - value = 0 - } else { - return nil, fmt.Errorf("value must be a float greater than 0") - } + + selector, err := labels.Parse(meta.PodSelector) + if err != nil { + return meta, fmt.Errorf("error parsing pod selector: %w", err) } - meta.value = value + meta.podSelector = selector - meta.activationValue = 0 - if val, ok := config.TriggerMetadata[activationValueKey]; ok { - activationValue, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("value must be a float") - } - meta.activationValue = activationValue + if err := meta.Validate(); err != nil { + return meta, err } - meta.triggerIndex = config.TriggerIndex return meta, nil } -// Close no need for kubernetes workload scaler func (s *kubernetesWorkloadScaler) Close(context.Context) error { return nil } // GetMetricSpecForScaling returns the metric spec for the HPA func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace)) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } metricSpec := v2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType} return []v2.MetricSpec{metricSpec} @@ -121,19 +124,17 @@ func (s *kubernetesWorkloadScaler) GetMetricsAndActivity(ctx context.Context, me metric := GenerateMetricInMili(metricName, float64(pods)) - return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.activationValue, nil + return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.ActivationValue, nil } func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, error) { podList := &corev1.PodList{} - listOptions := client.ListOptions{} - listOptions.LabelSelector = s.metadata.podSelector - 
listOptions.Namespace = s.metadata.namespace - opts := []client.ListOption{ - &listOptions, + listOptions := client.ListOptions{ + LabelSelector: s.metadata.podSelector, + Namespace: s.metadata.namespace, } - err := s.kubeClient.List(ctx, podList, opts...) + err := s.kubeClient.List(ctx, podList, &listOptions) if err != nil { return 0, err } diff --git a/pkg/scalers/kubernetes_workload_scaler_test.go b/pkg/scalers/kubernetes_workload_scaler_test.go index ab7a7f360d6..8544ab8e4c7 100644 --- a/pkg/scalers/kubernetes_workload_scaler_test.go +++ b/pkg/scalers/kubernetes_workload_scaler_test.go @@ -38,7 +38,13 @@ var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{ func TestParseWorkloadMetadata(t *testing.T) { for _, testData := range parseWorkloadMetadataTestDataset { - _, err := parseWorkloadMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ScalableObjectNamespace: testData.namespace}) + _, err := NewKubernetesWorkloadScaler( + fake.NewClientBuilder().Build(), + &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + ScalableObjectNamespace: testData.namespace, + }, + ) if err != nil && !testData.isError { t.Error("Expected success but got error", err) } @@ -68,7 +74,7 @@ var isActiveWorkloadTestDataset = []workloadIsActiveTestData{ func TestWorkloadIsActive(t *testing.T) { for _, testData := range isActiveWorkloadTestDataset { - s, _ := NewKubernetesWorkloadScaler( + s, err := NewKubernetesWorkloadScaler( fake.NewClientBuilder().WithRuntimeObjects(createPodlist(testData.podCount)).Build(), &scalersconfig.ScalerConfig{ TriggerMetadata: testData.metadata, @@ -77,6 +83,10 @@ func TestWorkloadIsActive(t *testing.T) { ScalableObjectNamespace: testData.namespace, }, ) + if err != nil { + t.Error("Error creating scaler", err) + continue + } _, isActive, _ := s.GetMetricsAndActivity(context.TODO(), "Metric") if testData.active && !isActive { t.Error("Expected active but got inactive") @@ -107,7 +117,7 @@ var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestDa func TestWorkloadGetMetricSpecForScaling(t *testing.T) { for _, testData := range getMetricSpecForScalingTestDataset { - s, _ := NewKubernetesWorkloadScaler( + s, err := NewKubernetesWorkloadScaler( fake.NewClientBuilder().Build(), &scalersconfig.ScalerConfig{ TriggerMetadata: testData.metadata, @@ -117,6 +127,10 @@ func TestWorkloadGetMetricSpecForScaling(t *testing.T) { TriggerIndex: testData.triggerIndex, }, ) + if err != nil { + t.Error("Error creating scaler", err) + continue + } metric := s.GetMetricSpecForScaling(context.Background()) if metric[0].External.Metric.Name != testData.name { @@ -145,14 +159,11 @@ func createPodlist(count int) *v1.PodList { func TestWorkloadPhase(t *testing.T) { phases := map[v1.PodPhase]bool{ - v1.PodRunning: true, - // succeeded and failed clearly count as terminated + v1.PodRunning: true, v1.PodSucceeded: false, v1.PodFailed: false, - // unknown could be for example a temporarily unresponsive node; count the pod - v1.PodUnknown: true, - // count pre-Running to avoid an additional delay on top of the poll interval - v1.PodPending: true, + v1.PodUnknown: true, + v1.PodPending: true, } for phase, active := range phases { list := &v1.PodList{} diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index f30d92fa13a..860228d3a99 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "path" + "reflect" "regexp" "strconv" "strings" @@ -83,6 +84,9 
@@ type rabbitMQMetadata struct { timeout time.Duration // custom http timeout for a specific trigger triggerIndex int // scaler index + username string + password string + // TLS ca string cert string @@ -149,12 +153,20 @@ func NewRabbitMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { if meta.protocol == amqpProtocol { // Override vhost if requested. host := meta.host - if meta.vhostName != "" { + if meta.vhostName != "" || (meta.username != "" && meta.password != "") { hostURI, err := amqp.ParseURI(host) if err != nil { return nil, fmt.Errorf("error parsing rabbitmq connection string: %w", err) } - hostURI.Vhost = meta.vhostName + if meta.vhostName != "" { + hostURI.Vhost = meta.vhostName + } + + if meta.username != "" && meta.password != "" { + hostURI.Username = meta.username + hostURI.Password = meta.password + } + host = hostURI.String() } @@ -232,6 +244,28 @@ func resolveTLSAuthParams(config *scalersconfig.ScalerConfig, meta *rabbitMQMeta return nil } +func resolveAuth(config *scalersconfig.ScalerConfig, meta *rabbitMQMetadata) error { + usernameVal, err := getParameterFromConfigV2(config, "username", reflect.TypeOf(meta.username), + UseAuthentication(true), UseResolvedEnv(true), IsOptional(true)) + if err != nil { + return err + } + meta.username = usernameVal.(string) + + passwordVal, err := getParameterFromConfigV2(config, "password", reflect.TypeOf(meta.username), + UseAuthentication(true), UseResolvedEnv(true), IsOptional(true)) + if err != nil { + return err + } + meta.password = passwordVal.(string) + + if (meta.username != "" || meta.password != "") && (meta.username == "" || meta.password == "") { + return fmt.Errorf("username and password must be given together") + } + + return nil +} + func parseRabbitMQMetadata(config *scalersconfig.ScalerConfig) (*rabbitMQMetadata, error) { meta := rabbitMQMetadata{ connectionName: connectionName(config), @@ -252,6 +286,11 @@ func parseRabbitMQMetadata(config *scalersconfig.ScalerConfig) (*rabbitMQMetadat return nil, err } + // Resolve username and password + if err := resolveAuth(config, &meta); err != nil { + return nil, err + } + meta.keyPassword = config.AuthParams["keyPassword"] if config.PodIdentity.Provider == v1alpha1.PodIdentityProviderAzureWorkload { @@ -596,6 +635,10 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP(ctx context.Context) (*queueInfo, e vhost, subpaths := getVhostAndPathFromURL(parsedURL.Path, s.metadata.vhostName) parsedURL.Path = subpaths + if s.metadata.username != "" && s.metadata.password != "" { + parsedURL.User = url.UserPassword(s.metadata.username, s.metadata.password) + } + var getQueueInfoManagementURI string if s.metadata.useRegex { getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName), s.metadata.pageSize) diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index ef705757a3c..dd9c3f900b8 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -18,7 +18,9 @@ import ( ) const ( - host = "myHostSecret" + host = "myHostSecret" + rabbitMQUsername = "myUsernameSecret" + rabbitMQPassword = "myPasswordSecret" ) type parseRabbitMQMetadataTestData struct { @@ -43,7 +45,9 @@ type rabbitMQMetricIdentifier struct { } var sampleRabbitMqResolvedEnv = map[string]string{ - host: "amqp://user:sercet@somehost.com:5236/vhost", + host: "amqp://user:sercet@somehost.com:5236/vhost", + rabbitMQUsername: "user", 
+ rabbitMQPassword: "Password", } var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ @@ -151,6 +155,18 @@ var testRabbitMQAuthParamData = []parseRabbitMQAuthParamTestData{ {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, true, false}, // failure, TLS invalid {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "kee"}, true, true, false}, + // success, username and password + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"username": "user", "password": "PASSWORD"}, false, false, false}, + // failure, username but no password + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"username": "user"}, true, false, false}, + // failure, password but no username + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"password": "PASSWORD"}, true, false, false}, + // success, username and password from env + {map[string]string{"queueName": "sample", "hostFromEnv": host, "usernameFromEnv": rabbitMQUsername, "passwordFromEnv": rabbitMQPassword}, v1alpha1.AuthPodIdentity{}, map[string]string{}, false, false, false}, + // failure, username from env but not password + {map[string]string{"queueName": "sample", "hostFromEnv": host, "usernameFromEnv": rabbitMQUsername}, v1alpha1.AuthPodIdentity{}, map[string]string{}, true, false, false}, + // failure, password from env but not username + {map[string]string{"queueName": "sample", "hostFromEnv": host, "passwordFromEnv": rabbitMQPassword}, v1alpha1.AuthPodIdentity{}, map[string]string{}, true, false, false}, // success, WorkloadIdentity {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, v1alpha1.AuthPodIdentity{Provider: v1alpha1.PodIdentityProviderAzureWorkload, IdentityID: kedautil.StringPointer("client-id")}, map[string]string{"workloadIdentityResource": "rabbitmq-resource-id"}, false, false, true}, // failure, WoekloadIdentity not supported for amqp diff --git a/pkg/scalers/selenium_grid_scaler_test.go b/pkg/scalers/selenium_grid_scaler_test.go index 85321118c6f..56de3b7f024 100644 --- a/pkg/scalers/selenium_grid_scaler_test.go +++ b/pkg/scalers/selenium_grid_scaler_test.go @@ -811,14 +811,16 @@ func Test_parseSeleniumGridScalerMetadata(t *testing.T) { }, wantErr: false, want: &seleniumGridScalerMetadata{ - URL: "http://selenium-hub:4444/graphql", - BrowserName: "MicrosoftEdge", - SessionBrowserName: "msedge", - TargetValue: 1, - BrowserVersion: "latest", - PlatformName: "linux", - Username: "username", - Password: "password", + URL: "http://selenium-hub:4444/graphql", + BrowserName: "MicrosoftEdge", + SessionBrowserName: "msedge", + TargetValue: 1, + BrowserVersion: "latest", + PlatformName: "linux", + Username: "username", + Password: "password", + SessionsPerNode: 1, + SessionBrowserVersion: "latest", }, }, { diff --git a/tests/scalers/elasticsearch/elasticsearch_test.go b/tests/scalers/elasticsearch/elasticsearch_test.go index de2d997066c..ea83f315298 100644 --- a/tests/scalers/elasticsearch/elasticsearch_test.go +++ b/tests/scalers/elasticsearch/elasticsearch_test.go @@ -202,7 +202,7 @@ spec: name: elasticsearch ` - scaledObjectTemplate = ` + scaledObjectTemplateSearchTemplate = ` apiVersion: keda.sh/v1alpha1 
kind: ScaledObject metadata: @@ -232,6 +232,54 @@ spec: name: keda-trigger-auth-elasticsearch-secret ` + scaledObjectTemplateQuery = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: 0 + maxReplicaCount: 2 + pollingInterval: 3 + cooldownPeriod: 5 + triggers: + - type: elasticsearch + metadata: + addresses: "http://{{.DeploymentName}}.{{.TestNamespace}}.svc:9200" + username: "elastic" + index: {{.IndexName}} + query: | + { + "query": { + "bool": { + "must": [ + { + "range": { + "@timestamp": { + "gte": "now-1m", + "lte": "now" + } + } + }, + { + "match_all": {} + } + ] + } + } + } + valueLocation: "hits.total.value" + targetValue: "1" + activationTargetValue: "4" + authenticationRef: + name: keda-trigger-auth-elasticsearch-secret +` + elasticsearchCreateIndex = ` { "mappings": { @@ -297,9 +345,6 @@ spec: func TestElasticsearchScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - t.Cleanup(func() { - DeleteKubernetesResources(t, testNamespace, data, templates) - }) // Create kubernetes resources CreateKubernetesResources(t, kc, testNamespace, data, templates) @@ -307,13 +352,32 @@ func TestElasticsearchScaler(t *testing.T) { // setup elastic setupElasticsearch(t, kc) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), - "replica count should be %d after 3 minutes", minReplicaCount) + t.Run("test with searchTemplateName", func(t *testing.T) { + t.Log("--- testing with searchTemplateName ---") + + // Create ScaledObject with searchTemplateName + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) - // test scaling - testActivation(t, kc) - testScaleOut(t, kc) - testScaleIn(t, kc) + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) + }) + + t.Run("test with query", func(t *testing.T) { + t.Log("--- testing with query ---") + + // Create ScaledObject with query + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + }) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) } func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { @@ -326,22 +390,18 @@ func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { require.NoErrorf(t, err, "cannot execute command - %s", err) } -func testActivation(t *testing.T, kc *kubernetes.Clientset) { +func testElasticsearchScaler(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing activation ---") addElements(t, 3) AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) -} -func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale out ---") addElements(t, 5) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), "replica count should be %d after 3 minutes", maxReplicaCount) -} -func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale in ---") assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, 
testNamespace, minReplicaCount, 60, 3),
@@ -383,6 +443,5 @@ func getTemplateData() (templateData, []Template) {
 		{Name: "serviceTemplate", Config: serviceTemplate},
 		{Name: "elasticsearchDeploymentTemplate", Config: elasticsearchDeploymentTemplate},
 		{Name: "deploymentTemplate", Config: deploymentTemplate},
-		{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
 	}
 }
diff --git a/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go b/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go
new file mode 100644
index 00000000000..9f5d7d59892
--- /dev/null
+++ b/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go
@@ -0,0 +1,232 @@
+//go:build e2e
+// +build e2e
+
+package etcd_cluster_auth_test
+
+import (
+	"encoding/base64"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"k8s.io/client-go/kubernetes"
+
+	. "github.com/kedacore/keda/v2/tests/helper"
+)
+
+const (
+	testName = "etcd-auth-test"
+)
+
+var (
+	testNamespace    = fmt.Sprintf("%s-ns", testName)
+	scaledObjectName = fmt.Sprintf("%s-so", testName)
+	deploymentName   = fmt.Sprintf("%s-deployment", testName)
+	jobName          = fmt.Sprintf("%s-job", testName)
+	secretName       = fmt.Sprintf("%s-secret", testName)
+	triggerAuthName  = fmt.Sprintf("%s-triggerauth", testName)
+	etcdClientName   = fmt.Sprintf("%s-client", testName)
+	etcdUsername     = "root"
+	etcdPassword     = "admin"
+	etcdEndpoints    = fmt.Sprintf("etcd-0.etcd-headless.%s:2379,etcd-1.etcd-headless.%s:2379,etcd-2.etcd-headless.%s:2379", testNamespace, testNamespace, testNamespace)
+	minReplicaCount  = 0
+	maxReplicaCount  = 2
+)
+
+type templateData struct {
+	TestNamespace      string
+	DeploymentName     string
+	JobName            string
+	ScaledObjectName   string
+	SecretName         string
+	TriggerAuthName    string
+	EtcdUsernameBase64 string
+	EtcdPasswordBase64 string
+	MinReplicaCount    int
+	MaxReplicaCount    int
+	EtcdName           string
+	EtcdClientName     string
+	EtcdEndpoints      string
+}
+
+const (
+	secretTemplate = `apiVersion: v1
+kind: Secret
+metadata:
+  name: {{.SecretName}}
+  namespace: {{.TestNamespace}}
+data:
+  etcd-username: {{.EtcdUsernameBase64}}
+  etcd-password: {{.EtcdPasswordBase64}}
+`
+	triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1
+kind: TriggerAuthentication
+metadata:
+  name: {{.TriggerAuthName}}
+  namespace: {{.TestNamespace}}
+spec:
+  secretTargetRef:
+  - parameter: username
+    name: {{.SecretName}}
+    key: etcd-username
+  - parameter: password
+    name: {{.SecretName}}
+    key: etcd-password
+`
+	deploymentTemplate = `apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{.DeploymentName}}
+  namespace: {{.TestNamespace}}
+spec:
+  selector:
+    matchLabels:
+      app: {{.DeploymentName}}
+  replicas: 0
+  template:
+    metadata:
+      labels:
+        app: {{.DeploymentName}}
+    spec:
+      containers:
+      - name: my-app
+        image: nginxinc/nginx-unprivileged
+        imagePullPolicy: IfNotPresent
+        ports:
+        - containerPort: 80
+`
+	scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+spec:
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  pollingInterval: 15
+  cooldownPeriod: 5
+  minReplicaCount: {{.MinReplicaCount}}
+  maxReplicaCount: {{.MaxReplicaCount}}
+  advanced:
+    horizontalPodAutoscalerConfig:
+      name: keda-hpa-etcd-scaledobject
+      behavior:
+        scaleDown:
+          stabilizationWindowSeconds: 5
+  triggers:
+    - type: etcd
+      metadata:
+        endpoints: {{.EtcdEndpoints}}
+        watchKey: var
+        value: '1.5'
+        activationValue: '5'
+        watchProgressNotifyInterval: '10'
+      authenticationRef:
+ name: {{.TriggerAuthName}} +` + etcdClientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.EtcdClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.EtcdClientName}} + image: gcr.io/etcd-development/etcd:v3.4.10 + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + KubectlDeleteWithTemplate(t, data, "etcdClientTemplate", etcdClientTemplate) + RemoveCluster(t, kc) + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + CreateNamespace(t, kc, testNamespace) + + // Create Etcd Cluster + KubectlApplyWithTemplate(t, data, "etcdClientTemplate", etcdClientTemplate) + InstallCluster(t, kc) + setVarValue(t, 0) + + // Create kubernetes resources for testing + KubectlApplyMultipleWithTemplate(t, data, templates) + + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation ---") + setVarValue(t, 4) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + setVarValue(t, 9) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + setVarValue(t, 0) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + JobName: jobName, + SecretName: secretName, + TriggerAuthName: triggerAuthName, + EtcdUsernameBase64: base64.StdEncoding.EncodeToString([]byte(etcdUsername)), + EtcdPasswordBase64: base64.StdEncoding.EncodeToString([]byte(etcdPassword)), + EtcdName: testName, + EtcdClientName: etcdClientName, + EtcdEndpoints: etcdEndpoints, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func setVarValue(t *testing.T, value int) { + _, _, err := ExecCommandOnSpecificPod(t, etcdClientName, testNamespace, fmt.Sprintf(`etcdctl --user="%s" --password="%s" put var %d --endpoints=%s`, + etcdUsername, etcdPassword, value, etcdEndpoints)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func InstallCluster(t *testing.T, kc *kubernetes.Clientset) { + _, err := ExecuteCommand(fmt.Sprintf(`helm upgrade --install --set persistence.enabled=false --set resourcesPreset=none --set auth.rbac.rootPassword=%s --set auth.rbac.allowNoneAuthentication=false --set replicaCount=3 --namespace %s --wait etcd oci://registry-1.docker.io/bitnamicharts/etcd`, + etcdPassword, testNamespace)) + require.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func RemoveCluster(t 
*testing.T, kc *kubernetes.Clientset) { + _, err := ExecuteCommand(fmt.Sprintf(`helm delete --namespace %s --wait etcd`, + testNamespace)) + require.NoErrorf(t, err, "cannot execute command - %s", err) +} diff --git a/tests/scalers/rabbitmq/rabbitmq_helper.go b/tests/scalers/rabbitmq/rabbitmq_helper.go index 6f22b178d95..d0a78e821e4 100644 --- a/tests/scalers/rabbitmq/rabbitmq_helper.go +++ b/tests/scalers/rabbitmq/rabbitmq_helper.go @@ -139,6 +139,47 @@ data: --- apiVersion: apps/v1 kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: rabbitmq-consumer + image: ghcr.io/kedacore/tests-rabbitmq + imagePullPolicy: Always + command: + - receive + args: + - '{{.Connection}}' + envFrom: + - secretRef: + name: {{.SecretName}} +` + + RMQTargetDeploymentWithAuthEnvTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + RabbitApiHost: {{.Base64Connection}} + RabbitUsername: {{.Base64Username}} + RabbitPassword: {{.Base64Password}} +--- +apiVersion: apps/v1 +kind: Deployment metadata: name: {{.DeploymentName}} namespace: {{.TestNamespace}} diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_amqp_auth/rabbitmq_queue_amqp_auth_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_amqp_auth/rabbitmq_queue_amqp_auth_test.go new file mode 100644 index 00000000000..47acb3e7144 --- /dev/null +++ b/tests/scalers/rabbitmq/rabbitmq_queue_amqp_auth/rabbitmq_queue_amqp_auth_test.go @@ -0,0 +1,258 @@ +//go:build e2e +// +build e2e + +package rabbitmq_queue_amqp_auth_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + . 
"github.com/kedacore/keda/v2/tests/scalers/rabbitmq" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "rmq-queue-amqp-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + rmqNamespace = fmt.Sprintf("%s-rmq", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + queueName = "hello" + user = fmt.Sprintf("%s-user", testName) + password = fmt.Sprintf("%s-password", testName) + vhost = "/" + NoAuthConnectionString = fmt.Sprintf("amqp://rabbitmq.%s.svc.cluster.local", rmqNamespace) + connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local", user, password, rmqNamespace) + messageCount = 100 +) + +const ( + scaledObjectAuthFromSecretTemplate = ` +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + hostFromEnv: RabbitApiHost + mode: QueueLength + value: '10' + activationValue: '5' + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + triggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: RabbitUsername + - parameter: password + name: {{.SecretName}} + key: RabbitPassword +` + invalidUsernameAndPasswordTriggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: Rabbit-Username + - parameter: password + name: {{.SecretName}} + key: Rabbit-Password +` + + invalidPasswordTriggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: RabbitUsername + - parameter: password + name: {{.SecretName}} + key: Rabbit-Password +` + + scaledObjectAuthFromEnvTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + hostFromEnv: RabbitApiHost + usernameFromEnv: RabbitUsername + passwordFromEnv: RabbitPassword + mode: QueueLength + value: '10' + activationValue: '5' +` +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + QueueName string + Username, Base64Username string + Password, Base64Password string + Connection, Base64Connection string + FullConnection string +} + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, 
data, templates) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) + }) + + // Create kubernetes resources + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") + + testAuthFromSecret(t, kc, data) + testAuthFromEnv(t, kc, data) + + testInvalidPassword(t, kc, data) + testInvalidUsernameAndPassword(t, kc, data) + + testActivationValue(t, kc) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + TriggerAuthenticationName: triggerAuthenticationName, + SecretName: secretName, + QueueName: queueName, + Username: user, + Base64Username: base64.StdEncoding.EncodeToString([]byte(user)), + Password: password, + Base64Password: base64.StdEncoding.EncodeToString([]byte(password)), + Connection: connectionString, + Base64Connection: base64.StdEncoding.EncodeToString([]byte(NoAuthConnectionString)), + }, []Template{ + {Name: "deploymentTemplate", Config: RMQTargetDeploymentWithAuthEnvTemplate}, + } +} + +func testAuthFromSecret(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "triggerAuthenticationTemplate", triggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, "triggerAuthenticationTemplate", triggerAuthenticationTemplate) + + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1), + "replica count should be 4 after 1 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} + +func testAuthFromEnv(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromEnvTemplate", scaledObjectAuthFromEnvTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromEnvTemplate", scaledObjectAuthFromEnvTemplate) + + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1), + "replica count should be 4 after 1 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} + +func testInvalidPassword(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing invalid password ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "invalidPasswordTriggerAuthenticationTemplate", invalidPasswordTriggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, 
"invalidPasswordTriggerAuthenticationTemplate", invalidPasswordTriggerAuthenticationTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testInvalidUsernameAndPassword(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing invalid username and password ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "invalidUsernameAndPasswordTriggerAuthenticationTemplate", invalidUsernameAndPasswordTriggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidUsernameAndPasswordTriggerAuthenticationTemplate", invalidUsernameAndPasswordTriggerAuthenticationTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testActivationValue(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation value ---") + messagesToQueue := 3 + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messagesToQueue) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) +} diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_auth/rabbitmq_queue_http_auth_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_auth/rabbitmq_queue_http_auth_test.go new file mode 100644 index 00000000000..a95d1924a8e --- /dev/null +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_auth/rabbitmq_queue_http_auth_test.go @@ -0,0 +1,258 @@ +//go:build e2e +// +build e2e + +package rabbitmq_queue_http_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + . 
"github.com/kedacore/keda/v2/tests/scalers/rabbitmq" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "rmq-queue-http-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + rmqNamespace = fmt.Sprintf("%s-rmq", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + queueName = "hello" + user = fmt.Sprintf("%s-user", testName) + password = fmt.Sprintf("%s-password", testName) + vhost = "/" + NoAuthConnectionString = fmt.Sprintf("http://rabbitmq.%s.svc.cluster.local", rmqNamespace) + connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local", user, password, rmqNamespace) + messageCount = 100 +) + +const ( + scaledObjectAuthFromSecretTemplate = ` +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + hostFromEnv: RabbitApiHost + mode: QueueLength + value: '10' + activationValue: '5' + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + triggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: RabbitUsername + - parameter: password + name: {{.SecretName}} + key: RabbitPassword +` + invalidUsernameAndPasswordTriggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: Rabbit-Username + - parameter: password + name: {{.SecretName}} + key: Rabbit-Password +` + + invalidPasswordTriggerAuthenticationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: RabbitUsername + - parameter: password + name: {{.SecretName}} + key: Rabbit-Password +` + + scaledObjectAuthFromEnvTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + hostFromEnv: RabbitApiHost + usernameFromEnv: RabbitUsername + passwordFromEnv: RabbitPassword + mode: QueueLength + value: '10' + activationValue: '5' +` +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + QueueName string + Username, Base64Username string + Password, Base64Password string + Connection, Base64Connection string + FullConnection string +} + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, 
data, templates) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) + }) + + // Create kubernetes resources + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") + + testAuthFromSecret(t, kc, data) + testAuthFromEnv(t, kc, data) + + testInvalidPassword(t, kc, data) + testInvalidUsernameAndPassword(t, kc, data) + + testActivationValue(t, kc) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + TriggerAuthenticationName: triggerAuthenticationName, + SecretName: secretName, + QueueName: queueName, + Username: user, + Base64Username: base64.StdEncoding.EncodeToString([]byte(user)), + Password: password, + Base64Password: base64.StdEncoding.EncodeToString([]byte(password)), + Connection: connectionString, + Base64Connection: base64.StdEncoding.EncodeToString([]byte(NoAuthConnectionString)), + }, []Template{ + {Name: "deploymentTemplate", Config: RMQTargetDeploymentWithAuthEnvTemplate}, + } +} + +func testAuthFromSecret(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "triggerAuthenticationTemplate", triggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, "triggerAuthenticationTemplate", triggerAuthenticationTemplate) + + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1), + "replica count should be 4 after 1 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} + +func testAuthFromEnv(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromEnvTemplate", scaledObjectAuthFromEnvTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromEnvTemplate", scaledObjectAuthFromEnvTemplate) + + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1), + "replica count should be 4 after 1 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} + +func testInvalidPassword(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing invalid password ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "invalidPasswordTriggerAuthenticationTemplate", invalidPasswordTriggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, 
"invalidPasswordTriggerAuthenticationTemplate", invalidPasswordTriggerAuthenticationTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testInvalidUsernameAndPassword(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing invalid username and password ---") + KubectlApplyWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + defer KubectlDeleteWithTemplate(t, data, "scaledObjectAuthFromSecretTemplate", scaledObjectAuthFromSecretTemplate) + KubectlApplyWithTemplate(t, data, "invalidUsernameAndPasswordTriggerAuthenticationTemplate", invalidUsernameAndPasswordTriggerAuthenticationTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidUsernameAndPasswordTriggerAuthenticationTemplate", invalidUsernameAndPasswordTriggerAuthenticationTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testActivationValue(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation value ---") + messagesToQueue := 3 + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messagesToQueue) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) +}