Skip to content

Commit

Permalink
Merge pull request #132491 from wenyihu6/backport23.2.13-rc-115785
Browse files Browse the repository at this point in the history
release-23.2.13-rc: changefeedccl: add parallel io metrics
  • Loading branch information
rharding6373 authored Oct 11, 2024
2 parents 74520af + 6de39cc commit f00c152
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 100 deletions.
5 changes: 4 additions & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,10 @@
<tr><td>APPLICATION</td><td>changefeed.nprocs_consume_event_nanos</td><td>Total time spent waiting to add an event to the parallel consumer</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_flush_nanos</td><td>Total time spent idle waiting for the parallel consumer to flush</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_in_flight_count</td><td>Number of buffered events in the parallel consumer</td><td>Count of Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_in_flight_keys</td><td>The number of keys currently in-flight which may contend with batches pending to be emitted</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_pending_rows</td><td>Number of rows which are blocked from being sent due to conflicting in-flight keys</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time that outgoing requests to the sink spend waiting in a queue due to in-flight requests with conflicting keys</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_result_queue_nanos</td><td>Time that incoming results from the sink spend waiting in parallel io emitter before they are acknowledged by the changefeed</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.queue_time_nanos</td><td>Time KV event spent waiting to be processed</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.running</td><td>Number of currently running changefeeds, including sinkless</td><td>Changefeeds</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.schema_registry.registrations</td><td>Number of registration attempts with the schema registry</td><td>Registrations</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ func (sb *sinkBatch) Keys() intsets.Fast {
return sb.keys
}

// NumMessages implements the IORequest interface.
// It returns the batch's numMessages counter, the same field that isEmpty
// compares against zero.
func (sb *sinkBatch) NumMessages() int {
return sb.numMessages
}

// isEmpty reports whether the batch's message count is zero.
func (sb *sinkBatch) isEmpty() bool {
	return sb.NumMessages() == 0
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9471,3 +9471,72 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {

cdcTest(t, testFn, feedTestForceSink("kafka"))
}

// TestParallelIOMetrics tests parallel io metrics.
//
// It starts a pubsub changefeed on a table that is continuously upserted with
// the same ten keys and waits until the ParallelIOPendingRows and
// ParallelIOInFlightKeys gauges each report a positive value.
func TestParallelIOMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
// Aggregated changefeed metrics, read from the server's job registry.
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics

// Add delay so queuing occurs, which results in the below metrics being
// nonzero.
// testingEnableQueuingDelay returns a cleanup function; it is deferred so
// it runs when testFn returns.
defer testingEnableQueuingDelay()()

db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, `SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`)
// A single sink IO worker; presumably this makes requests queue behind one
// another more readily -- TODO confirm.
db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
db.Exec(t, `
CREATE TABLE foo (a INT PRIMARY KEY);
`)

// Keep writing data to the same key to ensure contention.
// The timeout bounds how long the background writer below can run.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

g := ctxgroup.WithContext(ctx)
// done is closed once both gauges have been observed, telling the writer
// to stop.
done := make(chan struct{})
g.Go(func() error {
// Upsert keys 1 through 10 in a loop until ctx is done or done is closed.
// An Exec error is returned and surfaces through g.Wait() below.
for {
select {
case <-ctx.Done():
return nil
case <-done:
return nil
default:
_, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
if err != nil {
return err
}
}
}
})
// Configure the pubsub sink to flush every 100ms.
foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
"'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
require.NoError(t, err)

// Wait for the pending-rows gauge to become positive. The local is named
// pendingKeys, but the value read is ParallelIOPendingRows.
testutils.SucceedsSoon(t, func() error {
pendingKeys := metrics.ParallelIOPendingRows.Value()
if pendingKeys <= 0 {
return errors.Newf("waiting for pending keys: %d", pendingKeys)
}
return nil
})
// Wait for the in-flight-keys gauge to become positive. Each attempt
// samples the gauge up to 50 times back-to-back before reporting failure;
// presumably this is because the gauge can fall back to zero between
// sends -- TODO confirm.
testutils.SucceedsSoon(t, func() error {
for i := 0; i < 50; i++ {
inFlightKeys := metrics.ParallelIOInFlightKeys.Value()
if inFlightKeys > 0 {
return nil
}
}
return errors.New("waiting for in-flight keys")
})
// Stop the writer, surface any error it hit, then close the feed.
close(done)
require.NoError(t, g.Wait())
require.NoError(t, foo.Close())
}
// Run testFn with feedTestForceSink("pubsub"), matching the pubsub-specific
// settings used above.
cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
Loading

0 comments on commit f00c152

Please sign in to comment.