Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Loki scaler #6264

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 72 additions & 141 deletions pkg/scalers/loki_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,252 +19,184 @@ import (
)

const (
lokiServerAddress = "serverAddress"
lokiQuery = "query"
lokiThreshold = "threshold"
lokiActivationThreshold = "activationThreshold"
lokiNamespace = "namespace"
tenantName = "tenantName"
defaultIgnoreNullValues = true
tenantNameHeaderKey = "X-Scope-OrgID"
lokiIgnoreNullValues = "ignoreNullValues"
)

var (
lokiDefaultIgnoreNullValues = true
)

type lokiScaler struct {
metricType v2.MetricTargetType
metadata *lokiMetadata
metadata lokiMetadata
httpClient *http.Client
logger logr.Logger
}

type lokiMetadata struct {
serverAddress string
query string
threshold float64
activationThreshold float64
lokiAuth *authentication.AuthMeta
triggerIndex int
tenantName string
ignoreNullValues bool
unsafeSsl bool
ServerAddress string `keda:"name=serverAddress,order=triggerMetadata"`
Query string `keda:"name=query,order=triggerMetadata"`
Threshold float64 `keda:"name=threshold,order=triggerMetadata"`
ActivationThreshold float64 `keda:"name=activationThreshold,order=triggerMetadata,default=0"`
TenantName string `keda:"name=tenantName,order=triggerMetadata,optional"`
IgnoreNullValues bool `keda:"name=ignoreNullValues,order=triggerMetadata,default=true"`
UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
TriggerIndex int
Auth *authentication.AuthMeta
}

type lokiQueryResult struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric struct {
} `json:"metric"`
Value []interface{} `json:"value"`
Metric struct{} `json:"metric"`
Value []interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
}

// NewLokiScaler returns a new lokiScaler
func NewLokiScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "loki_scaler")

meta, err := parseLokiMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing loki metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl)
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl)

return &lokiScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
logger: InitializeLogger(config, "loki_scaler"),
}, nil
}

func parseLokiMetadata(config *scalersconfig.ScalerConfig) (meta *lokiMetadata, err error) {
meta = &lokiMetadata{}

if val, ok := config.TriggerMetadata[lokiServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", lokiServerAddress)
}

if val, ok := config.TriggerMetadata[lokiQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", lokiQuery)
}

if val, ok := config.TriggerMetadata[lokiThreshold]; ok && val != "" {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", lokiThreshold, err)
}

meta.threshold = t
} else {
if config.AsMetricSource {
meta.threshold = 0
} else {
return nil, fmt.Errorf("no %s given", lokiThreshold)
}
}

meta.activationThreshold = 0
if val, ok := config.TriggerMetadata[lokiActivationThreshold]; ok {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThreshold parsing error %w", err)
}

meta.activationThreshold = t
}

if val, ok := config.TriggerMetadata[tenantName]; ok && val != "" {
meta.tenantName = val
func parseLokiMetadata(config *scalersconfig.ScalerConfig) (lokiMetadata, error) {
meta := lokiMetadata{}
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing loki metadata: %w", err)
}

meta.ignoreNullValues = lokiDefaultIgnoreNullValues
if val, ok := config.TriggerMetadata[lokiIgnoreNullValues]; ok && val != "" {
ignoreNullValues, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s please use true or false", val)
}
meta.ignoreNullValues = ignoreNullValues
if config.AsMetricSource {
meta.Threshold = 0
}

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" {
unsafeSslValue, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err)
}

meta.unsafeSsl = unsafeSslValue
}

meta.triggerIndex = config.TriggerIndex

// parse auth configs from ScalerConfig
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return nil, err
return meta, err
}
meta.lokiAuth = auth
meta.Auth = auth
meta.TriggerIndex = config.TriggerIndex

return meta, nil
}

// Close returns a nil error
func (s *lokiScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "loki"),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, "loki"),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.threshold),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// ExecuteLokiQuery returns the result of the LogQL query execution
func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) {
u, err := url.ParseRequestURI(s.metadata.serverAddress)
u, err := url.ParseRequestURI(s.metadata.ServerAddress)
if err != nil {
return -1, err
}
u.Path = "/loki/api/v1/query"

u.RawQuery = url.Values{
"query": []string{s.metadata.query},
}.Encode()
u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode()

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return -1, err
}

if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth {
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth))
} else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password)
if s.metadata.Auth != nil {
if s.metadata.Auth.EnableBearerAuth {
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.Auth))
} else if s.metadata.Auth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.Auth.Username, s.metadata.Auth.Password)
}
}

if s.metadata.tenantName != "" {
req.Header.Add(tenantNameHeaderKey, s.metadata.tenantName)
if s.metadata.TenantName != "" {
req.Header.Add(tenantNameHeaderKey, s.metadata.TenantName)
}

r, err := s.httpClient.Do(req)
resp, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}
defer r.Body.Close()
defer resp.Body.Close()

b, err := io.ReadAll(r.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return -1, err
}

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
err := fmt.Errorf("loki query api returned error. status: %d response: %s", r.StatusCode, string(b))
s.logger.Error(err, "loki query api returned error")
return -1, err
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return -1, fmt.Errorf("loki query api returned error. status: %d response: %s", resp.StatusCode, string(body))
}

var result lokiQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
if err := json.Unmarshal(body, &result); err != nil {
return -1, err
}

var v float64 = -1
return s.parseQueryResult(result)
}

// allow for zero element or single element result sets
func (s *lokiScaler) parseQueryResult(result lokiQueryResult) (float64, error) {
if len(result.Data.Result) == 0 {
if s.metadata.ignoreNullValues {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("loki metrics may be lost, the result is empty")
} else if len(result.Data.Result) > 1 {
return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.query)
}

valueLen := len(result.Data.Result[0].Value)
if valueLen == 0 {
if s.metadata.ignoreNullValues {
if len(result.Data.Result) > 1 {
return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.Query)
}

values := result.Data.Result[0].Value
if len(values) == 0 {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("loki metrics may be lost, the value list is empty")
} else if valueLen < 2 {
return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.query)
}

val := result.Data.Result[0].Value[1]
if val != nil {
str := val.(string)
v, err = strconv.ParseFloat(str, 64)
if err != nil {
s.logger.Error(err, "Error converting loki value", "loki_value", str)
return -1, err
}
if len(values) < 2 {
return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.Query)
}

if values[1] == nil {
return 0, nil
}

str, ok := values[1].(string)
if !ok {
return -1, fmt.Errorf("failed to parse loki value as string")
}

v, err := strconv.ParseFloat(str, 64)
if err != nil {
return -1, fmt.Errorf("error converting loki value %s: %w", str, err)
}

return v, nil
Expand All @@ -279,6 +211,5 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin
}

metric := GenerateMetricInMili(metricName, val)

return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil
}
Loading
Loading