Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: add shared compaction pool for multiple stores #3880

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
450 changes: 329 additions & 121 deletions compaction.go

Large diffs are not rendered by default.

213 changes: 95 additions & 118 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
)
Expand Down Expand Up @@ -192,6 +191,8 @@ type pickedCompaction struct {
score float64
// kind indicates the kind of compaction.
kind compactionKind
// isManual indicates whether this compaction was manually triggered.
isManual bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be denoted by the kind too, right? We don't need a separate bool for this.

// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
Expand Down Expand Up @@ -1156,16 +1157,13 @@ func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fi
return uint64(totalGarbage) / uint64(useCount)
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
// pickAuto picks the best compaction, if any. It first tries to find a
// score-based compaction; if one cannot be found, pickAuto falls back to
// various other compaction picking methods.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
// We first check if this DB has reached its current max compaction
// concurrency.
//
// Compaction concurrency is controlled by L0 read-amp. We allow one
// additional compaction per L0CompactionConcurrency sublevels, as well as
// one additional compaction per CompactionDebtConcurrency bytes of
Expand All @@ -1184,107 +1182,9 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
}
}

scores := p.calculateLevelScores(env.inProgressCompactions)

// TODO(bananabrick): Either remove, or change this into an event sent to the
// EventListener.
logCompaction := func(pc *pickedCompaction) {
var buf bytes.Buffer
for i := 0; i < numLevels; i++ {
if i != 0 && i < p.baseLevel {
continue
}

var info *candidateLevelInfo
for j := range scores {
if scores[j].level == i {
info = &scores[j]
break
}
}

marker := " "
if pc.startLevel.level == info.level {
marker = "*"
}
fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %5.1f %8s %8s",
marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
info.uncompensatedScoreRatio, info.uncompensatedScore,
humanize.Bytes.Int64(int64(totalCompensatedSize(
p.vers.Levels[info.level].Iter(),
))),
humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
)

count := 0
for i := range env.inProgressCompactions {
c := &env.inProgressCompactions[i]
if c.inputs[0].level != info.level {
continue
}
count++
if count == 1 {
fmt.Fprintf(&buf, " [")
} else {
fmt.Fprintf(&buf, " ")
}
fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
}
if count > 0 {
fmt.Fprintf(&buf, "]")
}
fmt.Fprintf(&buf, "\n")
}
p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
pc.startLevel.level, pc.outputLevel.level, buf.String())
}

// Check for a score-based compaction. candidateLevelInfos are first sorted
// by whether they should be compacted, so if we find a level which shouldn't
// be compacted, we can break early.
for i := range scores {
info := &scores[i]
if !info.shouldCompact() {
break
}
if info.level == numLevels-1 {
continue
}

if info.level == 0 {
pc = pickL0(env, p.opts, p.vers, p.baseLevel)
// Fail-safe to protect against compacting the same sstable
// concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
return pc
}
continue
}

// info.level > 0
var ok bool
info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
if !ok {
continue
}

pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)
// Fail-safe to protect against compacting the same sstable concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
return pc
}
// Score-based compactions have the highest priority.
if pc := p.pickScoreBasedCompaction(env); pc != nil {
return pc
}

// Check for files which contain excessive point tombstones that could slow
Expand Down Expand Up @@ -1337,8 +1237,61 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// MarkedForCompaction field is persisted in the manifest. That's okay. We
// previously would've ignored the designation, whereas now we'll re-compact
// the file in place.
if p.vers.Stats.MarkedForCompaction > 0 {
if pc := p.pickRewriteCompaction(env); pc != nil {
if pc := p.pickRewriteCompaction(env); pc != nil {
return pc
}

return nil
}

// pickScoreBasedCompaction computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
func (p *compactionPickerByScore) pickScoreBasedCompaction(
env compactionEnv,
) (pc *pickedCompaction) {
scores := p.calculateLevelScores(env.inProgressCompactions)

// Check for a score-based compaction. candidateLevelInfos are first sorted
// by whether they should be compacted, so if we find a level which shouldn't
// be compacted, we can break early.
for i := range scores {
info := &scores[i]
if !info.shouldCompact() {
break
}
if info.level == numLevels-1 {
continue
}

withScore := func(pc *pickedCompaction) *pickedCompaction {
// Fail-safe to protect against compacting the same sstable
// concurrently.
if pc == nil || inputRangeAlreadyCompacting(env, pc) {
return nil
}
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
pc.kind = compactionKindDefault
return pc
}

if info.level == 0 {
if pc = withScore(pickL0(env, p.opts, p.vers, p.baseLevel)); pc != nil {
return pc
}
continue
}

// info.level > 0
var ok bool
info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
if !ok {
continue
}

if pc = withScore(pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)); pc != nil {
return pc
}
}
Expand Down Expand Up @@ -1426,7 +1379,12 @@ var markedForCompactionAnnotator = &manifest.Annotator[fileMetadata]{
// with various checks to ensure that the file still exists in the expected level
// and isn't already being compacted.
func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
candidate *fileMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind,
candidate *fileMetadata,
env compactionEnv,
startLevel int,
outputLevel int,
kind compactionKind,
score float64,
) *pickedCompaction {
if candidate == nil || candidate.IsCompacting() {
return nil
Expand All @@ -1453,6 +1411,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(

pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel)
pc.kind = kind
pc.score = score
pc.startLevel.files = inputs
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

Expand Down Expand Up @@ -1483,7 +1442,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
return nil
}
return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
score := float64(max(candidate.Stats.RangeDeletionsBytesEstimate/max(candidate.Size, 1), candidate.Stats.NumDeletions/max(candidate.Stats.NumEntries, 1)))
return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly, score)
}

// pickRewriteCompaction attempts to construct a compaction that
Expand All @@ -1492,13 +1452,17 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
if p.vers.Stats.MarkedForCompaction == 0 {
return nil
}

for l := numLevels - 1; l >= 0; l-- {
candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
if candidate == nil {
// Try the next level.
continue
}
pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite)
pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite, 0)
if pc != nil {
return pc
}
Expand Down Expand Up @@ -1551,7 +1515,11 @@ func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
}
}

return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
if candidate == nil {
return nil
}

return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity, candidate.Stats.TombstoneDenseBlocksRatio)
}

// pickAutoLPositive picks an automatic compaction for the candidate
Expand Down Expand Up @@ -1755,7 +1723,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc
return pc
}

func pickManualCompaction(
func newPickedManualCompaction(
vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
outputLevel := manual.level + 1
Expand All @@ -1779,6 +1747,11 @@ func pickManualCompaction(
return nil, true
}
pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
pc.kind = compactionKindDefault
pc.isManual = true
// NB: we set the score to math.MaxFloat64 so that manual compactions are
// always prioritized above automatic compactions.
pc.score = math.MaxFloat64
manual.outputLevel = pc.outputLevel.level
pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
if pc.startLevel.files.Empty() {
Expand Down Expand Up @@ -1909,6 +1882,10 @@ func pickReadTriggeredCompactionHelper(
return nil
}

// Prioritize read compactions with a smaller initial file size, since
// they will be cheaper to perform.
pc.score = -float64(outputOverlaps.SizeSum() + overlapSlice.SizeSum())

return pc
}

Expand Down
18 changes: 8 additions & 10 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
end: iEnd.UserKey,
}

pc, retryLater := pickManualCompaction(
pc, retryLater := newPickedManualCompaction(
pickerByScore.vers,
opts,
compactionEnv{
Expand Down Expand Up @@ -1413,17 +1413,15 @@ func TestCompactionPickerPickFile(t *testing.T) {
d.mu.Lock()
defer d.mu.Unlock()

// Use maybeScheduleCompactionPicker to take care of all of the
// initialization of the compaction-picking environment, but never
// pick a compaction; just call pickFile using the user-provided
// level.
var lf manifest.LevelFile
var ok bool
d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction {
p := untypedPicker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
return nil
})
env := d.makeCompactionEnv()
if env == nil {
return "unable to lock the DB for compaction picking"
}
p := d.mu.versions.picker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
d.mu.versions.logUnlock()
if !ok {
return "(none)"
}
Expand Down
Loading
Loading