Skip to content

Commit

Permalink
Refactor InfluxDB scaler config
Browse files Browse the repository at this point in the history
Signed-off-by: wangrushen <[email protected]>
  • Loading branch information
dovics committed Oct 15, 2024
1 parent 69a9cb9 commit 10c4389
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 108 deletions.
131 changes: 24 additions & 107 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
Expand All @@ -23,14 +22,15 @@ type influxDBScaler struct {
}

type influxDBMetadata struct {
authToken string
organizationName string
query string
serverURL string
unsafeSsl bool
thresholdValue float64
activationThresholdValue float64
triggerIndex int
AuthToken string `keda:"name=authToken, order=triggerMetadata;resolvedEnv;authParams"`
OrganizationName string `keda:"name=organizationName, order=triggerMetadata;resolvedEnv;authParams"`
Query string `keda:"name=query, order=triggerMetadata"`
ServerURL string `keda:"name=serverURL, order=triggerMetadata;authParams"`
UnsafeSsl bool `keda:"name=UnsafeSsl, order=triggerMetadata, optional"`
ThresholdValue float64 `keda:"name=thresholdValue, order=triggerMetadata, optional"`
ActivationThresholdValue float64 `keda:"name=activationThresholdValue, order=triggerMetadata, optional"`

triggerIndex int
}

// NewInfluxDBScaler creates a new influx db scaler
Expand All @@ -49,9 +49,9 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger.Info("starting up influxdb client")
client := influxdb2.NewClientWithOptions(
meta.serverURL,
meta.authToken,
influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.unsafeSsl)))
meta.ServerURL,
meta.AuthToken,
influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.UnsafeSsl)))

return &influxDBScaler{
client: client,
Expand All @@ -63,100 +63,17 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

// parseInfluxDBMetadata parses the metadata passed in from the ScaledObject config
func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadata, error) {
var authToken string
var organizationName string
var query string
var serverURL string
var unsafeSsl bool
var thresholdValue float64
var activationThresholdValue float64

val, ok := config.TriggerMetadata["authToken"]
switch {
case ok && val != "":
authToken = val
case config.TriggerMetadata["authTokenFromEnv"] != "":
if val, ok := config.ResolvedEnv[config.TriggerMetadata["authTokenFromEnv"]]; ok {
authToken = val
} else {
return nil, fmt.Errorf("no auth token given")
}
case config.AuthParams["authToken"] != "":
authToken = config.AuthParams["authToken"]
default:
return nil, fmt.Errorf("no auth token given")
}

val, ok = config.TriggerMetadata["organizationName"]
switch {
case ok && val != "":
organizationName = val
case config.TriggerMetadata["organizationNameFromEnv"] != "":
if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok {
organizationName = val
} else {
return nil, fmt.Errorf("no organization name given")
}
case config.AuthParams["organizationName"] != "":
organizationName = config.AuthParams["organizationName"]
default:
return nil, fmt.Errorf("no organization name given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
query = val
} else {
return nil, fmt.Errorf("no query provided")
}

if val, ok := config.TriggerMetadata["serverURL"]; ok {
serverURL = val
} else if val, ok := config.AuthParams["serverURL"]; ok {
serverURL = val
} else {
return nil, fmt.Errorf("no server url given")
}

if val, ok := config.TriggerMetadata["activationThresholdValue"]; ok {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThresholdValue: failed to parse activationThresholdValue %w", err)
}
activationThresholdValue = value
meta := &influxDBMetadata{}
meta.triggerIndex = config.TriggerIndex
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing influxdb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["thresholdValue"]; ok {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("thresholdValue: failed to parse thresholdValue length %w", err)
}
thresholdValue = value
} else {
if config.AsMetricSource {
thresholdValue = 0
} else {
return nil, fmt.Errorf("no threshold value given")
}
}
unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
}
unsafeSsl = parsedVal
if meta.ThresholdValue == 0 && !config.AsMetricSource {
return nil, fmt.Errorf("no threshold value given")
}

return &influxDBMetadata{
authToken: authToken,
organizationName: organizationName,
query: query,
serverURL: serverURL,
thresholdValue: thresholdValue,
activationThresholdValue: activationThresholdValue,
unsafeSsl: unsafeSsl,
triggerIndex: config.TriggerIndex,
}, nil
return meta, nil
}

// Close closes the connection of the client to the server
Expand Down Expand Up @@ -192,25 +109,25 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl
// GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query
func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
// Grab QueryAPI to make queries to influxdb instance
queryAPI := s.client.QueryAPI(s.metadata.organizationName)
queryAPI := s.client.QueryAPI(s.metadata.OrganizationName)

value, err := queryInfluxDB(ctx, queryAPI, s.metadata.query)
value, err := queryInfluxDB(ctx, queryAPI, s.metadata.Query)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationThresholdValue, nil
}

// GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler
func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.organizationName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.OrganizationName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.ThresholdValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/influxdb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}},
// 9 authToken, organizationName, and serverURL are defined in authParams
{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
// 10 no sunsafeSsl value passed
// 10 no unsafeSsl value passed
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
// 11 wrong activationThreshold valuequeryInfluxDB
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "activationThresholdValue": "aa", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}},
Expand Down

0 comments on commit 10c4389

Please sign in to comment.