compact: add shared compaction pool for multiple stores
anish-shanbhag committed Aug 22, 2024
1 parent 94561af commit 96141b6
Showing 7 changed files with 133 additions and 57 deletions.
161 changes: 115 additions & 46 deletions compaction.go
@@ -9,8 +9,10 @@ import (
"context"
"fmt"
"math"
"runtime"
"runtime/pprof"
"slices"
"sync"
"sync/atomic"
"time"

@@ -1644,21 +1646,10 @@ func (d *DB) maybeScheduleCompactionAsync() {
d.mu.Unlock()
}

func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
return picker.pickAuto(env)
}

// tryScheduleDownloadCompaction tries to start a download compaction.
//
// Returns true if we started a download compaction (or completed it
@@ -1683,27 +1674,15 @@ func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownl
return false
}

// withCompactionEnv runs the specified function after initializing the
// compaction picking environment. If the DB is read-only or has already been
// closed, the function will not be run.
//
// Requires d.mu to be held.
func (d *DB) withCompactionEnv(f func(env compactionEnv)) {
if d.closed.Load() != nil || d.opts.ReadOnly {
return
}

// Compaction picking needs a coherent view of a Version. In particular, we
// need to exclude concurrent ingestions from making a decision on which level
@@ -1722,9 +1701,91 @@ func (d *DB) maybeScheduleCompactionPicker(
diskAvailBytes: d.diskAvailBytes.Load(),
earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
inProgressCompactions: d.getInProgressCompactionInfoLocked(nil),
readCompactionEnv: readCompactionEnv{
readCompactions: &d.mu.compact.readCompactions,
flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
},
}

f(env)
}

// CompactionPool enforces a global max compaction concurrency in a multi-store
// configuration.
type CompactionPool struct {
mu sync.Mutex
compactingCount int
waiting map[*DB]struct{}
maxCompactionConcurrency int
}

var defaultCompactionPool = &CompactionPool{
maxCompactionConcurrency: runtime.GOMAXPROCS(0),
}
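For context, a minimal sketch of how a multi-store application would share one pool. It is not part of this commit: NewCompactionPool is a hypothetical constructor, since maxCompactionConcurrency is unexported in this diff, and out-of-package callers otherwise share the GOMAXPROCS-capped default pool by leaving Options.CompactionPool nil.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Hypothetical constructor; not part of this commit.
	pool := pebble.NewCompactionPool(8)

	// Both stores reference the same pool, so their automatic compactions
	// draw from a single shared budget of 8, while each store's own
	// MaxConcurrentCompactions limit still applies per store.
	db1, err := pebble.Open("/mnt/disk1/store", &pebble.Options{CompactionPool: pool})
	if err != nil {
		log.Fatal(err)
	}
	defer db1.Close()

	db2, err := pebble.Open("/mnt/disk2/store", &pebble.Options{CompactionPool: pool})
	if err != nil {
		log.Fatal(err)
	}
	defer db2.Close()
}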

// maybeScheduleWaitingDBLocked attempts to schedule a compaction for one of
// the waiting DBs, prioritizing the DB with the highest compaction score
// across all levels. If no DB has a score above the threshold, it effectively
// picks a DB at random.
//
// c.mu must be held. DB.mu must not be held for any DB.
func (c *CompactionPool) maybeScheduleWaitingDBLocked() {
if len(c.waiting) == 0 || c.compactingCount >= c.maxCompactionConcurrency {
return
}

// NB: highestScore starts at compactionScoreThreshold (1) so that we
// effectively have no preference between two DBs that don't have any level
// with a score above the threshold.
highestScore := float64(compactionScoreThreshold)
var pickedDB *DB
for d := range c.waiting {
if len(c.waiting) == 1 {
pickedDB = d
// No need to calculate scores if only one DB is waiting.
break
}
d.mu.Lock()
inProgress := d.getInProgressCompactionInfoLocked(nil)
scores := d.mu.versions.picker.getScores(inProgress)
if pickedDB == nil || scores[0] >= highestScore {
highestScore = scores[0]
pickedDB = d
}
d.mu.Unlock()
}

pickedDB.mu.Lock()
if !pickedDB.tryScheduleAutoCompaction() {
// If we can't schedule a compaction for this DB right now, mark it as
// no longer waiting.
delete(c.waiting, pickedDB)
}
pickedDB.mu.Unlock()

c.maybeScheduleWaitingDBLocked()
}

// maybeScheduleCompaction schedules a compaction if necessary.
//
// Requires d.mu to be held.
func (d *DB) maybeScheduleCompaction() {
d.withCompactionEnv(func(env compactionEnv) {
maxDownloads := d.opts.MaxConcurrentDownloads()
for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
d.tryScheduleDownloadCompaction(env, maxDownloads) {
}

maxCompactions := d.opts.MaxConcurrentCompactions()
if d.mu.compact.compactingCount >= maxCompactions {
if len(d.mu.compact.manual) > 0 {
// Inability to run head blocks later manual compactions.
d.mu.compact.manual[0].retries++
}
return
}

// Check for delete-only compactions first, because they're expected to be
// cheap and reduce future compaction work.
if !d.opts.private.disableDeleteOnlyCompactions &&
@@ -1741,14 +1802,15 @@ func (d *DB) maybeScheduleCompactionPicker(
}
d.mu.compact.manual = d.mu.compact.manual[1:]
}
})

if !d.opts.DisableAutomaticCompactions {
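// NB: DB.mu is released before taking compactionPool.mu because
// maybeScheduleWaitingDBLocked acquires each waiting DB's mutex while
// holding the pool's; acquiring them in the reverse order here could
// deadlock.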
d.mu.Unlock()
d.compactionPool.mu.Lock()
d.compactionPool.waiting[d] = struct{}{}
d.compactionPool.maybeScheduleWaitingDBLocked()
d.compactionPool.mu.Unlock()
d.mu.Lock()
}
}

@@ -1801,24 +1863,31 @@ func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompac
// Returns false if no automatic compactions are necessary or able to run at
// this time.
//
// Requires d.mu and d.compactionPool.mu to be held.
func (d *DB) tryScheduleAutoCompaction() bool {
if d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() {
return false
}

var pc *pickedCompaction
d.withCompactionEnv(func(env compactionEnv) {
pc = pickAuto(d.mu.versions.picker, env)
})

if pc == nil {
return false
}
c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
d.mu.compact.compactingCount++
d.compactionPool.compactingCount++
d.addInProgressCompaction(c)
go func() {
d.compact(c, nil)
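// The compaction has finished; return its slot to the shared pool and
// give another waiting DB a chance to schedule work.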
d.compactionPool.mu.Lock()
d.compactionPool.compactingCount--
d.compactionPool.maybeScheduleWaitingDBLocked()
d.compactionPool.mu.Unlock()
}()
return true
}

9 changes: 2 additions & 7 deletions compaction_picker_test.go
@@ -1413,16 +1413,11 @@ func TestCompactionPickerPickFile(t *testing.T) {
d.mu.Lock()
defer d.mu.Unlock()

var lf manifest.LevelFile
var ok bool
d.withCompactionEnv(func(env compactionEnv) {
p := d.mu.versions.picker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
})
if !ok {
return "(none)"
2 changes: 2 additions & 0 deletions db.go
@@ -323,6 +323,8 @@ type DB struct {
// compactionSchedulers.Wait() should not be called while the DB.mu is held.
compactionSchedulers sync.WaitGroup

compactionPool *CompactionPool

// The main mutex protecting internal DB state. This mutex encompasses many
// fields because those fields need to be accessed and updated atomically. In
// particular, the current version, log.*, mem.*, and snapshot list need to
4 changes: 1 addition & 3 deletions format_major_version.go
@@ -402,9 +402,7 @@ func (d *DB) compactMarkedFilesLocked() error {
for curr.Stats.MarkedForCompaction > 0 {
// Attempt to schedule a compaction to rewrite a file marked for
// compaction.
d.maybeScheduleCompaction()

// The above attempt might succeed and schedule a rewrite compaction. Or
// there might not be available compaction concurrency to schedule the
7 changes: 7 additions & 0 deletions open.go
@@ -597,6 +597,13 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}
d.calculateDiskAvailableBytes()

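// Attach this store to its compaction pool (shared with any other stores
// whose Options reference the same pool), lazily initializing the pool's
// waiting set on first use.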
d.compactionPool = opts.CompactionPool
d.compactionPool.mu.Lock()
if d.compactionPool.waiting == nil {
d.compactionPool.waiting = make(map[*DB]struct{})
}
d.compactionPool.mu.Unlock()

d.maybeScheduleFlush()
d.maybeScheduleCompaction()

5 changes: 5 additions & 0 deletions options.go
@@ -975,6 +975,8 @@ type Options struct {
// The default value is 1.
MaxConcurrentCompactions func() int

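// CompactionPool, if non-nil, bounds automatic compaction concurrency
// across every store that references the same pool. If nil,
// EnsureDefaults assigns a process-wide default pool whose limit is
// runtime.GOMAXPROCS(0).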
CompactionPool *CompactionPool

// MaxConcurrentDownloads specifies the maximum number of download
// compactions. These are compactions that copy an external file to the local
// store.
@@ -1268,6 +1270,9 @@ func (o *Options) EnsureDefaults() *Options {
if o.MaxConcurrentCompactions == nil {
o.MaxConcurrentCompactions = func() int { return 1 }
}
if o.CompactionPool == nil {
o.CompactionPool = defaultCompactionPool
}
if o.MaxConcurrentDownloads == nil {
o.MaxConcurrentDownloads = func() int { return 1 }
}
2 changes: 1 addition & 1 deletion snapshot.go
@@ -117,7 +117,7 @@ func (s *Snapshot) closeLocked() error {
// If s was the previous earliest snapshot, we might be able to reclaim
// disk space by dropping obsolete records that were pinned by s.
if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
s.db.maybeScheduleCompaction()
}
s.db = nil
return nil
