From 0a4aef656afa7b4ec79786f8c6b2d6c4125279d4 Mon Sep 17 00:00:00 2001 From: Rushen Wang <45029442+dovics@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:58:47 +0800 Subject: [PATCH 1/2] Refactor InfluxDB scaler config (#6240) * Refactor InfluxDB scaler config Signed-off-by: wangrushen * fix: typo in influxdv unsafessl Signed-off-by: wangrushen --------- Signed-off-by: wangrushen --- pkg/scalers/influxdb_scaler.go | 131 +++++----------------------- pkg/scalers/influxdb_scaler_test.go | 2 +- 2 files changed, 25 insertions(+), 108 deletions(-) diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index 4f552999ed8..fba5c841a7c 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/go-logr/logr" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -23,14 +22,15 @@ type influxDBScaler struct { } type influxDBMetadata struct { - authToken string - organizationName string - query string - serverURL string - unsafeSsl bool - thresholdValue float64 - activationThresholdValue float64 - triggerIndex int + AuthToken string `keda:"name=authToken, order=triggerMetadata;resolvedEnv;authParams"` + OrganizationName string `keda:"name=organizationName, order=triggerMetadata;resolvedEnv;authParams"` + Query string `keda:"name=query, order=triggerMetadata"` + ServerURL string `keda:"name=serverURL, order=triggerMetadata;authParams"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"` + ThresholdValue float64 `keda:"name=thresholdValue, order=triggerMetadata, optional"` + ActivationThresholdValue float64 `keda:"name=activationThresholdValue, order=triggerMetadata, optional"` + + triggerIndex int } // NewInfluxDBScaler creates a new influx db scaler @@ -49,9 +49,9 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { logger.Info("starting up influxdb client") client := influxdb2.NewClientWithOptions( - meta.serverURL, - meta.authToken, - influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.unsafeSsl))) + meta.ServerURL, + meta.AuthToken, + influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.UnsafeSsl))) return &influxDBScaler{ client: client, @@ -63,100 +63,17 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { // parseInfluxDBMetadata parses the metadata passed in from the ScaledObject config func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadata, error) { - var authToken string - var organizationName string - var query string - var serverURL string - var unsafeSsl bool - var thresholdValue float64 - var activationThresholdValue float64 - - val, ok := config.TriggerMetadata["authToken"] - switch { - case ok && val != "": - authToken = val - case config.TriggerMetadata["authTokenFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["authTokenFromEnv"]]; ok { - authToken = val - } else { - return nil, fmt.Errorf("no auth token given") - } - case config.AuthParams["authToken"] != "": - authToken = config.AuthParams["authToken"] - default: - return nil, fmt.Errorf("no auth token given") - } - - val, ok = config.TriggerMetadata["organizationName"] - switch { - case ok && val != "": - organizationName = val - case config.TriggerMetadata["organizationNameFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok { - organizationName = val - } else { - return nil, fmt.Errorf("no organization name given") - } - case config.AuthParams["organizationName"] != "": - organizationName = config.AuthParams["organizationName"] - default: - return nil, fmt.Errorf("no organization name given") - } - - if val, ok := config.TriggerMetadata["query"]; ok { - query = val - } else { - return nil, fmt.Errorf("no query provided") - } - - if val, ok := config.TriggerMetadata["serverURL"]; ok { - serverURL = val - } else if val, ok := config.AuthParams["serverURL"]; ok { - serverURL = val - } else { - return nil, fmt.Errorf("no server url given") - } - - if val, ok := config.TriggerMetadata["activationThresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThresholdValue: failed to parse activationThresholdValue %w", err) - } - activationThresholdValue = value + meta := &influxDBMetadata{} + meta.triggerIndex = config.TriggerIndex + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing influxdb metadata: %w", err) } - if val, ok := config.TriggerMetadata["thresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("thresholdValue: failed to parse thresholdValue length %w", err) - } - thresholdValue = value - } else { - if config.AsMetricSource { - thresholdValue = 0 - } else { - return nil, fmt.Errorf("no threshold value given") - } - } - unsafeSsl = false - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - parsedVal, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing unsafeSsl: %w", err) - } - unsafeSsl = parsedVal + if meta.ThresholdValue == 0 && !config.AsMetricSource { + return nil, fmt.Errorf("no threshold value given") } - return &influxDBMetadata{ - authToken: authToken, - organizationName: organizationName, - query: query, - serverURL: serverURL, - thresholdValue: thresholdValue, - activationThresholdValue: activationThresholdValue, - unsafeSsl: unsafeSsl, - triggerIndex: config.TriggerIndex, - }, nil + return meta, nil } // Close closes the connection of the client to the server @@ -192,25 +109,25 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl // GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { // Grab QueryAPI to make queries to influxdb instance - queryAPI := s.client.QueryAPI(s.metadata.organizationName) + queryAPI := s.client.QueryAPI(s.metadata.OrganizationName) - value, err := queryInfluxDB(ctx, queryAPI, s.metadata.query) + value, err := queryInfluxDB(ctx, queryAPI, s.metadata.Query) if err != nil { return []external_metrics.ExternalMetricValue{}, false, err } metric := GenerateMetricInMili(metricName, value) - return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationThresholdValue, nil } // GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.organizationName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.OrganizationName))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue), + Target: GetMetricTargetMili(s.metricType, s.metadata.ThresholdValue), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, diff --git a/pkg/scalers/influxdb_scaler_test.go b/pkg/scalers/influxdb_scaler_test.go index d238a222a54..6cf23680cd0 100644 --- a/pkg/scalers/influxdb_scaler_test.go +++ b/pkg/scalers/influxdb_scaler_test.go @@ -46,7 +46,7 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{ {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}}, // 9 authToken, organizationName, and serverURL are defined in authParams {map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}}, - // 10 no sunsafeSsl value passed + // 10 no unsafeSsl value passed {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}}, // 11 wrong activationThreshold valuequeryInfluxDB {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "activationThresholdValue": "aa", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}}, From 1a49f5f8220c3a0fa6fc6ebd685d2bda5537eea1 Mon Sep 17 00:00:00 2001 From: Rushen Wang <45029442+dovics@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:21:35 +0800 Subject: [PATCH 2/2] Support username and password for etcd (#6199) * Refactor etcd scaler metadata Signed-off-by: wangrushen * feat: Add Support for etcd Username and Password Authentication Signed-off-by: wangrushen --------- Signed-off-by: wangrushen --- CHANGELOG.md | 1 + pkg/scalers/etcd_scaler.go | 152 +++++------- pkg/scalers/etcd_scaler_test.go | 50 ++-- .../etcd_cluster_auth_test.go | 232 ++++++++++++++++++ 4 files changed, 321 insertions(+), 114 deletions(-) create mode 100644 tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fc5d2a6b81..3bb99af3b85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) +- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214)) diff --git a/pkg/scalers/etcd_scaler.go b/pkg/scalers/etcd_scaler.go index 83835bed93e..eed57a3a119 100644 --- a/pkg/scalers/etcd_scaler.go +++ b/pkg/scalers/etcd_scaler.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strconv" - "strings" "time" "github.com/go-logr/logr" @@ -38,18 +37,49 @@ type etcdScaler struct { } type etcdMetadata struct { - endpoints []string - watchKey string - value float64 - activationValue float64 - watchProgressNotifyInterval int - triggerIndex int + triggerIndex int + + Endpoints []string `keda:"name=endpoints, order=triggerMetadata"` + WatchKey string `keda:"name=watchKey, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"` + WatchProgressNotifyInterval int `keda:"name=watchProgressNotifyInterval, order=triggerMetadata, optional, default=600"` + + Username string `keda:"name=username,order=authParams;resolvedEnv, optional"` + Password string `keda:"name=password,order=authParams;resolvedEnv, optional"` + // TLS - enableTLS bool - cert string - key string - keyPassword string - ca string + EnableTLS string `keda:"name=tls, order=authParams, optional, default=disable"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Ca string `keda:"name=ca, order=authParams, optional"` +} + +func (meta *etcdMetadata) Validate() error { + if meta.WatchProgressNotifyInterval <= 0 { + return errors.New("watchProgressNotifyInterval must be greater than 0") + } + + if meta.EnableTLS == etcdTLSEnable { + if meta.Cert == "" && meta.Key != "" { + return errors.New("cert must be provided with key") + } + if meta.Key == "" && meta.Cert != "" { + return errors.New("key must be provided with cert") + } + } else if meta.EnableTLS != etcdTLSDisable { + return fmt.Errorf("incorrect value for TLS given: %s", meta.EnableTLS) + } + + if meta.Password != "" && meta.Username == "" { + return errors.New("username must be provided with password") + } + if meta.Username != "" && meta.Password == "" { + return errors.New("password must be provided with username") + } + + return nil } // NewEtcdScaler creates a new etcdScaler @@ -76,75 +106,11 @@ func NewEtcdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { }, nil } -func parseEtcdAuthParams(config *scalersconfig.ScalerConfig, meta *etcdMetadata) error { - meta.enableTLS = false - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if val == etcdTLSEnable { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" - } - meta.enableTLS = true - } else if val != etcdTLSDisable { - return fmt.Errorf("err incorrect value for TLS given: %s", val) - } - } - - return nil -} - func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error) { meta := &etcdMetadata{} - var err error - meta.endpoints = strings.Split(config.TriggerMetadata[endpoints], ",") - if len(meta.endpoints) == 0 || meta.endpoints[0] == "" { - return nil, fmt.Errorf("endpoints required") - } - - meta.watchKey = config.TriggerMetadata[watchKey] - if len(meta.watchKey) == 0 { - return nil, fmt.Errorf("watchKey required") - } - - value, err := strconv.ParseFloat(config.TriggerMetadata[value], 64) - if err != nil || value <= 0 { - return nil, fmt.Errorf("value must be a float greater than 0") - } - meta.value = value - - meta.activationValue = 0 - if val, ok := config.TriggerMetadata[activationValue]; ok { - activationValue, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationValue must be a float") - } - meta.activationValue = activationValue - } - - meta.watchProgressNotifyInterval = defaultWatchProgressNotifyInterval - if val, ok := config.TriggerMetadata[watchProgressNotifyInterval]; ok { - interval, err := strconv.Atoi(val) - if err != nil || interval <= 0 { - return nil, fmt.Errorf("watchProgressNotifyInterval must be a int greater than 0") - } - meta.watchProgressNotifyInterval = interval - } - if err = parseEtcdAuthParams(config, meta); err != nil { - return meta, err + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %w", err) } meta.triggerIndex = config.TriggerIndex @@ -154,17 +120,19 @@ func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error func getEtcdClients(metadata *etcdMetadata) (*clientv3.Client, error) { var tlsConfig *tls.Config var err error - if metadata.enableTLS { - tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false) + if metadata.EnableTLS == etcdTLSEnable { + tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.Ca, false) if err != nil { return nil, err } } cli, err := clientv3.New(clientv3.Config{ - Endpoints: metadata.endpoints, + Endpoints: metadata.Endpoints, DialTimeout: 5 * time.Second, TLS: tlsConfig, + Username: metadata.Username, + Password: metadata.Password, }) if err != nil { return nil, fmt.Errorf("error connecting to etcd server: %w", err) @@ -189,16 +157,16 @@ func (s *etcdScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } metric := GenerateMetricInMili(metricName, v) - return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.activationValue, nil + return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.ActivationValue, nil } // GetMetricSpecForScaling returns the metric spec for the HPA. func (s *etcdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.watchKey))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.WatchKey))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } metricSpec := v2.MetricSpec{External: externalMetric, Type: etcdMetricType} return []v2.MetricSpec{metricSpec} @@ -209,16 +177,16 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // It's possible for the watch to get terminated anytime, we need to run this in a retry loop runWithWatch := func() { - s.logger.Info("run watch", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("run watch", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) subCtx, cancel := context.WithCancel(ctx) subCtx = clientv3.WithRequireLeader(subCtx) - rch := s.client.Watch(subCtx, s.metadata.watchKey, clientv3.WithProgressNotify()) + rch := s.client.Watch(subCtx, s.metadata.WatchKey, clientv3.WithProgressNotify()) // rewatch to another etcd server when the network is isolated from the current etcd server. progress := make(chan bool) defer close(progress) go func() { - delayDuration := time.Duration(s.metadata.watchProgressNotifyInterval) * 2 * time.Second + delayDuration := time.Duration(s.metadata.WatchProgressNotifyInterval) * 2 * time.Second delay := time.NewTimer(delayDuration) defer delay.Stop() for { @@ -228,7 +196,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { case <-subCtx.Done(): return case <-delay.C: - s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -240,7 +208,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // rewatch to another etcd server when there is an error form the current etcd server, such as 'no leader','required revision has been compacted' if wresp.Err() != nil { - s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -251,7 +219,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { s.logger.Error(err, "etcdValue invalid will be treated as 0") v = 0 } - active <- v > s.metadata.activationValue + active <- v > s.metadata.ActivationValue } } } @@ -288,12 +256,12 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { func (s *etcdScaler) getMetricValue(ctx context.Context) (float64, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - resp, err := s.client.Get(ctx, s.metadata.watchKey) + resp, err := s.client.Get(ctx, s.metadata.WatchKey) if err != nil { return 0, err } if resp.Kvs == nil { - return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.watchKey) + return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.WatchKey) } v, err := strconv.ParseFloat(string(resp.Kvs[0].Value), 64) if err != nil { diff --git a/pkg/scalers/etcd_scaler_test.go b/pkg/scalers/etcd_scaler_test.go index 9ecdb5b602a..70f16e2fec8 100644 --- a/pkg/scalers/etcd_scaler_test.go +++ b/pkg/scalers/etcd_scaler_test.go @@ -19,7 +19,7 @@ type parseEtcdMetadataTestData struct { type parseEtcdAuthParamsTestData struct { authParams map[string]string isError bool - enableTLS bool + enableTLS string } type etcdMetricIdentifier struct { @@ -56,19 +56,25 @@ var parseEtcdMetadataTestDataset = []parseEtcdMetadataTestData{ var parseEtcdAuthParamsTestDataset = []parseEtcdAuthParamsTestData{ // success, TLS only - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key + key password and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, etcdTLSEnable}, // success, TLS CA only - {map[string]string{"tls": "enable", "ca": "caaa"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa"}, false, etcdTLSEnable}, // failure, TLS missing cert - {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, etcdTLSDisable}, // failure, TLS missing key - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, etcdTLSDisable}, // failure, TLS invalid - {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, etcdTLSDisable}, + // success, username and password + {map[string]string{"username": "root", "password": "admin"}, false, etcdTLSDisable}, + // failure, missing password + {map[string]string{"username": "root"}, true, etcdTLSDisable}, + // failure, missing username + {map[string]string{"password": "admin"}, true, etcdTLSDisable}, } var etcdMetricIdentifiers = []etcdMetricIdentifier{ @@ -83,10 +89,10 @@ func TestParseEtcdMetadata(t *testing.T) { t.Error("Expected success but got error", err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Errorf("Expected error but got success %v", testData) } - if err == nil && !reflect.DeepEqual(meta.endpoints, testData.endpoints) { - t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.endpoints) + if err == nil && !reflect.DeepEqual(meta.Endpoints, testData.endpoints) { + t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.Endpoints) } } } @@ -101,21 +107,21 @@ func TestParseEtcdAuthParams(t *testing.T) { if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + if meta != nil && meta.EnableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.EnableTLS) } - if meta.enableTLS { - if meta.ca != testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS) + if meta != nil && meta.EnableTLS == etcdTLSEnable { + if meta.Ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.EnableTLS) } - if meta.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert) + if meta.Cert != testData.authParams["cert"] { + t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.Cert) } - if meta.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key) + if meta.Key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.Key) } - if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key) + if meta.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.Key) } } } diff --git a/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go b/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go new file mode 100644 index 00000000000..9f5d7d59892 --- /dev/null +++ b/tests/scalers/etcd/etcd_cluster_auth/etcd_cluster_auth_test.go @@ -0,0 +1,232 @@ +//go:build e2e +// +build e2e + +package etcd_cluster_auth_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +const ( + testName = "etcd-auth-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + secretName = fmt.Sprintf("%s-secret", testName) + triggerAuthName = fmt.Sprintf("%s-triggerauth", testName) + etcdClientName = fmt.Sprintf("%s-client", testName) + etcdUsername = "root" + etcdPassword = "admin" + etcdEndpoints = fmt.Sprintf("etcd-0.etcd-headless.%s:2379,etcd-1.%s:2379,etcd-2.etcd-headless.%s:2379", testNamespace, testNamespace, testNamespace) + minReplicaCount = 0 + maxReplicaCount = 2 +) + +type templateData struct { + TestNamespace string + DeploymentName string + JobName string + ScaledObjectName string + SecretName string + TriggerAuthName string + EtcdUsernameBase64 string + EtcdPasswordBase64 string + MinReplicaCount int + MaxReplicaCount int + EtcdName string + EtcdClientName string + EtcdEndpoints string +} + +const ( + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + etcd-username: {{.EtcdUsernameBase64}} + etcd-password: {{.EtcdPasswordBase64}} +` + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: etcd-username + - parameter: password + name: {{.SecretName}} + key: etcd-password +` + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + selector: + matchLabels: + app: {{.DeploymentName}} + replicas: 0 + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: my-app + image: nginxinc/nginx-unprivileged + imagePullPolicy: IfNotPresent + ports: + - containerPort: 80 +` + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 15 + cooldownPeriod: 5 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + name: keda-hpa-etcd-scaledobject + behavior: + scaleDown: + stabilizationWindowSeconds: 5 + triggers: + - type: etcd + metadata: + endpoints: {{.EtcdEndpoints}} + watchKey: var + value: '1.5' + activationValue: '5' + watchProgressNotifyInterval: '10' + authenticationRef: + name: {{.TriggerAuthName}} +` + etcdClientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.EtcdClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.EtcdClientName}} + image: gcr.io/etcd-development/etcd:v3.4.10 + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + KubectlDeleteWithTemplate(t, data, "etcdClientTemplate", etcdClientTemplate) + RemoveCluster(t, kc) + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + CreateNamespace(t, kc, testNamespace) + + // Create Etcd Cluster + KubectlApplyWithTemplate(t, data, "etcdClientTemplate", etcdClientTemplate) + InstallCluster(t, kc) + setVarValue(t, 0) + + // Create kubernetes resources for testing + KubectlApplyMultipleWithTemplate(t, data, templates) + + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation ---") + setVarValue(t, 4) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + setVarValue(t, 9) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + setVarValue(t, 0) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + JobName: jobName, + SecretName: secretName, + TriggerAuthName: triggerAuthName, + EtcdUsernameBase64: base64.StdEncoding.EncodeToString([]byte(etcdUsername)), + EtcdPasswordBase64: base64.StdEncoding.EncodeToString([]byte(etcdPassword)), + EtcdName: testName, + EtcdClientName: etcdClientName, + EtcdEndpoints: etcdEndpoints, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func setVarValue(t *testing.T, value int) { + _, _, err := ExecCommandOnSpecificPod(t, etcdClientName, testNamespace, fmt.Sprintf(`etcdctl --user="%s" --password="%s" put var %d --endpoints=%s`, + etcdUsername, etcdPassword, value, etcdEndpoints)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func InstallCluster(t *testing.T, kc *kubernetes.Clientset) { + _, err := ExecuteCommand(fmt.Sprintf(`helm upgrade --install --set persistence.enabled=false --set resourcesPreset=none --set auth.rbac.rootPassword=%s --set auth.rbac.allowNoneAuthentication=false --set replicaCount=3 --namespace %s --wait etcd oci://registry-1.docker.io/bitnamicharts/etcd`, + etcdPassword, testNamespace)) + require.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func RemoveCluster(t *testing.T, kc *kubernetes.Clientset) { + _, err := ExecuteCommand(fmt.Sprintf(`helm delete --namespace %s --wait etcd`, + testNamespace)) + require.NoErrorf(t, err, "cannot execute command - %s", err) +}