From e25aca92ae0a7e56e47c19f3ae8873ec52513dec Mon Sep 17 00:00:00 2001 From: Dao Thanh Tung Date: Tue, 4 Jun 2024 10:50:54 +0100 Subject: [PATCH] Refactor Artemis scaler config (#5836) Signed-off-by: dttung2905 --- pkg/scalers/artemis_scaler.go | 189 ++++++++++------------------- pkg/scalers/artemis_scaler_test.go | 10 +- 2 files changed, 69 insertions(+), 130 deletions(-) diff --git a/pkg/scalers/artemis_scaler.go b/pkg/scalers/artemis_scaler.go index 05e030ccb02..c845beae037 100644 --- a/pkg/scalers/artemis_scaler.go +++ b/pkg/scalers/artemis_scaler.go @@ -7,7 +7,6 @@ import ( "fmt" "net/http" "net/url" - "strconv" "strings" "github.com/go-logr/logr" @@ -27,17 +26,17 @@ type artemisScaler struct { //revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users type artemisMetadata struct { - managementEndpoint string - queueName string - brokerName string - brokerAddress string - username string - password string - restAPITemplate string - queueLength int64 - activationQueueLength int64 - corsHeader string - triggerIndex int + TriggerIndex int + ManagementEndpoint string `keda:"name=managementEndpoint, order=triggerMetadata, optional"` + QueueName string `keda:"name=queueName, order=triggerMetadata, optional"` + BrokerName string `keda:"name=brokerName, order=triggerMetadata, optional"` + BrokerAddress string `keda:"name=brokerAddress, order=triggerMetadata, optional"` + Username string `keda:"name=username, order=authParams;triggerMetadata;resolvedEnv"` + Password string `keda:"name=password, order=authParams;triggerMetadata;resolvedEnv"` + RestAPITemplate string `keda:"name=restApiTemplate, order=triggerMetadata, optional"` + QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, optional, default=10"` + ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, optional, default=10"` + CorsHeader string `keda:"name=corsHeader, order=triggerMetadata, optional"` } //revive:enable:var-naming @@ -49,13 +48,38 @@ type artemisMonitoring struct { } const ( - artemisMetricType = "External" - defaultArtemisQueueLength = 10 - defaultArtemisActivationQueueLength = 0 - defaultRestAPITemplate = "http://<>/console/jolokia/read/org.apache.activemq.artemis:broker=\"<>\",component=addresses,address=\"<>\",subcomponent=queues,routing-type=\"anycast\",queue=\"<>\"/MessageCount" - defaultCorsHeader = "http://%s" + artemisMetricType = "External" + defaultRestAPITemplate = "http://<>/console/jolokia/read/org.apache.activemq.artemis:broker=\"<>\",component=addresses,address=\"<>\",subcomponent=queues,routing-type=\"anycast\",queue=\"<>\"/MessageCount" + defaultCorsHeader = "http://%s" ) +func (a *artemisMetadata) Validate() error { + if a.RestAPITemplate != "" { + var err error + if *a, err = getAPIParameters(*a); err != nil { + return fmt.Errorf("can't parse restApiTemplate : %s ", err) + } + } else { + a.RestAPITemplate = defaultRestAPITemplate + if a.ManagementEndpoint == "" { + return errors.New("no management endpoint given") + } + if a.QueueName == "" { + return errors.New("no queue name given") + } + if a.BrokerName == "" { + return errors.New("no broker name given") + } + if a.BrokerAddress == "" { + return errors.New("no broker address given") + } + } + if a.CorsHeader == "" { + a.CorsHeader = fmt.Sprintf(defaultCorsHeader, a.ManagementEndpoint) + } + return nil +} + // NewArtemisQueueScaler creates a new artemis queue Scaler func NewArtemisQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { // do we need to guarantee this timeout for a specific @@ -82,108 +106,23 @@ func NewArtemisQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { } func parseArtemisMetadata(config *scalersconfig.ScalerConfig) (*artemisMetadata, error) { - meta := artemisMetadata{} - - meta.queueLength = defaultArtemisQueueLength - meta.activationQueueLength = defaultArtemisActivationQueueLength - - if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" { - meta.restAPITemplate = config.TriggerMetadata["restApiTemplate"] - var err error - if meta, err = getAPIParameters(meta); err != nil { - return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err) - } - } else { - meta.restAPITemplate = defaultRestAPITemplate - if config.TriggerMetadata["managementEndpoint"] == "" { - return nil, errors.New("no management endpoint given") - } - meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] - - if config.TriggerMetadata["queueName"] == "" { - return nil, errors.New("no queue name given") - } - meta.queueName = config.TriggerMetadata["queueName"] - - if config.TriggerMetadata["brokerName"] == "" { - return nil, errors.New("no broker name given") - } - meta.brokerName = config.TriggerMetadata["brokerName"] - - if config.TriggerMetadata["brokerAddress"] == "" { - return nil, errors.New("no broker address given") - } - meta.brokerAddress = config.TriggerMetadata["brokerAddress"] + meta := &artemisMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } - if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" { - meta.corsHeader = config.TriggerMetadata["corsHeader"] - } else { - meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint) - } - - if val, ok := config.TriggerMetadata["queueLength"]; ok { - queueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("can't parse queueLength: %w", err) - } - - meta.queueLength = queueLength - } - - if val, ok := config.TriggerMetadata["activationQueueLength"]; ok { - activationQueueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("can't parse activationQueueLength: %w", err) - } - - meta.activationQueueLength = activationQueueLength - } - - if val, ok := config.AuthParams["username"]; ok && val != "" { - meta.username = val - } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { - username := val - - if val, ok := config.ResolvedEnv[username]; ok && val != "" { - meta.username = val - } else { - meta.username = username - } - } - - if meta.username == "" { - return nil, fmt.Errorf("username cannot be empty") - } - - if val, ok := config.AuthParams["password"]; ok && val != "" { - meta.password = val - } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { - password := val - - if val, ok := config.ResolvedEnv[password]; ok && val != "" { - meta.password = val - } else { - meta.password = password - } - } - - if meta.password == "" { - return nil, fmt.Errorf("password cannot be empty") - } - - meta.triggerIndex = config.TriggerIndex + meta.TriggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } // getAPIParameters parse restAPITemplate to provide managementEndpoint , brokerName, brokerAddress, queueName func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) { - u, err := url.ParseRequestURI(meta.restAPITemplate) + u, err := url.ParseRequestURI(meta.RestAPITemplate) if err != nil { return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %w", err) } - meta.managementEndpoint = u.Host + meta.ManagementEndpoint = u.Host splitURL := strings.Split(strings.Split(u.RawPath, ":")[1], "/")[0] // This returns : broker="<>",component=addresses,address="<>",subcomponent=queues,routing-type="anycast",queue="<>" replacer := strings.NewReplacer(",", "&", "\"\"", "") v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[address:["<>"] broker:["<>"] component:[addresses] queue:["<>"] routing-type:["anycast"] subcomponent:[queues]] @@ -194,28 +133,28 @@ func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) { if len(v["address"][0]) == 0 { return meta, errors.New("no brokerAddress given") } - meta.brokerAddress = v["address"][0] + meta.BrokerAddress = v["address"][0] if len(v["queue"][0]) == 0 { return meta, errors.New("no queueName is given") } - meta.queueName = v["queue"][0] + meta.QueueName = v["queue"][0] if len(v["broker"][0]) == 0 { - return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) + return meta, fmt.Errorf("no brokerName given: %s", meta.RestAPITemplate) } - meta.brokerName = v["broker"][0] + meta.BrokerName = v["broker"][0] return meta, nil } func (s *artemisScaler) getMonitoringEndpoint() string { - replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, - "<>", s.metadata.queueName, - "<>", s.metadata.brokerName, - "<>", s.metadata.brokerAddress) + replacer := strings.NewReplacer("<>", s.metadata.ManagementEndpoint, + "<>", s.metadata.QueueName, + "<>", s.metadata.BrokerName, + "<>", s.metadata.BrokerAddress) - monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate) + monitoringEndpoint := replacer.Replace(s.metadata.RestAPITemplate) return monitoringEndpoint } @@ -231,8 +170,8 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error) if err != nil { return -1, err } - req.SetBasicAuth(s.metadata.username, s.metadata.password) - req.Header.Set("Origin", s.metadata.corsHeader) + req.SetBasicAuth(s.metadata.Username, s.metadata.Password) + req.Header.Set("Origin", s.metadata.CorsHeader) resp, err := client.Do(req) if err != nil { @@ -250,7 +189,7 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error) return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) } - s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength)) + s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.QueueLength)) return messageCount, nil } @@ -258,9 +197,9 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error) func (s *artemisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("artemis-%s", s.metadata.queueName))), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("artemis-%s", s.metadata.QueueName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.queueLength), + Target: GetMetricTarget(s.metricType, s.metadata.QueueLength), } metricSpec := v2.MetricSpec{External: externalMetric, Type: artemisMetricType} return []v2.MetricSpec{metricSpec} @@ -271,13 +210,13 @@ func (s *artemisScaler) GetMetricsAndActivity(ctx context.Context, metricName st messages, err := s.getQueueMessageCount(ctx) if err != nil { - s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.ManagementEndpoint) return []external_metrics.ExternalMetricValue{}, false, err } metric := GenerateMetricInMili(metricName, float64(messages)) - return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.activationQueueLength, nil + return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.ActivationQueueLength, nil } func (s *artemisScaler) Close(context.Context) error { diff --git a/pkg/scalers/artemis_scaler_test.go b/pkg/scalers/artemis_scaler_test.go index f7dc7ab43f9..4140d8d3f63 100644 --- a/pkg/scalers/artemis_scaler_test.go +++ b/pkg/scalers/artemis_scaler_test.go @@ -70,7 +70,7 @@ var artemisMetricIdentifiers = []artemisMetricIdentifier{ var testArtemisMetadataWithEmptyAuthParams = []parseArtemisMetadataTestData{ // nothing passed {map[string]string{}, true}, - // Missing missing managementEndpoint should fail + // Missing managementEndpoint should fail {map[string]string{"managementEndpoint": "", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "address1"}, true}, // Missing queue name, should fail {map[string]string{"managementEndpoint": "localhost:8161", "queueName": "", "brokerName": "broker-activemq", "brokerAddress": "address1"}, true}, @@ -93,8 +93,8 @@ func TestArtemisDefaultCorsHeader(t *testing.T) { if err != nil { t.Error("Expected success but got error", err) } - if !(meta.corsHeader == "http://localhost:8161") { - t.Errorf("Expected http://localhost:8161 but got %s", meta.corsHeader) + if !(meta.CorsHeader == "http://localhost:8161") { + t.Errorf("Expected http://localhost:8161 but got %s", meta.CorsHeader) } } @@ -105,8 +105,8 @@ func TestArtemisCorsHeader(t *testing.T) { if err != nil { t.Error("Expected success but got error", err) } - if !(meta.corsHeader == "test") { - t.Errorf("Expected test but got %s", meta.corsHeader) + if !(meta.CorsHeader == "test") { + t.Errorf("Expected test but got %s", meta.CorsHeader) } }