Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

goschedstats: add cluster setting to always do short sampling #133459

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
goschedstats.RegisterSettings(st)
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
stopper.AddCloser(stop.CloserFn(func() {
goschedstats.UnregisterRunnableCountCallback(cbID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/goschedstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/goschedstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -22,6 +24,7 @@ go_test(
srcs = ["runnable_test.go"],
embed = [":goschedstats"],
deps = [
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
Expand Down
33 changes: 29 additions & 4 deletions pkg/util/goschedstats/runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -59,6 +61,12 @@ var _ = numRunnableGoroutines
// interaction with processor idle state
// https://github.com/golang/go/issues/30740#issuecomment-471634471. See
// #66881.
//
// The use of underloadedRunnablePerProcThreshold does not provide sufficient
// protection against sluggish response in the admission control system, which
// uses these samples to adjust concurrency of request processing. So
// goschedstats.always_use_short_sample_period.enabled can be set to true to
// force this responsiveness.
const samplePeriodShort = time.Millisecond
const samplePeriodLong = 250 * time.Millisecond

Expand All @@ -73,6 +81,12 @@ const samplePeriodLong = 250 * time.Millisecond
// 0.1 runnable goroutine per proc.
const underloadedRunnablePerProcThreshold = 1 * toFixedPoint / 10

var alwaysUseShortSamplePeriodEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"goschedstats.always_use_short_sample_period.enabled",
"when set to true, the system always does 1ms sampling of runnable queue lengths",
false)

// We "report" the average value every reportingPeriod.
// Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines()
// needs to be updated to scale the sum accordingly.
Expand Down Expand Up @@ -108,6 +122,7 @@ var callbackInfo struct {
// Multiple cbs are used only for tests which can run multiple CockroachDB
// nodes in a process.
cbs []callbackWithID
st *cluster.Settings
}

// RegisterRunnableCountCallback registers a callback to be run with the
Expand Down Expand Up @@ -155,6 +170,14 @@ func UnregisterRunnableCountCallback(id int64) {
callbackInfo.cbs = newCBs
}

// RegisterSettings provides a settings object that can be used to alter
// callback frequency.
func RegisterSettings(st *cluster.Settings) {
callbackInfo.mu.Lock()
defer callbackInfo.mu.Unlock()
callbackInfo.st = st
}

func init() {
go func() {
sst := schedStatsTicker{
Expand All @@ -167,8 +190,9 @@ func init() {
t := <-ticker.C
callbackInfo.mu.Lock()
cbs := callbackInfo.cbs
st := callbackInfo.st
callbackInfo.mu.Unlock()
sst.getStatsOnTick(t, cbs, ticker)
sst.getStatsOnTick(t, cbs, st, ticker)
}
}()
}
Expand All @@ -194,9 +218,9 @@ type schedStatsTicker struct {
localTotal, localEWMA uint64
}

// getStatsOnTick gets scheduler stats as the ticker has ticked.
// getStatsOnTick gets scheduler stats as the ticker has ticked. st can be nil.
func (s *schedStatsTicker) getStatsOnTick(
t time.Time, cbs []callbackWithID, ticker timeTickerInterface,
t time.Time, cbs []callbackWithID, st *cluster.Settings, ticker timeTickerInterface,
) {
if t.Sub(s.lastTime) > reportingPeriod {
var avgValue uint64
Expand All @@ -216,7 +240,8 @@ func (s *schedStatsTicker) getStatsOnTick(
// Both the mean over the last 1s, and the exponentially weighted average
// must be low for the system to be considered underloaded.
if avgValue < underloadedRunnablePerProcThreshold &&
s.localEWMA < underloadedRunnablePerProcThreshold {
s.localEWMA < underloadedRunnablePerProcThreshold &&
(st == nil || !alwaysUseShortSamplePeriodEnabled.Get(&st.SV)) {
// Underloaded, so switch to longer sampling period.
nextPeriod = samplePeriodLong
}
Expand Down
67 changes: 65 additions & 2 deletions pkg/util/goschedstats/runnable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package goschedstats

import (
"context"
"fmt"
"runtime"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,7 +75,7 @@ func TestSchedStatsTicker(t *testing.T) {
// Tick every 1ms until the reportingPeriod has elapsed.
for i := 1; ; i++ {
now = now.Add(samplePeriodShort)
sst.getStatsOnTick(now, cbs, &tt)
sst.getStatsOnTick(now, cbs, nil, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
Expand All @@ -95,7 +97,7 @@ func TestSchedStatsTicker(t *testing.T) {
tt.numResets = 0
for i := 1; ; i++ {
now = now.Add(samplePeriodLong)
sst.getStatsOnTick(now, cbs, &tt)
sst.getStatsOnTick(now, cbs, nil, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
Expand All @@ -111,3 +113,64 @@ func TestSchedStatsTicker(t *testing.T) {
require.Equal(t, samplePeriodShort, tt.lastResetDuration)
require.Equal(t, samplePeriodShort, callbackSamplePeriod)
}

func TestSchedStatsTickerShortPeriodOverride(t *testing.T) {
ctx := context.Background()
var callbackSamplePeriod time.Duration
cb := func(numRunnable int, numProcs int, samplePeriod time.Duration) {
// Always underloaded.
require.Equal(t, 0, numRunnable)
require.Equal(t, 1, numProcs)
callbackSamplePeriod = samplePeriod
}
cbs := []callbackWithID{{cb, 0}}
now := timeutil.UnixEpoch
startTime := now
st := cluster.MakeTestingClusterSettings()
// Override to use short sample period.
alwaysUseShortSamplePeriodEnabled.Override(ctx, &st.SV, true)
// Start with long sample period.
sst := schedStatsTicker{
lastTime: now,
curPeriod: samplePeriodLong,
numRunnableGoroutines: func() (numRunnable int, numProcs int) { return 0, 1 },
}
tt := testTimeTicker{}
// Tick until the reportingPeriod has elapsed.
for i := 1; ; i++ {
now = now.Add(samplePeriodLong)
sst.getStatsOnTick(now, cbs, st, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
// Each tick causes a callback.
require.Equal(t, samplePeriodLong, callbackSamplePeriod)
} else {
break
}
}
// Sample period resets to short.
require.Equal(t, 1, tt.numResets)
require.Equal(t, samplePeriodShort, tt.lastResetDuration)
require.Equal(t, samplePeriodShort, callbackSamplePeriod)

// Tick again until the reportingPeriod has elapsed.
startTime = now
tt.numResets = 0
for i := 1; ; i++ {
now = now.Add(samplePeriodShort)
sst.getStatsOnTick(now, cbs, st, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
// Each tick causes a callback.
require.Equal(t, samplePeriodShort, callbackSamplePeriod)
} else {
break
}
}
// Still using short sample period.
require.Equal(t, 0, tt.numResets)
require.Equal(t, samplePeriodShort, tt.lastResetDuration)
require.Equal(t, samplePeriodShort, callbackSamplePeriod)
}
Loading