Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor InfluxDB scaler config #6240

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 24 additions & 107 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
Expand All @@ -23,14 +22,15 @@ type influxDBScaler struct {
}

type influxDBMetadata struct {
authToken string
organizationName string
query string
serverURL string
unsafeSsl bool
thresholdValue float64
activationThresholdValue float64
triggerIndex int
AuthToken string `keda:"name=authToken, order=triggerMetadata;resolvedEnv;authParams"`
OrganizationName string `keda:"name=organizationName, order=triggerMetadata;resolvedEnv;authParams"`
Query string `keda:"name=query, order=triggerMetadata"`
ServerURL string `keda:"name=serverURL, order=triggerMetadata;authParams"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"`
ThresholdValue float64 `keda:"name=thresholdValue, order=triggerMetadata, optional"`
ActivationThresholdValue float64 `keda:"name=activationThresholdValue, order=triggerMetadata, optional"`

triggerIndex int
}

// NewInfluxDBScaler creates a new influx db scaler
Expand All @@ -49,9 +49,9 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger.Info("starting up influxdb client")
client := influxdb2.NewClientWithOptions(
meta.serverURL,
meta.authToken,
influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.unsafeSsl)))
meta.ServerURL,
meta.AuthToken,
influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.UnsafeSsl)))

return &influxDBScaler{
client: client,
Expand All @@ -63,100 +63,17 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

// parseInfluxDBMetadata parses the metadata passed in from the ScaledObject config
func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadata, error) {
var authToken string
var organizationName string
var query string
var serverURL string
var unsafeSsl bool
var thresholdValue float64
var activationThresholdValue float64

val, ok := config.TriggerMetadata["authToken"]
switch {
case ok && val != "":
authToken = val
case config.TriggerMetadata["authTokenFromEnv"] != "":
if val, ok := config.ResolvedEnv[config.TriggerMetadata["authTokenFromEnv"]]; ok {
authToken = val
} else {
return nil, fmt.Errorf("no auth token given")
}
case config.AuthParams["authToken"] != "":
authToken = config.AuthParams["authToken"]
default:
return nil, fmt.Errorf("no auth token given")
}

val, ok = config.TriggerMetadata["organizationName"]
switch {
case ok && val != "":
organizationName = val
case config.TriggerMetadata["organizationNameFromEnv"] != "":
if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok {
organizationName = val
} else {
return nil, fmt.Errorf("no organization name given")
}
case config.AuthParams["organizationName"] != "":
organizationName = config.AuthParams["organizationName"]
default:
return nil, fmt.Errorf("no organization name given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
query = val
} else {
return nil, fmt.Errorf("no query provided")
}

if val, ok := config.TriggerMetadata["serverURL"]; ok {
serverURL = val
} else if val, ok := config.AuthParams["serverURL"]; ok {
serverURL = val
} else {
return nil, fmt.Errorf("no server url given")
}

if val, ok := config.TriggerMetadata["activationThresholdValue"]; ok {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThresholdValue: failed to parse activationThresholdValue %w", err)
}
activationThresholdValue = value
meta := &influxDBMetadata{}
meta.triggerIndex = config.TriggerIndex
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing influxdb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["thresholdValue"]; ok {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("thresholdValue: failed to parse thresholdValue length %w", err)
}
thresholdValue = value
} else {
if config.AsMetricSource {
thresholdValue = 0
} else {
return nil, fmt.Errorf("no threshold value given")
}
}
unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
}
unsafeSsl = parsedVal
if meta.ThresholdValue == 0 && !config.AsMetricSource {
return nil, fmt.Errorf("no threshold value given")
}

return &influxDBMetadata{
authToken: authToken,
organizationName: organizationName,
query: query,
serverURL: serverURL,
thresholdValue: thresholdValue,
activationThresholdValue: activationThresholdValue,
unsafeSsl: unsafeSsl,
triggerIndex: config.TriggerIndex,
}, nil
return meta, nil
}

// Close closes the connection of the client to the server
Expand Down Expand Up @@ -192,25 +109,25 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl
// GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query
func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
// Grab QueryAPI to make queries to influxdb instance
queryAPI := s.client.QueryAPI(s.metadata.organizationName)
queryAPI := s.client.QueryAPI(s.metadata.OrganizationName)

value, err := queryInfluxDB(ctx, queryAPI, s.metadata.query)
value, err := queryInfluxDB(ctx, queryAPI, s.metadata.Query)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationThresholdValue, nil
}

// GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler
func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.organizationName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.OrganizationName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.ThresholdValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/influxdb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}},
// 9 authToken, organizationName, and serverURL are defined in authParams
{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
// 10 no sunsafeSsl value passed
// 10 no unsafeSsl value passed
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
// 11 wrong activationThreshold valuequeryInfluxDB
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "activationThresholdValue": "aa", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}},
Expand Down
Loading