From 9cbc630358375937ba9a967fd3118ac701de9139 Mon Sep 17 00:00:00 2001 From: Rishabh Kumar Date: Mon, 23 Sep 2024 10:46:20 -0700 Subject: [PATCH 1/3] [thanos] Changes to address querying multi cluster reads using thanos query --- cmd/thanos/config.go | 3 + cmd/thanos/sidecar.go | 3 +- docs/getting-started.md | 2 +- pkg/api/query/v1.go | 2 - pkg/promclient/promclient.go | 46 +- pkg/query/querier.go | 811 ++++++++++++++++----------------- pkg/store/acceptance_test.go | 2 +- pkg/store/prometheus.go | 20 +- pkg/store/prometheus_test.go | 8 +- pkg/store/proxy.go | 6 +- pkg/tenancy/tenancy.go | 14 +- test/e2e/query_test.go | 6 +- test/e2e/store_gateway_test.go | 8 +- 13 files changed, 482 insertions(+), 449 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 3ce069c1b2..400f75c65c 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" + "github.com/thanos-io/thanos/pkg/tenancy" ) type grpcConfig struct { @@ -80,12 +81,14 @@ type prometheusConfig struct { getConfigInterval time.Duration getConfigTimeout time.Duration httpClient *extflag.PathOrContent + tenantHeader string } func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig { cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URLVar(&pc.url) + cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader) cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up"). 
Default("10m").DurationVar(&pc.readyTimeout) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 014b9fc9df..077d086c14 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -297,7 +297,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/docs/getting-started.md b/docs/getting-started.md index 4f52745dfc..7948b63bfb 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -197,4 +197,4 @@ However, in case you want to play and run Thanos components on a single node, we | Compact | HTTP | 10912 | | Query Frontend | HTTP | 10913 | -You can see example one-node setup [here](../scripts/quickstart.sh). +You can see example one-node setup [here](../scripts/quickstart.sh). 
diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 8d3969cb0e..7c29a69e0a 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -1096,7 +1096,6 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A hints := &storage.LabelHints{ Limit: toHintLimit(limit), } - var ( vals []string warnings annotations.Annotations @@ -1281,7 +1280,6 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap hints := &storage.LabelHints{ Limit: toHintLimit(limit), } - if len(matcherSets) > 0 { var callWarnings annotations.Annotations labelNamesSet := make(map[string]struct{}) diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 6f124136b4..b214e203cb 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -40,6 +40,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -107,6 +108,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent // the raw query is encoded in the body and the appropriate Content-Type is set. 
func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) { var b io.Reader + if method == http.MethodPost { rq := u.RawQuery b = strings.NewReader(rq) @@ -691,11 +693,14 @@ func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } -func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error { +func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, header string) error { span, ctx := tracing.StartSpan(ctx, spanName) defer span.Finish() - body, code, err := c.req2xx(ctx, u, http.MethodGet, nil) + var customheader = http.Header{} + customheader.Set(header, tenancy.GetTenantFromProvidedHeader(ctx, header)) + + body, code, err := c.req2xx(ctx, u, http.MethodGet, customheader) if err != nil { if code, exists := statusToCode[code]; exists && code != 0 { return status.Error(code, err.Error()) @@ -734,7 +739,7 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string // SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) { +func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]map[string]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/series") q := u.Query() @@ -742,19 +747,21 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la q.Add("match[]", storepb.PromMatchersToString(matchers...)) q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { Data []map[string]string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, header) } // LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { +func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/labels") q := u.Query() @@ -764,18 +771,20 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [ } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit >= 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, header) } // LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { +func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values") q := u.Query() @@ -785,13 +794,16 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit >= 0 { + q.Add("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, header) } // RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors. @@ -810,7 +822,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin Data *rulespb.RuleGroups `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -833,7 +845,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al } `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -854,7 +866,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric q.Add("metric", metric) } // We only set limit when it is >= 0. 
- if limit >= 0 { + if limit > 0 { q.Add("limit", strconv.Itoa(limit)) } @@ -863,7 +875,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric var v struct { Data map[string][]*metadatapb.Meta `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v) + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "") } // ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors. @@ -882,7 +894,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin Data []*exemplarspb.ExemplarData `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -902,5 +914,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets var v struct { Data *targetspb.TargetDiscovery `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v) + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "") } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 588fa00734..d783b2b739 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -4,27 +4,27 @@ package query import ( - "context" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/util/annotations" - - "github.com/thanos-io/thanos/pkg/dedup" - "github.com/thanos-io/thanos/pkg/extprom" - "github.com/thanos-io/thanos/pkg/gate" - "github.com/thanos-io/thanos/pkg/store" - 
"github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/tenancy" - "github.com/thanos-io/thanos/pkg/tracing" + "context" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/thanos-io/thanos/pkg/dedup" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" + "github.com/thanos-io/thanos/pkg/tracing" ) type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) @@ -32,12 +32,12 @@ type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) var NoopSeriesStatsReporter seriesStatsReporter = func(_ storepb.SeriesStatsCounter) {} func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsReporter { - var mutex sync.Mutex - return func(s storepb.SeriesStatsCounter) { - mutex.Lock() - defer mutex.Unlock() - *stats = append(*stats, s) - } + var mutex sync.Mutex + return func(s storepb.SeriesStatsCounter) { + mutex.Lock() + defer mutex.Unlock() + *stats = append(*stats, s) + } } // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. @@ -47,446 +47,445 @@ func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsR // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. 
type QueryableCreator func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, ) storage.Queryable // NewQueryableCreator creates QueryableCreator. // NOTE(bwplotka): Proxy assumes to be replica_aware, see thanos.store.info.StoreInfo.replica_aware field. func NewQueryableCreator( - logger log.Logger, - reg prometheus.Registerer, - proxy storepb.StoreServer, - maxConcurrentSelects int, - selectTimeout time.Duration, + logger log.Logger, + reg prometheus.Registerer, + proxy storepb.StoreServer, + maxConcurrentSelects int, + selectTimeout time.Duration, ) QueryableCreator { - gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects) - - return func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, - ) storage.Queryable { - return &queryable{ - logger: logger, - replicaLabels: replicaLabels, - storeDebugMatchers: storeDebugMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, - gateProviderFn: func() gate.Gate { - return gf.New() - }, - maxConcurrentSelects: maxConcurrentSelects, - selectTimeout: selectTimeout, - shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, - } - } + gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects) + + return 
func( + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, + ) storage.Queryable { + return &queryable{ + logger: logger, + replicaLabels: replicaLabels, + storeDebugMatchers: storeDebugMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + gateProviderFn: func() gate.Gate { + return gf.New() + }, + maxConcurrentSelects: maxConcurrentSelects, + selectTimeout: selectTimeout, + shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, + } + } } type queryable struct { - logger log.Logger - replicaLabels []string - storeDebugMatchers [][]*labels.Matcher - proxy storepb.StoreServer - deduplicate bool - maxResolutionMillis int64 - partialResponse bool - skipChunks bool - gateProviderFn func() gate.Gate - maxConcurrentSelects int - selectTimeout time.Duration - shardInfo *storepb.ShardInfo - seriesStatsReporter seriesStatsReporter + logger log.Logger + replicaLabels []string + storeDebugMatchers [][]*labels.Matcher + proxy storepb.StoreServer + deduplicate bool + maxResolutionMillis int64 + partialResponse bool + skipChunks bool + gateProviderFn func() gate.Gate + maxConcurrentSelects int + selectTimeout time.Duration + shardInfo *storepb.ShardInfo + seriesStatsReporter seriesStatsReporter } // Querier returns a new storage querier against the underlying proxy store API. 
func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) { - return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil + return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil } type querier struct { - logger log.Logger - mint, maxt int64 - replicaLabels []string - storeDebugMatchers [][]*labels.Matcher - proxy storepb.StoreServer - deduplicate bool - maxResolutionMillis int64 - partialResponseStrategy storepb.PartialResponseStrategy - skipChunks bool - selectGate gate.Gate - selectTimeout time.Duration - shardInfo *storepb.ShardInfo - seriesStatsReporter seriesStatsReporter - - returnChunksMtx sync.Mutex - returnChunks []*storepb.AggrChunk + logger log.Logger + mint, maxt int64 + replicaLabels []string + storeDebugMatchers [][]*labels.Matcher + proxy storepb.StoreServer + deduplicate bool + maxResolutionMillis int64 + partialResponseStrategy storepb.PartialResponseStrategy + skipChunks bool + selectGate gate.Gate + selectTimeout time.Duration + shardInfo *storepb.ShardInfo + seriesStatsReporter seriesStatsReporter + + returnChunksMtx sync.Mutex + returnChunks []*storepb.AggrChunk } var returnChunksSlicePool = sync.Pool{ - New: func() interface{} { - r := make([]*storepb.AggrChunk, 0) - return &r - }, + New: func() interface{} { + r := make([]*storepb.AggrChunk, 0) + return &r + }, } // newQuerier creates implementation of storage.Querier that fetches data from the proxy // store API endpoints. 
func newQuerier( - logger log.Logger, - mint, - maxt int64, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - proxy storepb.StoreServer, - deduplicate bool, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - selectGate gate.Gate, - selectTimeout time.Duration, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, + logger log.Logger, + mint, + maxt int64, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + proxy storepb.StoreServer, + deduplicate bool, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + selectGate gate.Gate, + selectTimeout time.Duration, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, ) *querier { - if logger == nil { - logger = log.NewNopLogger() - } - rl := make(map[string]struct{}) - for _, replicaLabel := range replicaLabels { - rl[replicaLabel] = struct{}{} - } - - partialResponseStrategy := storepb.PartialResponseStrategy_ABORT - if partialResponse { - partialResponseStrategy = storepb.PartialResponseStrategy_WARN - } - - returnChunks := returnChunksSlicePool.Get().(*[]*storepb.AggrChunk) - return &querier{ - logger: logger, - selectGate: selectGate, - selectTimeout: selectTimeout, - - mint: mint, - maxt: maxt, - replicaLabels: replicaLabels, - storeDebugMatchers: storeDebugMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponseStrategy: partialResponseStrategy, - skipChunks: skipChunks, - shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, - returnChunks: *returnChunks, - } + if logger == nil { + logger = log.NewNopLogger() + } + rl := make(map[string]struct{}) + for _, replicaLabel := range replicaLabels { + rl[replicaLabel] = struct{}{} + } + + partialResponseStrategy := storepb.PartialResponseStrategy_ABORT + if partialResponse { + partialResponseStrategy = storepb.PartialResponseStrategy_WARN + } + + returnChunks := 
returnChunksSlicePool.Get().(*[]*storepb.AggrChunk) + return &querier{ + logger: logger, + selectGate: selectGate, + selectTimeout: selectTimeout, + + mint: mint, + maxt: maxt, + replicaLabels: replicaLabels, + storeDebugMatchers: storeDebugMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponseStrategy: partialResponseStrategy, + skipChunks: skipChunks, + shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, + returnChunks: *returnChunks, + } } func (q *querier) isDedupEnabled() bool { - return q.deduplicate && len(q.replicaLabels) > 0 + return q.deduplicate && len(q.replicaLabels) > 0 } type seriesServer struct { - // This field just exist to pseudo-implement the unused methods of the interface. - storepb.Store_SeriesServer - ctx context.Context + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesServer + ctx context.Context - seriesSet []storepb.Series - seriesSetStats storepb.SeriesStatsCounter - warnings annotations.Annotations + seriesSet []storepb.Series + seriesSetStats storepb.SeriesStatsCounter + warnings annotations.Annotations } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { - if r.GetWarning() != "" { - s.warnings.Add(errors.New(r.GetWarning())) - return nil - } - - if r.GetSeries() != nil { - s.seriesSet = append(s.seriesSet, *r.GetSeries()) - s.seriesSetStats.Count(r.GetSeries()) - return nil - } - - // Unsupported field, skip. - return nil + if r.GetWarning() != "" { + s.warnings.Add(errors.New(r.GetWarning())) + return nil + } + + if r.GetSeries() != nil { + s.seriesSet = append(s.seriesSet, *r.GetSeries()) + s.seriesSetStats.Count(r.GetSeries()) + return nil + } + + // Unsupported field, skip. + return nil } func (s *seriesServer) Context() context.Context { - return s.ctx + return s.ctx } // aggrsFromFunc infers aggregates of the underlying data based on the wrapping // function of a series selection. 
func aggrsFromFunc(f string) []storepb.Aggr { - if f == "min" || strings.HasPrefix(f, "min_") { - return []storepb.Aggr{storepb.Aggr_MIN} - } - if f == "max" || strings.HasPrefix(f, "max_") { - return []storepb.Aggr{storepb.Aggr_MAX} - } - if f == "count" || strings.HasPrefix(f, "count_") { - return []storepb.Aggr{storepb.Aggr_COUNT} - } - // f == "sum" falls through here since we want the actual samples. - if strings.HasPrefix(f, "sum_") { - return []storepb.Aggr{storepb.Aggr_SUM} - } - if f == "increase" || f == "rate" || f == "irate" || f == "resets" { - return []storepb.Aggr{storepb.Aggr_COUNTER} - } - // In the default case, we retrieve count and sum to compute an average. - return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} + if f == "min" || strings.HasPrefix(f, "min_") { + return []storepb.Aggr{storepb.Aggr_MIN} + } + if f == "max" || strings.HasPrefix(f, "max_") { + return []storepb.Aggr{storepb.Aggr_MAX} + } + if f == "count" || strings.HasPrefix(f, "count_") { + return []storepb.Aggr{storepb.Aggr_COUNT} + } + // f == "sum" falls through here since we want the actual samples. + if strings.HasPrefix(f, "sum_") { + return []storepb.Aggr{storepb.Aggr_SUM} + } + if f == "increase" || f == "rate" || f == "irate" || f == "resets" { + return []storepb.Aggr{storepb.Aggr_COUNTER} + } + // In the default case, we retrieve count and sum to compute an average. + return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { - if hints == nil { - hints = &storage.SelectHints{ - Start: q.mint, - End: q.maxt, - } - } else { - // NOTE(GiedriusS): need to make a copy here - // because the PromQL engine sorts these and - // we later on call String() the whole request (including this slice). - grouping := make([]string, 0, len(hints.Grouping)) - grouping = append(grouping, hints.Grouping...) 
- hints.Grouping = grouping - } - - matchers := make([]string, len(ms)) - for i, m := range ms { - matchers[i] = m.String() - } - tenant := ctx.Value(tenancy.TenantKey) - // The context gets canceled as soon as query evaluation is completed by the engine. - // We want to prevent this from happening for the async store API calls we make while preserving tracing context. - // TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API. - ctx = tracing.CopyTraceContext(context.Background(), ctx) - ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) - ctx, cancel := context.WithTimeout(ctx, q.selectTimeout) - span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ - "minTime": hints.Start, - "maxTime": hints.End, - "matchers": "{" + strings.Join(matchers, ",") + "}", - }) - - promise := make(chan storage.SeriesSet, 1) - go func() { - defer close(promise) - - var err error - tracing.DoInSpan(ctx, "querier_select_gate_ismyturn", func(ctx context.Context) { - err = q.selectGate.Start(ctx) - }) - if err != nil { - promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn")) - return - } - defer q.selectGate.Done() - - span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn") - defer span.Finish() - - set, stats, err := q.selectFn(ctx, hints, ms...) - if err != nil { - promise <- storage.ErrSeriesSet(err) - return - } - q.seriesStatsReporter(stats) - - promise <- set - }() - - return &lazySeriesSet{create: func() (storage.SeriesSet, bool) { - defer cancel() - defer span.Finish() - - // Only gets called once, for the first Next() call of the series set. 
- set, ok := <-promise - if !ok { - return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false - } - return set, set.Next() - }} + if hints == nil { + hints = &storage.SelectHints{ + Start: q.mint, + End: q.maxt, + } + } else { + // NOTE(GiedriusS): need to make a copy here + // because the PromQL engine sorts these and + // we later on call String() the whole request (including this slice). + grouping := make([]string, 0, len(hints.Grouping)) + grouping = append(grouping, hints.Grouping...) + hints.Grouping = grouping + } + + matchers := make([]string, len(ms)) + for i, m := range ms { + matchers[i] = m.String() + } + tenant := ctx.Value(tenancy.TenantKey) + // The context gets canceled as soon as query evaluation is completed by the engine. + // We want to prevent this from happening for the async store API calls we make while preserving tracing context. + // TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API. + ctx = tracing.CopyTraceContext(context.Background(), ctx) + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + ctx, cancel := context.WithTimeout(ctx, q.selectTimeout) + span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ + "minTime": hints.Start, + "maxTime": hints.End, + "matchers": "{" + strings.Join(matchers, ",") + "}", + }) + + promise := make(chan storage.SeriesSet, 1) + go func() { + defer close(promise) + + var err error + tracing.DoInSpan(ctx, "querier_select_gate_ismyturn", func(ctx context.Context) { + err = q.selectGate.Start(ctx) + }) + if err != nil { + promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn")) + return + } + defer q.selectGate.Done() + + span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn") + defer span.Finish() + + set, stats, err := q.selectFn(ctx, hints, ms...) 
+ if err != nil { + promise <- storage.ErrSeriesSet(err) + return + } + q.seriesStatsReporter(stats) + + promise <- set + }() + + return &lazySeriesSet{create: func() (storage.SeriesSet, bool) { + defer cancel() + defer span.Finish() + + // Only gets called once, for the first Next() call of the series set. + set, ok := <-promise + if !ok { + return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false + } + return set, set.Next() + }} } func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) { - sms, err := storepb.PromMatchersToMatchers(ms...) - if err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers") - } - - aggrs := aggrsFromFunc(hints.Func) - - // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. - ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - - // TODO(bwplotka): Use inprocess gRPC when we want to stream responses. - // Currently streaming won't help due to nature of the both PromQL engine which - // pulls all series before computations anyway. - resp := &seriesServer{ctx: ctx} - req := storepb.SeriesRequest{ - MinTime: hints.Start, - MaxTime: hints.End, - Limit: int64(hints.Limit), - Matchers: sms, - MaxResolutionWindow: q.maxResolutionMillis, - Aggregates: aggrs, - ShardInfo: q.shardInfo, - PartialResponseStrategy: q.partialResponseStrategy, - SkipChunks: q.skipChunks, - } - if q.isDedupEnabled() { - // Soft ask to sort without replica labels and push them at the end of labelset. - req.WithoutReplicaLabels = q.replicaLabels - } - - if err := q.proxy.Series(&req, resp); err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") - } - q.returnChunksMtx.Lock() - for i := range resp.seriesSet { - q.returnChunks = append(q.returnChunks, resp.seriesSet[i].Chunks...) 
- } - q.returnChunksMtx.Unlock() - - warns := annotations.New().Merge(resp.warnings) - - if !q.isDedupEnabled() { - return NewPromSeriesSet( - newStoreSeriesSet(resp.seriesSet), - q.mint, - q.maxt, - aggrs, - warns, - ), resp.seriesSetStats, nil - } - - // TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger(). - // This however require big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply. - // For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work. - set := NewPromSeriesSet( - dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)), - q.mint, - q.maxt, - aggrs, - warns, - ) - - return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil + sms, err := storepb.PromMatchersToMatchers(ms...) + if err != nil { + return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers") + } + + aggrs := aggrsFromFunc(hints.Func) + + // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + + // TODO(bwplotka): Use inprocess gRPC when we want to stream responses. + // Currently streaming won't help due to nature of the both PromQL engine which + // pulls all series before computations anyway. + resp := &seriesServer{ctx: ctx} + req := storepb.SeriesRequest{ + MinTime: hints.Start, + MaxTime: hints.End, + Limit: int64(hints.Limit), + Matchers: sms, + MaxResolutionWindow: q.maxResolutionMillis, + Aggregates: aggrs, + ShardInfo: q.shardInfo, + PartialResponseStrategy: q.partialResponseStrategy, + SkipChunks: q.skipChunks, + } + if q.isDedupEnabled() { + // Soft ask to sort without replica labels and push them at the end of labelset. 
+ req.WithoutReplicaLabels = q.replicaLabels + } + + if err := q.proxy.Series(&req, resp); err != nil { + return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") + } + q.returnChunksMtx.Lock() + for i := range resp.seriesSet { + q.returnChunks = append(q.returnChunks, resp.seriesSet[i].Chunks...) + } + q.returnChunksMtx.Unlock() + + warns := annotations.New().Merge(resp.warnings) + + if !q.isDedupEnabled() { + return NewPromSeriesSet( + newStoreSeriesSet(resp.seriesSet), + q.mint, + q.maxt, + aggrs, + warns, + ), resp.seriesSetStats, nil + } + + // TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger(). + // This however require big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply. + // For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work. + set := NewPromSeriesSet( + dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)), + q.mint, + q.maxt, + aggrs, + warns, + ) + + return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil } // LabelValues returns all potential values for a label name. func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { - span, ctx := tracing.StartSpan(ctx, "querier_label_values") - defer span.Finish() - - // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. - ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - - pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) 
- if err != nil { - return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers") - } - - if hints == nil { - hints = &storage.LabelHints{} - } - - req := &storepb.LabelValuesRequest{ - Label: name, - PartialResponseStrategy: q.partialResponseStrategy, - Start: q.mint, - End: q.maxt, - Matchers: pbMatchers, - Limit: int64(hints.Limit), - } - - if q.isDedupEnabled() { - req.WithoutReplicaLabels = q.replicaLabels - } - - resp, err := q.proxy.LabelValues(ctx, req) - if err != nil { - return nil, nil, errors.Wrap(err, "proxy LabelValues()") - } - - var warns annotations.Annotations - for _, w := range resp.Warnings { - warns.Add(errors.New(w)) - } - - return resp.Values, warns, nil + span, ctx := tracing.StartSpan(ctx, "querier_label_values") + defer span.Finish() + + // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + + pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) + if err != nil { + return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers") + } + + if hints == nil { + hints = &storage.LabelHints{} + } + req := &storepb.LabelValuesRequest{ + Label: name, + PartialResponseStrategy: q.partialResponseStrategy, + Start: q.mint, + End: q.maxt, + Matchers: pbMatchers, + Limit: int64(hints.Limit), + } + + if q.isDedupEnabled() { + req.WithoutReplicaLabels = q.replicaLabels + } + + resp, err := q.proxy.LabelValues(ctx, req) + if err != nil { + return nil, nil, errors.Wrap(err, "proxy LabelValues()") + } + + var warns annotations.Annotations + for _, w := range resp.Warnings { + warns.Add(errors.New(w)) + } + + return resp.Values, warns, nil } // LabelNames returns all the unique label names present in the block in sorted order constrained // by the given matchers. 
func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { - span, ctx := tracing.StartSpan(ctx, "querier_label_names") - defer span.Finish() - - // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. - ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - - pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) - if err != nil { - return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers") - } - - if hints == nil { - hints = &storage.LabelHints{} - } - - req := &storepb.LabelNamesRequest{ - PartialResponseStrategy: q.partialResponseStrategy, - Start: q.mint, - End: q.maxt, - Matchers: pbMatchers, - Limit: int64(hints.Limit), - } - - if q.isDedupEnabled() { - req.WithoutReplicaLabels = q.replicaLabels - } - - resp, err := q.proxy.LabelNames(ctx, req) - if err != nil { - return nil, nil, errors.Wrap(err, "proxy LabelNames()") - } - - var warns annotations.Annotations - for _, w := range resp.Warnings { - warns.Add(errors.New(w)) - } - - return resp.Names, warns, nil + span, ctx := tracing.StartSpan(ctx, "querier_label_names") + defer span.Finish() + + // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + + pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) 
+ if err != nil { + return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers") + } + + if hints == nil { + hints = &storage.LabelHints{} + } + + req := &storepb.LabelNamesRequest{ + PartialResponseStrategy: q.partialResponseStrategy, + Start: q.mint, + End: q.maxt, + Matchers: pbMatchers, + Limit: int64(hints.Limit), + } + + if q.isDedupEnabled() { + req.WithoutReplicaLabels = q.replicaLabels + } + + resp, err := q.proxy.LabelNames(ctx, req) + if err != nil { + return nil, nil, errors.Wrap(err, "proxy LabelNames()") + } + + var warns annotations.Annotations + for _, w := range resp.Warnings { + warns.Add(errors.New(w)) + } + + return resp.Names, warns, nil } func (q *querier) Close() error { - q.returnChunksMtx.Lock() - defer q.returnChunksMtx.Unlock() + q.returnChunksMtx.Lock() + defer q.returnChunksMtx.Unlock() - for _, ch := range q.returnChunks { - ch.ReturnToVTPool() - } - q.returnChunks = q.returnChunks[:0] - returnChunksSlicePool.Put(&q.returnChunks) + for _, ch := range q.returnChunks { + ch.ReturnToVTPool() + } + q.returnChunks = q.returnChunks[:0] + returnChunksSlicePool.Put(&q.returnChunks) - return nil + return nil } diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index e7e15ca2b4..55c164502b 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -997,7 +997,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() string { return version }) + func() string { return version }, "") testutil.Ok(tt, err) // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. 
diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index f891ad9bb0..17aadccb1e 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -41,6 +41,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -61,6 +62,7 @@ type PrometheusStore struct { framesRead prometheus.Histogram storepb.UnimplementedStoreServer + tenantHeader string } // Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0. @@ -81,6 +83,7 @@ func NewPrometheusStore( externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), promVersion func() string, + tenantHeader string, ) (*PrometheusStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -105,6 +108,7 @@ func NewPrometheusStore( Buckets: prometheus.ExponentialBuckets(10, 10, 5), }, ), + tenantHeader: tenantHeader, } return p, nil } @@ -153,7 +157,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto if r.SkipChunks { finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove) - labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit)) + labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader) if err != nil { return err } @@ -468,10 +472,11 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que if err != nil { return nil, errors.Wrap(err, "unable to create request") } + tenantName, _ := tenancy.GetTenantFromGRPCMetadata(ctx) preq.Header.Add("Content-Encoding", "snappy") preq.Header.Set("Content-Type", "application/x-stream-protobuf") preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") - + preq.Header.Set(p.tenantHeader, tenantName) preq.Header.Set("User-Agent", 
clientconfig.ThanosUserAgent) presp, err = p.client.Do(preq.WithContext(ctx)) if err != nil { @@ -551,12 +556,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR var lbls []string if len(matchers) == 0 || p.labelCallsSupportMatchers() { - lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } } else { - sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } @@ -622,7 +627,8 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if len(matchers) == 0 { return &storepb.LabelValuesResponse{Values: []string{val}}, nil } - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } @@ -633,12 +639,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue } if len(matchers) == 0 || p.labelCallsSupportMatchers() { - vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit)) + vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } } else { - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 8c7b0496c8..6dcb0dde0b 100644 --- a/pkg/store/prometheus_test.go +++ 
b/pkg/store/prometheus_test.go @@ -71,7 +71,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - nil, + nil, "", ) // MaxTime does not matter. testutil.Ok(t, err) @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - nil, + nil, "", ) testutil.Ok(t, err) @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil) + nil, "") testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil) + nil, "") testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 90ed0fb2c2..5a5081baa7 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -263,6 +263,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
} ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.MinTime, originalRequest.MaxTime, matchers) @@ -361,6 +362,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) @@ -375,6 +377,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La End: originalRequest.End, Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...), WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, + Limit: originalRequest.Limit, Hints: originalRequest.Hints, } @@ -464,7 +467,8 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) - level.Debug(reqLogger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) if len(stores) == 0 { diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 9da1372933..4784185061 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -31,6 +31,8 @@ const ( MetricLabel = "tenant" ) +var CustomHeader = "" + // Allowed fields in client certificates. 
const ( CertificateFieldOrganization = "organization" @@ -140,6 +142,14 @@ func GetTenantFromGRPCMetadata(ctx context.Context) (string, bool) { return md.Get(DefaultTenantHeader)[0], true } +func GetTenantFromProvidedHeader(ctx context.Context, header string) string { + md, ok := metadata.FromIncomingContext(ctx) + if !ok || len(md.Get(header)) == 0 { + return DefaultTenant + } + return md.Get(header)[0] +} + func EnforceQueryTenancy(tenantLabel string, tenant string, query string) (string, error) { labelMatcher := &labels.Matcher{ Name: tenantLabel, @@ -207,7 +217,7 @@ func RewritePromQL(ctx context.Context, r *http.Request, tenantHeader string, de return "", "", ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - + CustomHeader = tenantHeader if enforceTenancy { queryStr, err = EnforceQueryTenancy(tenantLabel, tenant, queryStr) return queryStr, tenant, ctx, err @@ -225,7 +235,7 @@ func RewriteLabelMatchers(ctx context.Context, r *http.Request, tenantHeader str return nil, ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - + CustomHeader = tenantHeader matcherSets, err := getLabelMatchers(formMatchers, tenant, enforceTenancy, tenantLabel) if err != nil { return nil, ctx, err diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 071d6b304b..55970f151d 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1421,7 +1421,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, matchers []*labe logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) + res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") if err != nil { return err } @@ -1440,7 +1440,7 @@ func labelValues(t *testing.T, ctx 
context.Context, addr, label string, matchers logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit) + res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit, "") if err != nil { return err } @@ -1458,7 +1458,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.M logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") if err != nil { return err } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 912712f0a8..8a1d80e252 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1401,17 +1401,17 @@ func TestStoreGatewayLazyExpandedPostingsPromQLSmithFuzz(t *testing.T) { minT := e2eutil.RandRange(rnd, startMs, endMs) maxT := e2eutil.RandRange(rnd, minT+1, endMs) - res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) + res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") testutil.Ok(t, err) - res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) + res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") testutil.Ok(t, err) // Try again with a different timestamp and let requests hit posting cache. 
minT = e2eutil.RandRange(rnd, startMs, endMs) maxT = e2eutil.RandRange(rnd, minT+1, endMs) - newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) + newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") testutil.Ok(t, err) - newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) + newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") testutil.Ok(t, err) cases = append(cases, &testCase{ From ab160f07dc049a1ea40d5332f7628f45240600b9 Mon Sep 17 00:00:00 2001 From: Rishabh Kumar Date: Mon, 23 Sep 2024 10:47:05 -0700 Subject: [PATCH 2/3] Revert "[thanos] Changes to address querying multi cluster reads using thanos query" This reverts commit 9cbc630358375937ba9a967fd3118ac701de9139. --- cmd/thanos/config.go | 3 - cmd/thanos/sidecar.go | 3 +- docs/getting-started.md | 2 +- pkg/api/query/v1.go | 2 + pkg/promclient/promclient.go | 46 +- pkg/query/querier.go | 811 +++++++++++++++++---------------- pkg/store/acceptance_test.go | 2 +- pkg/store/prometheus.go | 20 +- pkg/store/prometheus_test.go | 8 +- pkg/store/proxy.go | 6 +- pkg/tenancy/tenancy.go | 14 +- test/e2e/query_test.go | 6 +- test/e2e/store_gateway_test.go | 8 +- 13 files changed, 449 insertions(+), 482 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 400f75c65c..3ce069c1b2 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -21,7 +21,6 @@ import ( "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" - "github.com/thanos-io/thanos/pkg/tenancy" ) type grpcConfig struct { @@ -81,14 +80,12 @@ type prometheusConfig struct { getConfigInterval time.Duration getConfigTimeout time.Duration httpClient *extflag.PathOrContent - tenantHeader string } func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig { cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). 
Default("http://localhost:9090").URLVar(&pc.url) - cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader) cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up"). Default("10m").DurationVar(&pc.readyTimeout) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 077d086c14..014b9fc9df 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -297,8 +297,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent) - errors.Wrap(err, "create Prometheus header "+conf.prometheus.tenantHeader) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/docs/getting-started.md b/docs/getting-started.md index 7948b63bfb..4f52745dfc 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -197,4 +197,4 @@ However, in case you want to play and run Thanos components on a single node, we | Compact | HTTP | 10912 | | Query Frontend | HTTP | 10913 | -You can see example one-node setup [here](../scripts/sidecar.sh). +You can see example one-node setup [here](../scripts/quickstart.sh). 
diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 7c29a69e0a..8d3969cb0e 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -1096,6 +1096,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A hints := &storage.LabelHints{ Limit: toHintLimit(limit), } + var ( vals []string warnings annotations.Annotations @@ -1280,6 +1281,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap hints := &storage.LabelHints{ Limit: toHintLimit(limit), } + if len(matcherSets) > 0 { var callWarnings annotations.Annotations labelNamesSet := make(map[string]struct{}) diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index b214e203cb..6f124136b4 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -40,7 +40,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets/targetspb" - "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -108,7 +107,6 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent // the raw query is encoded in the body and the appropriate Content-Type is set. 
func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) { var b io.Reader - if method == http.MethodPost { rq := u.RawQuery b = strings.NewReader(rq) @@ -693,14 +691,11 @@ func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } -func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, header string) error { +func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error { span, ctx := tracing.StartSpan(ctx, spanName) defer span.Finish() - var customheader = http.Header{} - customheader.Set(header, tenancy.GetTenantFromProvidedHeader(ctx, header)) - - body, code, err := c.req2xx(ctx, u, http.MethodGet, customheader) + body, code, err := c.req2xx(ctx, u, http.MethodGet, nil) if err != nil { if code, exists := statusToCode[code]; exists && code != 0 { return status.Error(code, err.Error()) @@ -739,7 +734,7 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string // SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]map[string]string, error) { +func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/series") q := u.Query() @@ -747,21 +742,19 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la q.Add("match[]", storepb.PromMatchersToString(matchers...)) q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - if limit > 0 { - q.Add("limit", strconv.Itoa(limit)) - } + q.Add("limit", strconv.Itoa(limit)) u.RawQuery = q.Encode() var m struct { Data []map[string]string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, header) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m) } // LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) { +func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/labels") q := u.Query() @@ -771,20 +764,18 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [ } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - if limit >= 0 { - q.Add("limit", strconv.Itoa(limit)) - } + q.Add("limit", strconv.Itoa(limit)) u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, header) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m) } // LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) { +func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values") q := u.Query() @@ -794,16 +785,13 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - if limit >= 0 { - q.Add("limit", strconv.Itoa(limit)) - } - + q.Add("limit", strconv.Itoa(limit)) u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, header) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m) } // RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors. @@ -822,7 +810,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin Data *rulespb.RuleGroups `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil { return nil, err } @@ -845,7 +833,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al } `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil { return nil, err } @@ -866,7 +854,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric q.Add("metric", metric) } // We only set limit when it is >= 0. 
- if limit > 0 { + if limit >= 0 { q.Add("limit", strconv.Itoa(limit)) } @@ -875,7 +863,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric var v struct { Data map[string][]*metadatapb.Meta `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "") + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v) } // ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors. @@ -894,7 +882,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin Data []*exemplarspb.ExemplarData `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil { return nil, err } @@ -914,5 +902,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets var v struct { Data *targetspb.TargetDiscovery `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "") + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v) } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index d783b2b739..588fa00734 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -4,27 +4,27 @@ package query import ( - "context" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/util/annotations" - - "github.com/thanos-io/thanos/pkg/dedup" - "github.com/thanos-io/thanos/pkg/extprom" - "github.com/thanos-io/thanos/pkg/gate" - "github.com/thanos-io/thanos/pkg/store" - 
"github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/tenancy" - "github.com/thanos-io/thanos/pkg/tracing" + "context" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/thanos-io/thanos/pkg/dedup" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" + "github.com/thanos-io/thanos/pkg/tracing" ) type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) @@ -32,12 +32,12 @@ type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) var NoopSeriesStatsReporter seriesStatsReporter = func(_ storepb.SeriesStatsCounter) {} func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsReporter { - var mutex sync.Mutex - return func(s storepb.SeriesStatsCounter) { - mutex.Lock() - defer mutex.Unlock() - *stats = append(*stats, s) - } + var mutex sync.Mutex + return func(s storepb.SeriesStatsCounter) { + mutex.Lock() + defer mutex.Unlock() + *stats = append(*stats, s) + } } // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. @@ -47,445 +47,446 @@ func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsR // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. 
type QueryableCreator func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, ) storage.Queryable // NewQueryableCreator creates QueryableCreator. // NOTE(bwplotka): Proxy assumes to be replica_aware, see thanos.store.info.StoreInfo.replica_aware field. func NewQueryableCreator( - logger log.Logger, - reg prometheus.Registerer, - proxy storepb.StoreServer, - maxConcurrentSelects int, - selectTimeout time.Duration, + logger log.Logger, + reg prometheus.Registerer, + proxy storepb.StoreServer, + maxConcurrentSelects int, + selectTimeout time.Duration, ) QueryableCreator { - gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects) - - return func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, - ) storage.Queryable { - return &queryable{ - logger: logger, - replicaLabels: replicaLabels, - storeDebugMatchers: storeDebugMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, - gateProviderFn: func() gate.Gate { - return gf.New() - }, - maxConcurrentSelects: maxConcurrentSelects, - selectTimeout: selectTimeout, - shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, - } - } + gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects) + + return 
func( + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, + ) storage.Queryable { + return &queryable{ + logger: logger, + replicaLabels: replicaLabels, + storeDebugMatchers: storeDebugMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + gateProviderFn: func() gate.Gate { + return gf.New() + }, + maxConcurrentSelects: maxConcurrentSelects, + selectTimeout: selectTimeout, + shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, + } + } } type queryable struct { - logger log.Logger - replicaLabels []string - storeDebugMatchers [][]*labels.Matcher - proxy storepb.StoreServer - deduplicate bool - maxResolutionMillis int64 - partialResponse bool - skipChunks bool - gateProviderFn func() gate.Gate - maxConcurrentSelects int - selectTimeout time.Duration - shardInfo *storepb.ShardInfo - seriesStatsReporter seriesStatsReporter + logger log.Logger + replicaLabels []string + storeDebugMatchers [][]*labels.Matcher + proxy storepb.StoreServer + deduplicate bool + maxResolutionMillis int64 + partialResponse bool + skipChunks bool + gateProviderFn func() gate.Gate + maxConcurrentSelects int + selectTimeout time.Duration + shardInfo *storepb.ShardInfo + seriesStatsReporter seriesStatsReporter } // Querier returns a new storage querier against the underlying proxy store API. 
func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) {
	// Each querier gets its own gate from gateProviderFn; all other options
	// come straight from the queryable.
	return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil
}

// querier is the per-request storage.Querier over the proxy StoreAPI.
type querier struct {
	logger                  log.Logger
	mint, maxt              int64
	replicaLabels           []string
	storeDebugMatchers      [][]*labels.Matcher
	proxy                   storepb.StoreServer
	deduplicate             bool
	maxResolutionMillis     int64
	partialResponseStrategy storepb.PartialResponseStrategy
	skipChunks              bool
	selectGate              gate.Gate
	selectTimeout           time.Duration
	shardInfo               *storepb.ShardInfo
	seriesStatsReporter     seriesStatsReporter

	// returnChunksMtx guards returnChunks, which accumulates every chunk
	// received during selectFn so Close can return them to the VT pool.
	returnChunksMtx sync.Mutex
	returnChunks    []*storepb.AggrChunk
}

// returnChunksSlicePool recycles the chunk-tracking slices between queriers;
// pointers to slices are pooled to avoid an allocation per Put.
var returnChunksSlicePool = sync.Pool{
	New: func() interface{} {
		r := make([]*storepb.AggrChunk, 0)
		return &r
	},
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
// store API endpoints.
func newQuerier( - logger log.Logger, - mint, - maxt int64, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - proxy storepb.StoreServer, - deduplicate bool, - maxResolutionMillis int64, - partialResponse, - skipChunks bool, - selectGate gate.Gate, - selectTimeout time.Duration, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, + logger log.Logger, + mint, + maxt int64, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + proxy storepb.StoreServer, + deduplicate bool, + maxResolutionMillis int64, + partialResponse, + skipChunks bool, + selectGate gate.Gate, + selectTimeout time.Duration, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, ) *querier { - if logger == nil { - logger = log.NewNopLogger() - } - rl := make(map[string]struct{}) - for _, replicaLabel := range replicaLabels { - rl[replicaLabel] = struct{}{} - } - - partialResponseStrategy := storepb.PartialResponseStrategy_ABORT - if partialResponse { - partialResponseStrategy = storepb.PartialResponseStrategy_WARN - } - - returnChunks := returnChunksSlicePool.Get().(*[]*storepb.AggrChunk) - return &querier{ - logger: logger, - selectGate: selectGate, - selectTimeout: selectTimeout, - - mint: mint, - maxt: maxt, - replicaLabels: replicaLabels, - storeDebugMatchers: storeDebugMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponseStrategy: partialResponseStrategy, - skipChunks: skipChunks, - shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, - returnChunks: *returnChunks, - } + if logger == nil { + logger = log.NewNopLogger() + } + rl := make(map[string]struct{}) + for _, replicaLabel := range replicaLabels { + rl[replicaLabel] = struct{}{} + } + + partialResponseStrategy := storepb.PartialResponseStrategy_ABORT + if partialResponse { + partialResponseStrategy = storepb.PartialResponseStrategy_WARN + } + + returnChunks := 
returnChunksSlicePool.Get().(*[]*storepb.AggrChunk) + return &querier{ + logger: logger, + selectGate: selectGate, + selectTimeout: selectTimeout, + + mint: mint, + maxt: maxt, + replicaLabels: replicaLabels, + storeDebugMatchers: storeDebugMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponseStrategy: partialResponseStrategy, + skipChunks: skipChunks, + shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, + returnChunks: *returnChunks, + } } func (q *querier) isDedupEnabled() bool { - return q.deduplicate && len(q.replicaLabels) > 0 + return q.deduplicate && len(q.replicaLabels) > 0 } type seriesServer struct { - // This field just exist to pseudo-implement the unused methods of the interface. - storepb.Store_SeriesServer - ctx context.Context + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesServer + ctx context.Context - seriesSet []storepb.Series - seriesSetStats storepb.SeriesStatsCounter - warnings annotations.Annotations + seriesSet []storepb.Series + seriesSetStats storepb.SeriesStatsCounter + warnings annotations.Annotations } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { - if r.GetWarning() != "" { - s.warnings.Add(errors.New(r.GetWarning())) - return nil - } - - if r.GetSeries() != nil { - s.seriesSet = append(s.seriesSet, *r.GetSeries()) - s.seriesSetStats.Count(r.GetSeries()) - return nil - } - - // Unsupported field, skip. - return nil + if r.GetWarning() != "" { + s.warnings.Add(errors.New(r.GetWarning())) + return nil + } + + if r.GetSeries() != nil { + s.seriesSet = append(s.seriesSet, *r.GetSeries()) + s.seriesSetStats.Count(r.GetSeries()) + return nil + } + + // Unsupported field, skip. + return nil } func (s *seriesServer) Context() context.Context { - return s.ctx + return s.ctx } // aggrsFromFunc infers aggregates of the underlying data based on the wrapping // function of a series selection. 
// aggrsFromFunc maps the PromQL function wrapping a selector to the downsampled
// aggregates worth fetching: min/max/count map to their aggregate, sum_* to SUM,
// rate-style functions to COUNTER, and anything else to COUNT+SUM (average).
func aggrsFromFunc(f string) []storepb.Aggr {
	if f == "min" || strings.HasPrefix(f, "min_") {
		return []storepb.Aggr{storepb.Aggr_MIN}
	}
	if f == "max" || strings.HasPrefix(f, "max_") {
		return []storepb.Aggr{storepb.Aggr_MAX}
	}
	if f == "count" || strings.HasPrefix(f, "count_") {
		return []storepb.Aggr{storepb.Aggr_COUNT}
	}
	// f == "sum" falls through here since we want the actual samples.
	if strings.HasPrefix(f, "sum_") {
		return []storepb.Aggr{storepb.Aggr_SUM}
	}
	if f == "increase" || f == "rate" || f == "irate" || f == "resets" {
		return []storepb.Aggr{storepb.Aggr_COUNTER}
	}
	// In the default case, we retrieve count and sum to compute an average.
	return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

// Select implements storage.Querier. The store call runs asynchronously in a
// goroutine (gated by selectGate, bounded by selectTimeout) and the result is
// delivered lazily through a lazySeriesSet on its first Next() call.
func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	if hints == nil {
		hints = &storage.SelectHints{
			Start: q.mint,
			End:   q.maxt,
		}
	} else {
		// NOTE(GiedriusS): need to make a copy here
		// because the PromQL engine sorts these and
		// we later on call String() the whole request (including this slice).
		grouping := make([]string, 0, len(hints.Grouping))
		grouping = append(grouping, hints.Grouping...)
		hints.Grouping = grouping
	}

	matchers := make([]string, len(ms))
	for i, m := range ms {
		matchers[i] = m.String()
	}

	// Preserve the tenant so it survives the context swap below.
	tenant := ctx.Value(tenancy.TenantKey)
	// The context gets canceled as soon as query evaluation is completed by the engine.
	// We want to prevent this from happening for the async store API calls we make while preserving tracing context.
	// TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API.
	ctx = tracing.CopyTraceContext(context.Background(), ctx)
	ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)
	ctx, cancel := context.WithTimeout(ctx, q.selectTimeout)
	span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
		"minTime":  hints.Start,
		"maxTime":  hints.End,
		"matchers": "{" + strings.Join(matchers, ",") + "}",
	})

	// Buffered so the goroutine never blocks on an abandoned result.
	promise := make(chan storage.SeriesSet, 1)
	go func() {
		defer close(promise)

		var err error
		tracing.DoInSpan(ctx, "querier_select_gate_ismyturn", func(ctx context.Context) {
			err = q.selectGate.Start(ctx)
		})
		if err != nil {
			promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
			return
		}
		defer q.selectGate.Done()

		// Inner span/ctx intentionally shadow the outer ones for the duration
		// of the actual select.
		span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn")
		defer span.Finish()

		set, stats, err := q.selectFn(ctx, hints, ms...)
		if err != nil {
			promise <- storage.ErrSeriesSet(err)
			return
		}
		q.seriesStatsReporter(stats)

		promise <- set
	}()

	return &lazySeriesSet{create: func() (storage.SeriesSet, bool) {
		// cancel/span belong to the outer Select scope and are released once
		// the lazy set is materialized.
		defer cancel()
		defer span.Finish()

		// Only gets called once, for the first Next() call of the series set.
		set, ok := <-promise
		if !ok {
			return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false
		}
		return set, set.Next()
	}}
}

// selectFn performs the synchronous Series call against the proxy and wraps the
// result in a (possibly deduplicating) SeriesSet. Received chunks are tracked
// in q.returnChunks so Close can return them to the VT pool.
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) {
	sms, err := storepb.PromMatchersToMatchers(ms...)
	if err != nil {
		return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers")
	}

	aggrs := aggrsFromFunc(hints.Func)

	// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
	ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

	// TODO(bwplotka): Use inprocess gRPC when we want to stream responses.
	// Currently streaming won't help due to nature of the both PromQL engine which
	// pulls all series before computations anyway.
	resp := &seriesServer{ctx: ctx}
	req := storepb.SeriesRequest{
		MinTime:                 hints.Start,
		MaxTime:                 hints.End,
		Limit:                   int64(hints.Limit),
		Matchers:                sms,
		MaxResolutionWindow:     q.maxResolutionMillis,
		Aggregates:              aggrs,
		ShardInfo:               q.shardInfo,
		PartialResponseStrategy: q.partialResponseStrategy,
		SkipChunks:              q.skipChunks,
	}
	if q.isDedupEnabled() {
		// Soft ask to sort without replica labels and push them at the end of labelset.
		req.WithoutReplicaLabels = q.replicaLabels
	}

	if err := q.proxy.Series(&req, resp); err != nil {
		return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()")
	}
	q.returnChunksMtx.Lock()
	for i := range resp.seriesSet {
		q.returnChunks = append(q.returnChunks, resp.seriesSet[i].Chunks...)
	}
	q.returnChunksMtx.Unlock()

	warns := annotations.New().Merge(resp.warnings)

	if !q.isDedupEnabled() {
		return NewPromSeriesSet(
			newStoreSeriesSet(resp.seriesSet),
			q.mint,
			q.maxt,
			aggrs,
			warns,
		), resp.seriesSetStats, nil
	}

	// TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger().
	// This however require big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply.
	// For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work.
	set := NewPromSeriesSet(
		dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)),
		q.mint,
		q.maxt,
		aggrs,
		warns,
	)

	return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	span, ctx := tracing.StartSpan(ctx, "querier_label_values")
	defer span.Finish()

	// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
	ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

	pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
	if err != nil {
		return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
	}

	if hints == nil {
		hints = &storage.LabelHints{}
	}

	req := &storepb.LabelValuesRequest{
		Label:                   name,
		PartialResponseStrategy: q.partialResponseStrategy,
		Start:                   q.mint,
		End:                     q.maxt,
		Matchers:                pbMatchers,
		Limit:                   int64(hints.Limit),
	}

	if q.isDedupEnabled() {
		req.WithoutReplicaLabels = q.replicaLabels
	}

	resp, err := q.proxy.LabelValues(ctx, req)
	if err != nil {
		return nil, nil, errors.Wrap(err, "proxy LabelValues()")
	}

	// Store warnings are strings on the wire; rehydrate them as errors.
	var warns annotations.Annotations
	for _, w := range resp.Warnings {
		warns.Add(errors.New(w))
	}

	return resp.Values, warns, nil
}

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	span, ctx := tracing.StartSpan(ctx, "querier_label_names")
	defer span.Finish()

	// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
	ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

	pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
	if err != nil {
		return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
	}

	if hints == nil {
		hints = &storage.LabelHints{}
	}

	req := &storepb.LabelNamesRequest{
		PartialResponseStrategy: q.partialResponseStrategy,
		Start:                   q.mint,
		End:                     q.maxt,
		Matchers:                pbMatchers,
		Limit:                   int64(hints.Limit),
	}

	if q.isDedupEnabled() {
		req.WithoutReplicaLabels = q.replicaLabels
	}

	resp, err := q.proxy.LabelNames(ctx, req)
	if err != nil {
		return nil, nil, errors.Wrap(err, "proxy LabelNames()")
	}

	// Store warnings are strings on the wire; rehydrate them as errors.
	var warns annotations.Annotations
	for _, w := range resp.Warnings {
		warns.Add(errors.New(w))
	}

	return resp.Names, warns, nil
}

// Close releases every chunk collected during selects back to the VT pool and
// returns the emptied tracking slice to returnChunksSlicePool.
// NOTE(review): the pooled pointer aliases q.returnChunks — the querier must
// not be used after Close; confirm callers respect this.
func (q *querier) Close() error {
	q.returnChunksMtx.Lock()
	defer q.returnChunksMtx.Unlock()

	for _, ch := range q.returnChunks {
		ch.ReturnToVTPool()
	}
	q.returnChunks = q.returnChunks[:0]
	returnChunksSlicePool.Put(&q.returnChunks)

	return nil
}
diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 17aadccb1e..f891ad9bb0 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -41,7 +41,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -62,7 +61,6 @@ type PrometheusStore struct { framesRead prometheus.Histogram storepb.UnimplementedStoreServer - tenantHeader string } // Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0. @@ -83,7 +81,6 @@ func NewPrometheusStore( externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), promVersion func() string, - tenantHeader string, ) (*PrometheusStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -108,7 +105,6 @@ func NewPrometheusStore( Buckets: prometheus.ExponentialBuckets(10, 10, 5), }, ), - tenantHeader: tenantHeader, } return p, nil } @@ -157,7 +153,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto if r.SkipChunks { finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove) - labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader) + labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit)) if err != nil { return err } @@ -472,11 +468,10 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que if err != nil { return nil, errors.Wrap(err, "unable to create request") } - tenantName, _ := tenancy.GetTenantFromGRPCMetadata(ctx) preq.Header.Add("Content-Encoding", "snappy") preq.Header.Set("Content-Type", "application/x-stream-protobuf") preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") - preq.Header.Set(p.tenantHeader, tenantName) + preq.Header.Set("User-Agent", 
clientconfig.ThanosUserAgent) presp, err = p.client.Do(preq.WithContext(ctx)) if err != nil { @@ -556,12 +551,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR var lbls []string if len(matchers) == 0 || p.labelCallsSupportMatchers() { - lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) + lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) if err != nil { return nil, err } } else { - sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) + sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) if err != nil { return nil, err } @@ -627,8 +622,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if len(matchers) == 0 { return &storepb.LabelValuesResponse{Values: []string{val}}, nil } - - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) if err != nil { return nil, err } @@ -639,12 +633,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue } if len(matchers) == 0 || p.labelCallsSupportMatchers() { - vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) + vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit)) if err != nil { return nil, err } } else { - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) if err != nil { return nil, err } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 6dcb0dde0b..8c7b0496c8 100644 --- a/pkg/store/prometheus_test.go +++ 
b/pkg/store/prometheus_test.go @@ -71,7 +71,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - nil, "", + nil, ) // MaxTime does not matter. testutil.Ok(t, err) @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - nil, "", + nil, ) testutil.Ok(t, err) @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil, "") + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil, "") + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 5a5081baa7..90ed0fb2c2 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -263,7 +263,6 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
} ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) - ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.MinTime, originalRequest.MaxTime, matchers) @@ -362,7 +361,6 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) - ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) @@ -377,7 +375,6 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La End: originalRequest.End, Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...), WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, - Limit: originalRequest.Limit, Hints: originalRequest.Hints, } @@ -467,8 +464,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) - ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) - level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) + level.Debug(reqLogger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) if len(stores) == 0 { diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 4784185061..9da1372933 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -31,8 +31,6 @@ const ( MetricLabel = "tenant" ) -var CustomHeader = "" - // Allowed fields in client certificates. 
const ( CertificateFieldOrganization = "organization" @@ -142,14 +140,6 @@ func GetTenantFromGRPCMetadata(ctx context.Context) (string, bool) { return md.Get(DefaultTenantHeader)[0], true } -func GetTenantFromProvidedHeader(ctx context.Context, header string) string { - md, ok := metadata.FromIncomingContext(ctx) - if !ok || len(md.Get(header)) == 0 { - return DefaultTenant - } - return md.Get(header)[0] -} - func EnforceQueryTenancy(tenantLabel string, tenant string, query string) (string, error) { labelMatcher := &labels.Matcher{ Name: tenantLabel, @@ -217,7 +207,7 @@ func RewritePromQL(ctx context.Context, r *http.Request, tenantHeader string, de return "", "", ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - CustomHeader = tenantHeader + if enforceTenancy { queryStr, err = EnforceQueryTenancy(tenantLabel, tenant, queryStr) return queryStr, tenant, ctx, err @@ -235,7 +225,7 @@ func RewriteLabelMatchers(ctx context.Context, r *http.Request, tenantHeader str return nil, ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - CustomHeader = tenantHeader + matcherSets, err := getLabelMatchers(formMatchers, tenant, enforceTenancy, tenantLabel) if err != nil { return nil, ctx, err diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 55970f151d..071d6b304b 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1421,7 +1421,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, matchers []*labe logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") + res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) if err != nil { return err } @@ -1440,7 +1440,7 @@ func labelValues(t *testing.T, ctx 
context.Context, addr, label string, matchers logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit, "") + res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit) if err != nil { return err } @@ -1458,7 +1458,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.M logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) if err != nil { return err } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 8a1d80e252..912712f0a8 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1401,17 +1401,17 @@ func TestStoreGatewayLazyExpandedPostingsPromQLSmithFuzz(t *testing.T) { minT := e2eutil.RandRange(rnd, startMs, endMs) maxT := e2eutil.RandRange(rnd, minT+1, endMs) - res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") + res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) testutil.Ok(t, err) - res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") + res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) testutil.Ok(t, err) // Try again with a different timestamp and let requests hit posting cache. 
minT = e2eutil.RandRange(rnd, startMs, endMs) maxT = e2eutil.RandRange(rnd, minT+1, endMs) - newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") + newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) testutil.Ok(t, err) - newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") + newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) testutil.Ok(t, err) cases = append(cases, &testCase{ From 566ccc2cb75b799a41a95cc082e8f8d2c74ea713 Mon Sep 17 00:00:00 2001 From: Rishabh Kumar Date: Fri, 27 Sep 2024 14:16:05 -0700 Subject: [PATCH 3/3] update Signed-off-by: Rishabh Kumar --- Makefile | 6 +++-- cmd/thanos/config.go | 3 +++ cmd/thanos/sidecar.go | 2 +- docs/components/sidecar.md | 3 +++ pkg/promclient/promclient.go | 47 ++++++++++++++++++++++------------ pkg/store/acceptance_test.go | 2 +- pkg/store/prometheus.go | 21 ++++++++++----- pkg/store/prometheus_test.go | 8 +++--- pkg/store/proxy.go | 16 +++++++++++- pkg/tenancy/tenancy.go | 18 +++++++++++-- test/e2e/query_test.go | 6 ++--- test/e2e/store_gateway_test.go | 8 +++--- 12 files changed, 99 insertions(+), 41 deletions(-) diff --git a/Makefile b/Makefile index bca6345503..a41891cf04 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,8 @@ build: check-git deps $(PROMU) @$(PROMU) build --prefix $(PREFIX) GIT_BRANCH=$(shell $(GIT) rev-parse --abbrev-ref HEAD) +GIT_REVISION := $(shell git rev-parse --short HEAD) +IMAGE_TAG ?= $(subst /,-,$(GIT_BRANCH))-$(GIT_REVISION) .PHONY: crossbuild crossbuild: ## Builds all binaries for all platforms. ifeq ($(GIT_BRANCH), main) @@ -197,7 +199,7 @@ docker: build @echo ">> copying Thanos from $(PREFIX) to ./thanos_tmp_for_docker" @cp $(PREFIX)/thanos ./thanos_tmp_for_docker @echo ">> building docker image 'thanos'" - @docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) . + @docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) -t thanos:$(IMAGE_TAG) . 
@rm ./thanos_tmp_for_docker else docker: docker-multi-stage @@ -207,7 +209,7 @@ endif docker-multi-stage: ## Builds 'thanos' docker image using multi-stage. docker-multi-stage: @echo ">> building docker image 'thanos' with Dockerfile.multi-stage" - @docker build -f Dockerfile.multi-stage -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) . + @docker build -f Dockerfile.multi-stage -t "thanos" -t thanos:$(IMAGE_TAG) --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) . # docker-build builds docker images with multiple architectures. .PHONY: docker-build $(BUILD_DOCKER_ARCHS) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 3ce069c1b2..400f75c65c 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" + "github.com/thanos-io/thanos/pkg/tenancy" ) type grpcConfig struct { @@ -80,12 +81,14 @@ type prometheusConfig struct { getConfigInterval time.Duration getConfigTimeout time.Duration httpClient *extflag.PathOrContent + tenantHeader string } func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig { cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URLVar(&pc.url) + cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader) cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up"). 
Default("10m").DurationVar(&pc.readyTimeout) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 014b9fc9df..74755f3946 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -297,7 +297,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 951a0d58ec..730967b410 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -45,6 +45,7 @@ prometheus \ ```bash thanos sidecar \ --tsdb.path "/path/to/prometheus/data/dir" \ + --prometheus.tenant-header="THANOS-TENANT" \ --prometheus.url "http://localhost:9090" \ --objstore.config-file "bucket.yml" ``` @@ -170,6 +171,8 @@ Flags: --prometheus.ready_timeout=10m Maximum time to wait for the Prometheus instance to start up + --prometheus.tenant-header="THANOS-TENANT" + HTTP header to determine tenant. --prometheus.url=http://localhost:9090 URL at which to reach Prometheus's API. For better performance use local network. 
diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 6f124136b4..7b067fc26a 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -40,6 +40,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil", "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -107,6 +108,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent // the raw query is encoded in the body and the appropriate Content-Type is set. func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) { var b io.Reader + if method == http.MethodPost { rq := u.RawQuery b = strings.NewReader(rq) @@ -691,11 +693,17 @@ func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } -func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error { +func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, tenantHeader string) error { span, ctx := tracing.StartSpan(ctx, spanName) defer span.Finish() - body, code, err := c.req2xx(ctx, u, http.MethodGet, nil) + var customHeader http.Header + if len(tenantHeader) > 0 { + customHeader = http.Header{} + customHeader.Set(tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, tenantHeader)) + } + + body, code, err := c.req2xx(ctx, u, http.MethodGet, customHeader) if err != nil { if code, exists := statusToCode[code]; exists && code != 0 { return status.Error(code, err.Error()) @@ -734,7 +742,7 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string // SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors. 
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. -func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) { +func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]map[string]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/series") q := u.Query() @@ -742,19 +750,21 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la q.Add("match[]", storepb.PromMatchersToString(matchers...)) q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { Data []map[string]string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, tenantHeader) } // LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { +func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/labels") q := u.Query() @@ -764,18 +774,20 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [ } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, tenantHeader) } // LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors. // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. 
-func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) { +func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) { u := *base u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values") q := u.Query() @@ -785,13 +797,16 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() var m struct { Data []string `json:"data"` } - return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m) + return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, tenantHeader) } // RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors. 
@@ -810,7 +825,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin Data *rulespb.RuleGroups `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -833,7 +848,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al } `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -863,7 +878,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric var v struct { Data map[string][]*metadatapb.Meta `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v) + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "") } // ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors. 
@@ -882,7 +897,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin Data []*exemplarspb.ExemplarData `json:"data"` } - if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil { + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil { return nil, err } @@ -902,5 +917,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets var v struct { Data *targetspb.TargetDiscovery `json:"data"` } - return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v) + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "") } diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index e7e15ca2b4..55c164502b 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -997,7 +997,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() string { return version }) + func() string { return version }, "") testutil.Ok(tt, err) // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. 
diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index f891ad9bb0..dcc0062698 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -41,6 +41,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -61,6 +62,7 @@ type PrometheusStore struct { framesRead prometheus.Histogram storepb.UnimplementedStoreServer + tenantHeader string } // Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0. @@ -81,6 +83,7 @@ func NewPrometheusStore( externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), promVersion func() string, + tenantHeader string, ) (*PrometheusStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -105,6 +108,7 @@ func NewPrometheusStore( Buckets: prometheus.ExponentialBuckets(10, 10, 5), }, ), + tenantHeader: tenantHeader, } return p, nil } @@ -153,7 +157,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto if r.SkipChunks { finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove) - labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit)) + labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader) if err != nil { return err } @@ -471,7 +475,9 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que preq.Header.Add("Content-Encoding", "snappy") preq.Header.Set("Content-Type", "application/x-stream-protobuf") preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") - + if len(p.tenantHeader) > 0 { + preq.Header.Set(p.tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, p.tenantHeader)) + } preq.Header.Set("User-Agent", clientconfig.ThanosUserAgent) presp, err = 
p.client.Do(preq.WithContext(ctx)) if err != nil { @@ -551,12 +557,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR var lbls []string if len(matchers) == 0 || p.labelCallsSupportMatchers() { - lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } } else { - sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } @@ -622,7 +628,8 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if len(matchers) == 0 { return &storepb.LabelValuesResponse{Values: []string{val}}, nil } - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } @@ -633,12 +640,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue } if len(matchers) == 0 || p.labelCallsSupportMatchers() { - vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit)) + vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } } else { - sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit)) + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader) if err != nil { return nil, err } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 8c7b0496c8..6dcb0dde0b 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -71,7 +71,7 @@ func 
testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - nil, + nil, "", ) // MaxTime does not matter. testutil.Ok(t, err) @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - nil, + nil, "", ) testutil.Ok(t, err) @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil) + nil, "") testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - nil) + nil, "") testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 90ed0fb2c2..12b2c0273b 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -263,6 +263,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
} ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + // only set custom header if it's set previously + if len(tenancy.CustomHeader) > 0 { + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) + } level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.MinTime, originalRequest.MaxTime, matchers) @@ -361,6 +365,10 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + // only set custom header if it's set previously + if len(tenancy.CustomHeader) > 0 { + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) + } level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) @@ -375,6 +383,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La End: originalRequest.End, Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...), WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, + Limit: originalRequest.Limit, Hints: originalRequest.Hints, } @@ -464,7 +473,12 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L } ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) - level.Debug(reqLogger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) + // only set custom header if it's set previously + if len(tenancy.CustomHeader) > 0 { + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant) + } + + level.Debug(reqLogger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers) if len(stores) == 0 { 
diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 9da1372933..34a4c6a6a2 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -31,6 +31,8 @@ const ( MetricLabel = "tenant" ) +var CustomHeader = "" + // Allowed fields in client certificates. const ( CertificateFieldOrganization = "organization" @@ -140,6 +142,14 @@ func GetTenantFromGRPCMetadata(ctx context.Context) (string, bool) { return md.Get(DefaultTenantHeader)[0], true } +func GetTenantFromProvidedHeader(ctx context.Context, header string) string { + md, ok := metadata.FromIncomingContext(ctx) + if !ok || len(md.Get(header)) == 0 { + return "" + } + return md.Get(header)[0] +} + func EnforceQueryTenancy(tenantLabel string, tenant string, query string) (string, error) { labelMatcher := &labels.Matcher{ Name: tenantLabel, @@ -207,7 +217,9 @@ func RewritePromQL(ctx context.Context, r *http.Request, tenantHeader string, de return "", "", ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - + if tenantHeader != DefaultTenantHeader { + CustomHeader = tenantHeader + } if enforceTenancy { queryStr, err = EnforceQueryTenancy(tenantLabel, tenant, queryStr) return queryStr, tenant, ctx, err @@ -225,7 +237,9 @@ func RewriteLabelMatchers(ctx context.Context, r *http.Request, tenantHeader str return nil, ctx, err } ctx = context.WithValue(ctx, TenantKey, tenant) - + if tenantHeader != DefaultTenantHeader { + CustomHeader = tenantHeader + } matcherSets, err := getLabelMatchers(formMatchers, tenant, enforceTenancy, tenantLabel) if err != nil { return nil, ctx, err diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 071d6b304b..55970f151d 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1421,7 +1421,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, matchers []*labe logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), 
func() error { - res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) + res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") if err != nil { return err } @@ -1440,7 +1440,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, matchers logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit) + res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, matchers, start, end, limit, "") if err != nil { return err } @@ -1458,7 +1458,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.M logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit) + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end, limit, "") if err != nil { return err } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 912712f0a8..8a1d80e252 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1401,17 +1401,17 @@ func TestStoreGatewayLazyExpandedPostingsPromQLSmithFuzz(t *testing.T) { minT := e2eutil.RandRange(rnd, startMs, endMs) maxT := e2eutil.RandRange(rnd, minT+1, endMs) - res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) + res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") testutil.Ok(t, err) - res2, err := 
client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) + res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") testutil.Ok(t, err) // Try again with a different timestamp and let requests hit posting cache. minT = e2eutil.RandRange(rnd, startMs, endMs) maxT = e2eutil.RandRange(rnd, minT+1, endMs) - newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0) + newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT, 0, "") testutil.Ok(t, err) - newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0) + newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT, 0, "") testutil.Ok(t, err) cases = append(cases, &testCase{