-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: profilecli query-blocks series #3610
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. extracting some common logic from query.go |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package main | ||
|
||
import ( | ||
"encoding/json" | ||
"os" | ||
|
||
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" | ||
) | ||
|
||
func outputSeries(result []*typesv1.Labels) error { | ||
enc := json.NewEncoder(os.Stdout) | ||
m := make(map[string]interface{}) | ||
for _, s := range result { | ||
clear(m) | ||
for _, l := range s.Labels { | ||
m[l.Name] = l.Value | ||
} | ||
if err := enc.Encode(m); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
|
||
"connectrpc.com/connect" | ||
"github.com/go-kit/log/level" | ||
"github.com/pkg/errors" | ||
|
||
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1" | ||
"github.com/grafana/pyroscope/pkg/objstore" | ||
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client" | ||
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem" | ||
"github.com/grafana/pyroscope/pkg/objstore/providers/gcs" | ||
"github.com/grafana/pyroscope/pkg/phlaredb" | ||
) | ||
|
||
// queryBlocksParams holds the flag values that select which blocks to query and how. | ||
type queryBlocksParams struct { | ||
// LocalPath is the blocks directory; getBucket uses it when BucketName is empty. | ||
LocalPath string | ||
// BucketName is the object storage bucket; a non-empty value selects the remote bucket. | ||
BucketName string | ||
// BlockIds lists the ids of the blocks to query. | ||
BlockIds []string | ||
// TenantID is required for remote buckets and forms the "<tenant>/phlaredb" storage prefix. | ||
TenantID string | ||
// ObjectStoreType is passed to the object store client as the backend type (flag default "gcs"). | ||
ObjectStoreType string | ||
// Query is the label selector sent as the only matcher of the series request (flag default "{}"). | ||
Query string | ||
} |
|
||
// queryBlocksSeriesParams extends the shared block flags with the options of the series query. | ||
type queryBlocksSeriesParams struct { | ||
*queryBlocksParams | ||
// LabelNames restricts the returned labels to these names; when empty all labels are returned. | ||
LabelNames []string | ||
} |
|
||
func addQueryBlocksParams(queryCmd commander) *queryBlocksParams { | ||
params := new(queryBlocksParams) | ||
queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(¶ms.LocalPath) | ||
queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(¶ms.BucketName) | ||
queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(¶ms.ObjectStoreType) | ||
queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(¶ms.BlockIds) | ||
queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(¶ms.TenantID) | ||
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(¶ms.Query) | ||
return params | ||
} | ||
|
||
func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams { | ||
params := new(queryBlocksSeriesParams) | ||
params.queryBlocksParams = addQueryBlocksParams(queryCmd) | ||
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(¶ms.LabelNames) | ||
return params | ||
} | ||
|
||
func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error { | ||
level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames), | ||
"blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID) | ||
|
||
bucket, err := getBucket(ctx, params) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
blockQuerier := phlaredb.NewBlockQuerier(ctx, bucket) | ||
|
||
var from, to int64 | ||
from, to = math.MaxInt64, math.MinInt64 | ||
var targetBlockQueriers phlaredb.Queriers | ||
for _, blockId := range params.queryBlocksParams.BlockIds { | ||
meta, err := blockQuerier.BlockMeta(ctx, blockId) | ||
if err != nil { | ||
return err | ||
} | ||
from = min(from, meta.MinTime.Time().UnixMilli()) | ||
to = max(to, meta.MaxTime.Time().UnixMilli()) | ||
targetBlockQueriers = append(targetBlockQueriers, phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta)) | ||
} | ||
|
||
response, err := targetBlockQueriers.Series(ctx, connect.NewRequest( | ||
&ingestv1.SeriesRequest{ | ||
Start: from, | ||
End: to, | ||
Matchers: []string{params.Query}, | ||
LabelNames: params.LabelNames, | ||
}, | ||
)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return outputSeries(response.Msg.LabelsSet) | ||
} | ||
|
||
func getBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) { | ||
if params.BucketName != "" { | ||
return getRemoteBucket(ctx, params) | ||
} else { | ||
return filesystem.NewBucket(params.LocalPath) | ||
} | ||
} | ||
|
||
func getRemoteBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) { | ||
if params.TenantID == "" { | ||
return nil, errors.New("specify tenant id for remote bucket") | ||
} | ||
return objstoreclient.NewBucket(ctx, objstoreclient.Config{ | ||
StorageBackendConfig: objstoreclient.StorageBackendConfig{ | ||
Backend: params.ObjectStoreType, | ||
GCS: gcs.Config{ | ||
BucketName: params.BucketName, | ||
}, | ||
}, | ||
StoragePrefix: fmt.Sprintf("%s/phlaredb", params.TenantID), | ||
}, params.BucketName) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,23 @@ func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ e | |
return metas[0 : pos+1], nil | ||
} | ||
|
||
// BlockMeta reads and parses the meta file of the single block called name. | ||
// It fetches filepath.Join(name, block.MetaFilename) from the querier's bucket | ||
// and decodes it with block.Read. A failure to read or to parse is logged at | ||
// error level and returned to the caller with a nil meta. | ||
func (b *BlockQuerier) BlockMeta(ctx context.Context, name string) (meta *block.Meta, _ error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to introduce a direct way to read a single meta, otherwise, using |
||
// NOTE(review): filepath.Join uses the OS path separator; if bucket keys are always slash-separated, path.Join may be the safer choice — confirm. | ||
path := filepath.Join(name, block.MetaFilename) | ||
metaReader, err := b.bkt.Get(ctx, path) | ||
if err != nil { | ||
level.Error(b.logger).Log("msg", "error reading block meta", "block", path, "err", err) | ||
return nil, err | ||
} | ||
|
||
// NOTE(review): metaReader is not closed here; presumably block.Read takes ownership of it — verify. | ||
meta, err = block.Read(metaReader) | ||
if err != nil { | ||
level.Error(b.logger).Log("msg", "error parsing block meta", "block", path, "err", err) | ||
return nil, err | ||
} | ||
|
||
return meta, nil | ||
} | ||
|
||
// Sync gradually scans the available blocks. If there are any changes to the | ||
// last run it will Open/Close new/no longer existing ones. | ||
func (b *BlockQuerier) Sync(ctx context.Context) error { | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -29,6 +29,8 @@ import ( | |||||||
"github.com/grafana/pyroscope/pkg/pprof/testhelper" | ||||||||
) | ||||||||
|
||||||||
// testDataPath is the directory the tests in this file open as a filesystem bucket. | ||||||||
const testDataPath = "./block/testdata/" | ||||||||
|
||||||||
func TestQuerierBlockEviction(t *testing.T) { | ||||||||
type testCase struct { | ||||||||
blocks []string | ||||||||
|
@@ -105,7 +107,7 @@ func (p *profileCounter) Next() bool { | |||||||
} | ||||||||
|
||||||||
func TestBlockCompatability(t *testing.T) { | ||||||||
path := "./block/testdata/" | ||||||||
path := testDataPath | ||||||||
bucket, err := filesystem.NewBucket(path) | ||||||||
require.NoError(t, err) | ||||||||
|
||||||||
|
@@ -156,7 +158,7 @@ func TestBlockCompatability(t *testing.T) { | |||||||
} | ||||||||
|
||||||||
func TestBlockCompatability_SelectMergeSpans(t *testing.T) { | ||||||||
path := "./block/testdata/" | ||||||||
path := testDataPath | ||||||||
bucket, err := filesystem.NewBucket(path) | ||||||||
require.NoError(t, err) | ||||||||
|
||||||||
|
@@ -1386,3 +1388,22 @@ func testSelectMergeByStacktracesRace(t testing.TB, times int) { | |||||||
require.NoError(t, g.Wait()) | ||||||||
require.NoError(t, querier.Close()) | ||||||||
} | ||||||||
|
||||||||
// TestBlockMeta_loadsMetasIndividually lists every block meta in the test data | ||||||||
// bucket with BlockMetas, then checks that BlockMeta, called with each block's | ||||||||
// ULID, returns a meta equal to the listed one. | ||||||||
func TestBlockMeta_loadsMetasIndividually(t *testing.T) { | ||||||||
path := testDataPath | ||||||||
bucket, err := filesystem.NewBucket(path) | ||||||||
require.NoError(t, err) | ||||||||
|
||||||||
ctx := context.Background() | ||||||||
blockQuerier := NewBlockQuerier(ctx, bucket) | ||||||||
metas, err := blockQuerier.BlockMetas(ctx) | ||||||||
require.NoError(t, err) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd add something like this to ensure there's actually data to test.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good one as well, fixed |
||||||||
// Guard against an empty fixture directory, which would make the loop below pass vacuously. | ||||||||
require.NotEmpty(t, metas) | ||||||||
|
||||||||
for _, meta := range metas { | ||||||||
singleMeta, err := blockQuerier.BlockMeta(ctx, meta.ULID.String()) | ||||||||
require.NoError(t, err) | ||||||||
|
||||||||
require.Equal(t, meta, singleMeta) | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT about moving this to
admin blocks
sub-commands? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not directly related to this PR, but if you think it makes sense we can improve the storage config handling (in all subcommands)
I think that we should use the standard client config (that supports all backends). You can register the flags as follows:
This will probably require handling of the default path and fs backend. This way we could create our
Bucket
object for all blocks
subcommands automatically. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the flags of the storage, I'll take a look and maybe implement separately.
About making query a subcommand of admin blocks, I discarded that idea at first because I thought that
profilecli admin blocks
needed a pyroscope server to work. Now I see it works directly on blocks, and makes sense to merge commands here. I may implement this as well in a following PR.