Skip to content

Commit

Permalink
changefeedccl: add webhook sink benchmark
Browse files Browse the repository at this point in the history
This patch adds a micro-benchmark for the webhook sink. Note that we may be able
to refactor the test framework in a way that will be applicable to all sink
types. We will defer this later. This is just a first step.

Part of: #133293
Release note: none

```
BenchmarkWebhookSink/parallelism=1/opt=generic
BenchmarkWebhookSink/parallelism=1/opt=generic          1000000000               0.1811 ns/op
BenchmarkWebhookSink/parallelism=1/opt=opts_zero_value
BenchmarkWebhookSink/parallelism=1/opt=opts_zero_value  1000000000               0.1683 ns/op
BenchmarkWebhookSink/parallelism=2/opt=generic
BenchmarkWebhookSink/parallelism=2/opt=generic          1000000000               0.09745 ns/op
BenchmarkWebhookSink/parallelism=2/opt=opts_zero_value
BenchmarkWebhookSink/parallelism=2/opt=opts_zero_value  1000000000               0.03203 ns/op
BenchmarkWebhookSink/parallelism=3/opt=generic
BenchmarkWebhookSink/parallelism=3/opt=generic          1000000000               0.1371 ns/op
BenchmarkWebhookSink/parallelism=3/opt=opts_zero_value
BenchmarkWebhookSink/parallelism=3/opt=opts_zero_value  1000000000               0.06612 ns/op
BenchmarkWebhookSink/parallelism=4/opt=opts_zero_value
BenchmarkWebhookSink/parallelism=4/opt=opts_zero_value  1000000000               0.09639 ns/op
BenchmarkWebhookSink/parallelism=4/opt=generic
BenchmarkWebhookSink/parallelism=4/opt=generic          1000000000               0.04369 ns/op
```
  • Loading branch information
wenyihu6 committed Oct 25, 2024
1 parent 9a5c5a6 commit 8f7d627
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ go_test(
"scheduled_changefeed_test.go",
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
"sink_bench_test.go",
"sink_cloudstorage_test.go",
"sink_kafka_connection_test.go",
"sink_kafka_v2_test.go",
Expand Down
119 changes: 119 additions & 0 deletions pkg/ccl/changefeedccl/sink_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The Cockroach Authors.

// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package changefeedccl

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func setUpWebHookSinks(b *testing.B) (*cdctest.MockWebhookSink, string) {
cert, certEncoded, err := cdctest.NewCACertBase64Encoded()
require.NoError(b, err)
sinkDest, err := cdctest.StartMockWebhookSink(cert)
require.NoError(b, err)

sinkDestHost, err := url.Parse(sinkDest.URL())
require.NoError(b, err)

params := sinkDestHost.Query()
params.Set(changefeedbase.SinkParamCACert, certEncoded)
sinkDestHost.RawQuery = params.Encode()
return sinkDest, sinkDestHost.String()
}

func generateRandomizedBytes(rand *rand.Rand) []byte {
const tableID = 42
dataTypes := []*types.T{types.String, types.Int, types.Decimal, types.Bytes, types.Bool, types.Date, types.Timestamp, types.Float}
randType := dataTypes[rand.Intn(len(dataTypes))]

key, err := keyside.Encode(
keys.SystemSQLCodec.TablePrefix(tableID),
randgen.RandDatumSimple(rand, randType),
encoding.Ascending,
)
if err != nil {
panic(err)
}
return key
}

func runWebhookSink(b *testing.B, sinkSrc Sink, rng *rand.Rand, enc Encoder) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
bytes := generateRandomizedBytes(rng)
ts := hlc.Timestamp{WallTime: rand.Int63() + 1}
n := rng.Intn(10)
switch {
case n < 6:
require.NoError(b, sinkSrc.EmitRow(context.Background(), noTopic{}, bytes, bytes, zeroTS, zeroTS, zeroAlloc))
case n < 9:
require.NoError(b, sinkSrc.Flush(context.Background()))
case n < 10:
require.NoError(b, sinkSrc.EmitResolvedTimestamp(context.Background(), enc, ts))
}
}
}

func BenchmarkWebhookSink(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

webhookSinkTestfn := func(parallelism int, opts map[string]string) {
sinkDest, sinkDestHost := setUpWebHookSinks(b)
defer sinkDest.Close()
details := jobspb.ChangefeedDetails{
SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost),
Opts: opts,
}
sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{})
require.NoError(b, err)
defer require.NoError(b, sinkSrc.Close())

rng, _ := randutil.NewTestRand()
ctx := context.Background()
require.NoError(b, err)
encodingOpts, err := changefeedbase.MakeStatementOptions(opts).GetEncodingOptions()
require.NoError(b, err)
enc, err := makeJSONEncoder(ctx, jsonEncoderOptions{EncodingOptions: encodingOpts})
require.NoError(b, err)
runWebhookSink(b, sinkSrc, rng, enc)
}

opts := getGenericWebhookSinkOptions()
optsZeroValueConfig := getGenericWebhookSinkOptions(
struct {
key string
value string
}{changefeedbase.OptWebhookSinkConfig,
`{"Retry":{"Backoff": "5ms"},"Flush":{"Bytes": 0, "Frequency": "0s", "Messages": 0}}`})
for i := 1; i <= 4; i++ {
for optName, opt := range map[string]changefeedbase.StatementOptions{"generic": opts, "opts_zero_value": optsZeroValueConfig} {
b.Run(fmt.Sprintf("parallelism=%d/opt=%s", i, optName), func(b *testing.B) {
webhookSinkTestfn(i, opt.AsMap())
})
}
}
}

0 comments on commit 8f7d627

Please sign in to comment.