Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Clean up config handling in target allocator #2470

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

const DefaultResyncTime = 5 * time.Minute
const DefaultConfigFilePath string = "/conf/targetallocator.yaml"
const DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30)
const (
DefaultResyncTime = 5 * time.Minute
DefaultConfigFilePath string = "/conf/targetallocator.yaml"
DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30)
DefaultAllocationStrategy = "consistent-hashing"
DefaultFilterStrategy = "relabel-config"
)

type Config struct {
ListenAddr string `yaml:"listen_addr,omitempty"`
Expand All @@ -46,8 +50,8 @@ type Config struct {
RootLogger logr.Logger `yaml:"-"`
CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"`
PromConfig *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
AllocationStrategy string `yaml:"allocation_strategy,omitempty"`
FilterStrategy string `yaml:"filter_strategy,omitempty"`
PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"`
PodMonitorSelector map[string]string `yaml:"pod_monitor_selector,omitempty"`
ServiceMonitorSelector map[string]string `yaml:"service_monitor_selector,omitempty"`
Expand All @@ -58,20 +62,6 @@ type PrometheusCRConfig struct {
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
if c.AllocationStrategy != nil {
return *c.AllocationStrategy
}
return "consistent-hashing"
}

func (c Config) GetTargetsFilterStrategy() string {
if c.FilterStrategy != nil {
return *c.FilterStrategy
}
return "relabel-config"
}

func LoadFromFile(file string, target *Config) error {
return unmarshal(target, file)
}
Expand Down Expand Up @@ -127,6 +117,8 @@ func unmarshal(cfg *Config, configFile string) error {

func CreateDefaultConfig() Config {
return Config{
AllocationStrategy: DefaultAllocationStrategy,
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
},
Expand Down
4 changes: 4 additions & 0 deletions cmd/otel-allocator/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestLoad(t *testing.T) {
file: "./testdata/config_test.yaml",
},
want: Config{
AllocationStrategy: DefaultAllocationStrategy,
CollectorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
},
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: model.Duration(time.Second * 60),
},
Expand Down Expand Up @@ -111,12 +113,14 @@ func TestLoad(t *testing.T) {
file: "./testdata/pod_service_selector_test.yaml",
},
want: Config{
AllocationStrategy: DefaultAllocationStrategy,
CollectorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
},
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
},
Expand Down
19 changes: 12 additions & 7 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func main() {
ctx := context.Background()
log := ctrl.Log.WithName("allocator")

allocatorPrehook = prehook.New(cfg.GetTargetsFilterStrategy(), log)
allocator, err = allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
allocatorPrehook = prehook.New(cfg.FilterStrategy, log)
allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook))
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
Expand Down Expand Up @@ -134,11 +134,16 @@ func main() {
runGroup.Add(
func() error {
// Initial loading of the config file's scrape config
err = targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig)
if err != nil {
setupLog.Error(err, "Unable to apply initial configuration")
return err
if cfg.PromConfig != nil {
err = targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig.ScrapeConfigs)
if err != nil {
setupLog.Error(err, "Unable to apply initial configuration")
return err
}
} else {
setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
}

err := targetDiscoverer.Watch(allocator.SetTargets)
setupLog.Info("Target discoverer exited")
return err
Expand Down Expand Up @@ -180,7 +185,7 @@ func main() {
setupLog.Error(err, "Unable to load configuration")
continue
}
err = targetDiscoverer.ApplyConfig(event.Source, loadConfig)
err = targetDiscoverer.ApplyConfig(event.Source, loadConfig.ScrapeConfigs)
if err != nil {
setupLog.Error(err, "Unable to apply configuration")
continue
Expand Down
25 changes: 11 additions & 14 deletions cmd/otel-allocator/target/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"
"gopkg.in/yaml.v3"
Expand All @@ -41,7 +41,7 @@ type Discoverer struct {
log logr.Logger
manager *discovery.Manager
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
Expand All @@ -52,33 +52,30 @@ type discoveryHook interface {
}

type scrapeConfigsUpdater interface {
UpdateScrapeConfigResponse(map[string]*config.ScrapeConfig) error
UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error
}

func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater) *Discoverer {
return &Discoverer{
log: log,
manager: manager,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
}
}

func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error {
if cfg == nil {
m.log.Info("Service Discovery got empty Prometheus config", "source", source.String())
return nil
}
m.configsMap[source] = cfg
jobToScrapeConfig := make(map[string]*config.ScrapeConfig)
func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConfigs []*promconfig.ScrapeConfig) error {
m.configsMap[source] = scrapeConfigs
jobToScrapeConfig := make(map[string]*promconfig.ScrapeConfig)

discoveryCfg := make(map[string]discovery.Configs)
relabelCfg := make(map[string][]*relabel.Config)

for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
for _, configs := range m.configsMap {
for _, scrapeConfig := range configs {
jobToScrapeConfig[scrapeConfig.JobName] = scrapeConfig
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
Expand Down Expand Up @@ -137,7 +134,7 @@ func (m *Discoverer) Close() {

// Calculate a hash for a scrape config map.
// This is done by marshaling to YAML because it's the most straightforward and doesn't run into problems with unexported fields.
func getScrapeConfigHash(jobToScrapeConfig map[string]*config.ScrapeConfig) (hash.Hash64, error) {
func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig) (hash.Hash64, error) {
var err error
hash := fnv.New64()
yamlEncoder := yaml.NewEncoder(hash)
Expand Down
10 changes: 5 additions & 5 deletions cmd/otel-allocator/target/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestDiscovery(t *testing.T) {
err := config.LoadFromFile(tt.args.file, &cfg)
assert.NoError(t, err)
assert.True(t, len(cfg.PromConfig.ScrapeConfigs) > 0)
err = manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.PromConfig)
err = manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.PromConfig.ScrapeConfigs)
assert.NoError(t, err)

gotTargets := <-results
Expand Down Expand Up @@ -306,11 +306,11 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) {

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg)
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg.ScrapeConfigs)
if !tc.expectErr {
expectedConfig = make(map[string]*promconfig.ScrapeConfig)
for _, value := range manager.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
for _, configs := range manager.configsMap {
for _, scrapeConfig := range configs {
expectedConfig[scrapeConfig.JobName] = scrapeConfig
}
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func BenchmarkApplyScrapeConfig(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg)
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.ScrapeConfigs)
require.NoError(b, err)
}
}
Expand Down
Loading