Skip to content

Commit

Permalink
feat: profilecli query-blocks merge
Browse files Browse the repository at this point in the history
feat: profilecli query-blocks merge
  • Loading branch information
alsoba13 committed Oct 9, 2024
1 parent 426515a commit fabe9a7
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 71 deletions.
6 changes: 6 additions & 0 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func main() {
queryBlocksCmd := app.Command("query-blocks", "Query on local/remote blocks")
queryBlocksSeriesCmd := queryBlocksCmd.Command("series", "Request series labels on local/remote blocks")
queryBlocksSeriesParams := addQueryBlocksSeriesParams(queryBlocksSeriesCmd)
queryBlocksMergeCmd := queryBlocksCmd.Command("merge", "Request merged profile.")
queryBlocksMergeParams := addQueryBlocksMergeParams(queryBlocksMergeCmd)

uploadCmd := app.Command("upload", "Upload profile(s).")
uploadParams := addUploadParams(uploadCmd)
Expand Down Expand Up @@ -140,6 +142,10 @@ func main() {
if err := queryBlocksSeries(ctx, queryBlocksSeriesParams); err != nil {
os.Exit(checkError(err))
}
case queryBlocksMergeCmd.FullCommand():
if err := queryBlocksMerge(ctx, queryBlocksMergeParams); err != nil {
os.Exit(checkError(err))
}

case queryLabelValuesCardinalityCmd.FullCommand():
if err := queryLabelValuesCardinality(ctx, queryLabelValuesCardinalityParams); err != nil {
Expand Down
75 changes: 75 additions & 0 deletions cmd/profilecli/output.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"

gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"

"github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func outputSeries(result []*typesv1.Labels) error {
enc := json.NewEncoder(os.Stdout)
m := make(map[string]interface{})
Expand All @@ -21,3 +40,59 @@ func outputSeries(result []*typesv1.Labels) error {
}
return nil
}

func outputMergeProfile(ctx context.Context, outputFlag string, profile *googlev1.Profile) error {
mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := profile.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(profile)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := profile.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
}
79 changes: 76 additions & 3 deletions cmd/profilecli/query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/pkg/errors"

ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/objstore"
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client"
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
Expand All @@ -26,6 +28,13 @@ type queryBlocksParams struct {
Query string
}

type queryBlocksMergeParams struct {
*queryBlocksParams
Output string
ProfileType string
StacktraceSelector []string
}

type queryBlocksSeriesParams struct {
*queryBlocksParams
LabelNames []string
Expand All @@ -42,18 +51,82 @@ func addQueryBlocksParams(queryCmd commander) *queryBlocksParams {
return params
}

func addQueryBlocksMergeParams(queryCmd commander) *queryBlocksMergeParams {
params := new(queryBlocksMergeParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").StringVar(&params.Output)
queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
queryCmd.Flag("stacktrace-selector", "Only query locations with those symbols. Provide multiple times starting with the root").StringsVar(&params.StacktraceSelector)
return params
}

func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams {
params := new(queryBlocksSeriesParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(&params.LabelNames)
return params
}

func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error {
level.Info(logger).Log("msg", "query-block merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath",
params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType)

if len(params.BlockIds) > 1 {
return errors.New("query merge is limited to a single block")
}

profileType, err := model.ParseProfileTypeSelector(params.ProfileType)
if err != nil {
return err
}

var stackTraceSelectors *typesv1.StackTraceSelector = nil
if len(params.StacktraceSelector) > 0 {
locations := make([]*typesv1.Location, 0, len(params.StacktraceSelector))
for _, cs := range params.StacktraceSelector {
locations = append(locations, &typesv1.Location{
Name: cs,
})
}
stackTraceSelectors = &typesv1.StackTraceSelector{
CallSite: locations,
}
level.Info(logger).Log("msg", "selecting with stackstrace selector", "call-site", fmt.Sprintf("%#+v", params.StacktraceSelector))
}

bucket, err := getBucket(ctx, params.queryBlocksParams)
if err != nil {
return err
}

meta, err := phlaredb.NewBlockQuerier(ctx, bucket).BlockMeta(ctx, params.BlockIds[0])
if err != nil {
return err
}

resp, err := phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta).SelectMergePprof(
ctx,
&ingestv1.SelectProfilesRequest{
LabelSelector: params.Query,
Type: profileType,
Start: meta.MinTime.Time().UnixMilli(),
End: meta.MaxTime.Time().UnixMilli(),
},
512,
stackTraceSelectors,
)
if err != nil {
return errors.Wrap(err, "failed to query")
}

return outputMergeProfile(ctx, params.Output, resp)
}

func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error {
level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID)

bucket, err := getBucket(ctx, params)
bucket, err := getBucket(ctx, params.queryBlocksParams)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,15 +161,15 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err
return outputSeries(response.Msg.LabelsSet)
}

func getBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
func getBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
if params.BucketName != "" {
return getRemoteBucket(ctx, params)
} else {
return filesystem.NewBucket(params.LocalPath)
}
}

func getRemoteBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
func getRemoteBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
if params.TenantID == "" {
return nil, errors.New("specify tenant id for remote bucket")
}
Expand Down
69 changes: 1 addition & 68 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"os"
"sort"
"strings"
"time"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand All @@ -32,12 +23,6 @@ import (
"github.com/grafana/pyroscope/pkg/operations"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func (c *phlareClient) queryClient() querierv1connect.QuerierServiceClient {
return querierv1connect.NewQuerierServiceClient(
c.httpClient(),
Expand Down Expand Up @@ -156,59 +141,7 @@ func selectMergeProfile(ctx context.Context, client *phlareClient, outputFlag st
return errors.Wrap(err, "failed to query")
}

mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(resp.Msg)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
return outputMergeProfile(ctx, outputFlag, resp.Msg)
}

type queryGoPGOParams struct {
Expand Down

0 comments on commit fabe9a7

Please sign in to comment.