Skip to content

Commit

Permalink
Clean up config handling in target allocator (open-telemetry#2470)
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm authored Jan 9, 2024
1 parent a62bac6 commit 8d25118
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 45 deletions.
30 changes: 11 additions & 19 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

const DefaultResyncTime = 5 * time.Minute
const DefaultConfigFilePath string = "/conf/targetallocator.yaml"
const DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30)
const (
DefaultResyncTime = 5 * time.Minute
DefaultConfigFilePath string = "/conf/targetallocator.yaml"
DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30)
DefaultAllocationStrategy = "consistent-hashing"
DefaultFilterStrategy = "relabel-config"
)

type Config struct {
ListenAddr string `yaml:"listen_addr,omitempty"`
Expand All @@ -46,8 +50,8 @@ type Config struct {
RootLogger logr.Logger `yaml:"-"`
CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"`
PromConfig *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
AllocationStrategy string `yaml:"allocation_strategy,omitempty"`
FilterStrategy string `yaml:"filter_strategy,omitempty"`
PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"`
PodMonitorSelector map[string]string `yaml:"pod_monitor_selector,omitempty"`
ServiceMonitorSelector map[string]string `yaml:"service_monitor_selector,omitempty"`
Expand All @@ -58,20 +62,6 @@ type PrometheusCRConfig struct {
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
if c.AllocationStrategy != nil {
return *c.AllocationStrategy
}
return "consistent-hashing"
}

func (c Config) GetTargetsFilterStrategy() string {
if c.FilterStrategy != nil {
return *c.FilterStrategy
}
return "relabel-config"
}

func LoadFromFile(file string, target *Config) error {
return unmarshal(target, file)
}
Expand Down Expand Up @@ -127,6 +117,8 @@ func unmarshal(cfg *Config, configFile string) error {

func CreateDefaultConfig() Config {
return Config{
AllocationStrategy: DefaultAllocationStrategy,
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
},
Expand Down
4 changes: 4 additions & 0 deletions cmd/otel-allocator/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestLoad(t *testing.T) {
file: "./testdata/config_test.yaml",
},
want: Config{
AllocationStrategy: DefaultAllocationStrategy,
CollectorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
},
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: model.Duration(time.Second * 60),
},
Expand Down Expand Up @@ -111,12 +113,14 @@ func TestLoad(t *testing.T) {
file: "./testdata/pod_service_selector_test.yaml",
},
want: Config{
AllocationStrategy: DefaultAllocationStrategy,
CollectorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
},
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
},
Expand Down
19 changes: 12 additions & 7 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func main() {
ctx := context.Background()
log := ctrl.Log.WithName("allocator")

allocatorPrehook = prehook.New(cfg.GetTargetsFilterStrategy(), log)
allocator, err = allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
allocatorPrehook = prehook.New(cfg.FilterStrategy, log)
allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook))
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
Expand Down Expand Up @@ -134,11 +134,16 @@ func main() {
runGroup.Add(
func() error {
// Initial loading of the config file's scrape config
err = targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig)
if err != nil {
setupLog.Error(err, "Unable to apply initial configuration")
return err
if cfg.PromConfig != nil {
err = targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig.ScrapeConfigs)
if err != nil {
setupLog.Error(err, "Unable to apply initial configuration")
return err
}
} else {
setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
}

err := targetDiscoverer.Watch(allocator.SetTargets)
setupLog.Info("Target discoverer exited")
return err
Expand Down Expand Up @@ -180,7 +185,7 @@ func main() {
setupLog.Error(err, "Unable to load configuration")
continue
}
err = targetDiscoverer.ApplyConfig(event.Source, loadConfig)
err = targetDiscoverer.ApplyConfig(event.Source, loadConfig.ScrapeConfigs)
if err != nil {
setupLog.Error(err, "Unable to apply configuration")
continue
Expand Down
25 changes: 11 additions & 14 deletions cmd/otel-allocator/target/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"
"gopkg.in/yaml.v3"
Expand All @@ -41,7 +41,7 @@ type Discoverer struct {
log logr.Logger
manager *discovery.Manager
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
Expand All @@ -52,33 +52,30 @@ type discoveryHook interface {
}

type scrapeConfigsUpdater interface {
UpdateScrapeConfigResponse(map[string]*config.ScrapeConfig) error
UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error
}

func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater) *Discoverer {
return &Discoverer{
log: log,
manager: manager,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
}
}

func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error {
if cfg == nil {
m.log.Info("Service Discovery got empty Prometheus config", "source", source.String())
return nil
}
m.configsMap[source] = cfg
jobToScrapeConfig := make(map[string]*config.ScrapeConfig)
func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConfigs []*promconfig.ScrapeConfig) error {
m.configsMap[source] = scrapeConfigs
jobToScrapeConfig := make(map[string]*promconfig.ScrapeConfig)

discoveryCfg := make(map[string]discovery.Configs)
relabelCfg := make(map[string][]*relabel.Config)

for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
for _, configs := range m.configsMap {
for _, scrapeConfig := range configs {
jobToScrapeConfig[scrapeConfig.JobName] = scrapeConfig
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
Expand Down Expand Up @@ -137,7 +134,7 @@ func (m *Discoverer) Close() {

// Calculate a hash for a scrape config map.
// This is done by marshaling to YAML because it's the most straightforward and doesn't run into problems with unexported fields.
func getScrapeConfigHash(jobToScrapeConfig map[string]*config.ScrapeConfig) (hash.Hash64, error) {
func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig) (hash.Hash64, error) {
var err error
hash := fnv.New64()
yamlEncoder := yaml.NewEncoder(hash)
Expand Down
10 changes: 5 additions & 5 deletions cmd/otel-allocator/target/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestDiscovery(t *testing.T) {
err := config.LoadFromFile(tt.args.file, &cfg)
assert.NoError(t, err)
assert.True(t, len(cfg.PromConfig.ScrapeConfigs) > 0)
err = manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.PromConfig)
err = manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.PromConfig.ScrapeConfigs)
assert.NoError(t, err)

gotTargets := <-results
Expand Down Expand Up @@ -306,11 +306,11 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) {

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg)
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg.ScrapeConfigs)
if !tc.expectErr {
expectedConfig = make(map[string]*promconfig.ScrapeConfig)
for _, value := range manager.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
for _, configs := range manager.configsMap {
for _, scrapeConfig := range configs {
expectedConfig[scrapeConfig.JobName] = scrapeConfig
}
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func BenchmarkApplyScrapeConfig(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg)
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, cfg.ScrapeConfigs)
require.NoError(b, err)
}
}
Expand Down

0 comments on commit 8d25118

Please sign in to comment.