diff --git a/pkg/server/server.go b/pkg/server/server.go index ae5fe9a022ab..08a26c42b473 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -588,6 +588,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf ) db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork) db.AdmissionPacerFactory = gcoords.Elastic + goschedstats.RegisterSettings(st) cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad) stopper.AddCloser(stop.CloserFn(func() { goschedstats.UnregisterRunnableCountCallback(cbID) diff --git a/pkg/util/goschedstats/BUILD.bazel b/pkg/util/goschedstats/BUILD.bazel index b31ce88bef1f..5797a1a82381 100644 --- a/pkg/util/goschedstats/BUILD.bazel +++ b/pkg/util/goschedstats/BUILD.bazel @@ -11,6 +11,8 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/util/goschedstats", visibility = ["//visibility:public"], deps = [ + "//pkg/settings", + "//pkg/settings/cluster", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", @@ -22,6 +24,7 @@ go_test( srcs = ["runnable_test.go"], embed = [":goschedstats"], deps = [ + "//pkg/settings/cluster", "//pkg/testutils", "//pkg/util/timeutil", "@com_github_stretchr_testify//require", diff --git a/pkg/util/goschedstats/runnable.go b/pkg/util/goschedstats/runnable.go index 9d9395c25496..6edba1be59e2 100644 --- a/pkg/util/goschedstats/runnable.go +++ b/pkg/util/goschedstats/runnable.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -59,6 +61,12 @@ var _ = numRunnableGoroutines // interaction with processor idle state // https://github.com/golang/go/issues/30740#issuecomment-471634471. See // #66881. +// +// The use of underloadedRunnablePerProcThreshold does not provide sufficient +// protection against sluggish response in the admission control system, which +// uses these samples to adjust concurrency of request processing. So +// goschedstats.always_use_short_sample_period.enabled can be set to true to +// force this responsiveness. const samplePeriodShort = time.Millisecond const samplePeriodLong = 250 * time.Millisecond @@ -73,6 +81,12 @@ const samplePeriodLong = 250 * time.Millisecond // 0.1 runnable goroutine per proc. const underloadedRunnablePerProcThreshold = 1 * toFixedPoint / 10 +var alwaysUseShortSamplePeriodEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "goschedstats.always_use_short_sample_period.enabled", + "when set to true, the system always does 1ms sampling of runnable queue lengths", + false) + // We "report" the average value every reportingPeriod. // Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines() // needs to be updated to scale the sum accordingly. @@ -108,6 +122,7 @@ var callbackInfo struct { // Multiple cbs are used only for tests which can run multiple CockroachDB // nodes in a process. cbs []callbackWithID + st *cluster.Settings } // RegisterRunnableCountCallback registers a callback to be run with the @@ -155,6 +170,14 @@ func UnregisterRunnableCountCallback(id int64) { callbackInfo.cbs = newCBs } +// RegisterSettings provides a settings object that can be used to alter +// callback frequency. +func RegisterSettings(st *cluster.Settings) { + callbackInfo.mu.Lock() + defer callbackInfo.mu.Unlock() + callbackInfo.st = st +} + func init() { go func() { sst := schedStatsTicker{ @@ -167,8 +190,9 @@ func init() { t := <-ticker.C callbackInfo.mu.Lock() cbs := callbackInfo.cbs + st := callbackInfo.st callbackInfo.mu.Unlock() - sst.getStatsOnTick(t, cbs, ticker) + sst.getStatsOnTick(t, cbs, st, ticker) } }() } @@ -194,9 +218,9 @@ type schedStatsTicker struct { localTotal, localEWMA uint64 } -// getStatsOnTick gets scheduler stats as the ticker has ticked. +// getStatsOnTick gets scheduler stats as the ticker has ticked. st can be nil. func (s *schedStatsTicker) getStatsOnTick( - t time.Time, cbs []callbackWithID, ticker timeTickerInterface, + t time.Time, cbs []callbackWithID, st *cluster.Settings, ticker timeTickerInterface, ) { if t.Sub(s.lastTime) > reportingPeriod { var avgValue uint64 @@ -216,7 +240,8 @@ func (s *schedStatsTicker) getStatsOnTick( // Both the mean over the last 1s, and the exponentially weighted average // must be low for the system to be considered underloaded. if avgValue < underloadedRunnablePerProcThreshold && - s.localEWMA < underloadedRunnablePerProcThreshold { + s.localEWMA < underloadedRunnablePerProcThreshold && + (st == nil || !alwaysUseShortSamplePeriodEnabled.Get(&st.SV)) { // Underloaded, so switch to longer sampling period. nextPeriod = samplePeriodLong } diff --git a/pkg/util/goschedstats/runnable_test.go b/pkg/util/goschedstats/runnable_test.go index 07e28e2dd57f..f65b68662713 100644 --- a/pkg/util/goschedstats/runnable_test.go +++ b/pkg/util/goschedstats/runnable_test.go @@ -6,11 +6,13 @@ package goschedstats import ( + "context" "fmt" "runtime" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" @@ -73,7 +75,7 @@ func TestSchedStatsTicker(t *testing.T) { // Tick every 1ms until the reportingPeriod has elapsed. for i := 1; ; i++ { now = now.Add(samplePeriodShort) - sst.getStatsOnTick(now, cbs, &tt) + sst.getStatsOnTick(now, cbs, nil, &tt) if now.Sub(startTime) <= reportingPeriod { // No reset of the time ticker. require.Equal(t, 0, tt.numResets) @@ -95,7 +97,7 @@ func TestSchedStatsTicker(t *testing.T) { tt.numResets = 0 for i := 1; ; i++ { now = now.Add(samplePeriodLong) - sst.getStatsOnTick(now, cbs, &tt) + sst.getStatsOnTick(now, cbs, nil, &tt) if now.Sub(startTime) <= reportingPeriod { // No reset of the time ticker. require.Equal(t, 0, tt.numResets) @@ -111,3 +113,64 @@ func TestSchedStatsTicker(t *testing.T) { require.Equal(t, samplePeriodShort, tt.lastResetDuration) require.Equal(t, samplePeriodShort, callbackSamplePeriod) } + +func TestSchedStatsTickerShortPeriodOverride(t *testing.T) { + ctx := context.Background() + var callbackSamplePeriod time.Duration + cb := func(numRunnable int, numProcs int, samplePeriod time.Duration) { + // Always underloaded. + require.Equal(t, 0, numRunnable) + require.Equal(t, 1, numProcs) + callbackSamplePeriod = samplePeriod + } + cbs := []callbackWithID{{cb, 0}} + now := timeutil.UnixEpoch + startTime := now + st := cluster.MakeTestingClusterSettings() + // Override to use short sample period. + alwaysUseShortSamplePeriodEnabled.Override(ctx, &st.SV, true) + // Start with long sample period. + sst := schedStatsTicker{ + lastTime: now, + curPeriod: samplePeriodLong, + numRunnableGoroutines: func() (numRunnable int, numProcs int) { return 0, 1 }, + } + tt := testTimeTicker{} + // Tick until the reportingPeriod has elapsed. + for i := 1; ; i++ { + now = now.Add(samplePeriodLong) + sst.getStatsOnTick(now, cbs, st, &tt) + if now.Sub(startTime) <= reportingPeriod { + // No reset of the time ticker. + require.Equal(t, 0, tt.numResets) + // Each tick causes a callback. + require.Equal(t, samplePeriodLong, callbackSamplePeriod) + } else { + break + } + } + // Sample period resets to short. + require.Equal(t, 1, tt.numResets) + require.Equal(t, samplePeriodShort, tt.lastResetDuration) + require.Equal(t, samplePeriodShort, callbackSamplePeriod) + + // Tick again until the reportingPeriod has elapsed. + startTime = now + tt.numResets = 0 + for i := 1; ; i++ { + now = now.Add(samplePeriodShort) + sst.getStatsOnTick(now, cbs, st, &tt) + if now.Sub(startTime) <= reportingPeriod { + // No reset of the time ticker. + require.Equal(t, 0, tt.numResets) + // Each tick causes a callback. + require.Equal(t, samplePeriodShort, callbackSamplePeriod) + } else { + break + } + } + // Still using short sample period. + require.Equal(t, 0, tt.numResets) + require.Equal(t, samplePeriodShort, tt.lastResetDuration) + require.Equal(t, samplePeriodShort, callbackSamplePeriod) +}