compact: add shared compaction pool for multiple stores #3880

Open · anish-shanbhag wants to merge 1 commit into master from multi-store-compactions
Conversation

Contributor

@anish-shanbhag anish-shanbhag commented Aug 21, 2024

This change adds a new compaction pool which enforces a global max
compaction concurrency in a multi-store configuration. Each Pebble store
(i.e. an instance of *DB) still maintains its own per-store compaction
concurrency which is controlled by opts.MaxConcurrentCompactions.
However, in a multi-store configuration, disk I/O is a per-store resource
while CPU is shared across stores. A significant portion of compaction
is CPU-intensive, and so this ensures that excessive compactions don't
interrupt foreground CPU tasks even if the disks are capable of handling
the additional throughput from those compactions.

The shared compaction concurrency only applies to automatic and manual
compactions. This means that delete-only compactions are excluded because
they are expected to be cheap, as are flushes because they should never be
blocked.
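
To illustrate the idea, here is a minimal sketch of the two-level check (illustrative names only, not the API added by this change): each store keeps enforcing its own opts.MaxConcurrentCompactions, and a node-wide pool additionally caps the total, e.g. at the CPU count.

package main

import (
	"runtime"
	"sync"
)

// compactionPool is an illustrative node-wide limiter shared by all stores.
type compactionPool struct {
	mu      sync.Mutex
	max     int // global limit, e.g. number of CPUs
	running int // compactions currently running across all stores
}

func newCompactionPool() *compactionPool {
	return &compactionPool{max: runtime.GOMAXPROCS(0)}
}

// tryAcquire reserves one global slot; a store that fails here stays queued
// until another store's compaction finishes and calls release.
func (p *compactionPool) tryAcquire() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.running >= p.max {
		return false
	}
	p.running++
	return true
}

func (p *compactionPool) release() {
	p.mu.Lock()
	p.running--
	p.mu.Unlock()
}

func main() { _ = newCompactionPool() }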

Fixes: #3813
Informs: cockroachdb/cockroach#74697

@cockroach-teamcity
Member

This change is Reviewable

@anish-shanbhag anish-shanbhag force-pushed the multi-store-compactions branch 2 times, most recently from 96141b6 to ed2ae1e on August 23, 2024 13:07
@anish-shanbhag
Contributor Author

I initially chose to use a global max compaction concurrency of runtime.GOMAXPROCS(0) (i.e. # of cpus) based on the analysis here where we saw that compactions were 80-90% CPU bound.

I've been testing this change on roachprod and it looks like the compaction pool behavior is working as intended, i.e. the global compaction concurrency is enforced and stores with higher level scores are prioritized. However, I'm seeing some interesting results from the workloads that I ran.

In order for the compaction pool to start limiting compactions in the first place, there needs to be a situation where most or all of the stores on a node have enough compaction debt/L0 sublevels that they want to schedule multiple compactions relative to the number of CPUs on the node. So, I was experimenting with workloads that overload all stores on a node with writes. I ended up with the following:

# Create a 3 node cluster with 16 CPUs + 16 SSDs.
roachprod create $CLUSTER --nodes 3 --gce-machine-type n2-standard-16 \
    --gce-local-ssd-count 16 --gce-enable-multiple-stores \
    --max-concurrency 0
roachprod put $CLUSTER artifacts/cockroach cockroach

roachprod wipe $CLUSTER
# Only the first node will actually store any ranges. Increase the cache
# size so that compactions spend less time in I/O.
roachprod start $CLUSTER:1 --store-count 16 --args --cache=0.35
# Node 2 will be used as a SQL gateway. Node 3 will be used to run the workloads.
# The intention here is to leave node 1 with as much CPU as possible to perform
# both compactions and KV requests.
roachprod start $CLUSTER:2

roachprod sql $CLUSTER:1 -- -e "SET CLUSTER SETTING kv.range_split.by_load.enabled = 'false';"

roachprod adminui $CLUSTER:1 --open

run() {
    DB=$1
    roachprod sql $CLUSTER:1 -- -e "CREATE DATABASE db$DB;"
    # Constrain this DB to a specific store on node 1.
    CONSTRAINTS="[+node1store$DB]"
    roachprod sql $CLUSTER:1 -- -e "ALTER DATABASE db$DB CONFIGURE ZONE USING constraints = '$CONSTRAINTS', voter_constraints = '$CONSTRAINTS', num_replicas = 1, num_voters = 1;" 
    MIN_BLOCK_SIZE=32
    # Large block size so that we overload the node.
    MAX_BLOCK_SIZE=65536
    roachprod run $CLUSTER:3 "./cockroach workload init kv --db db$DB" {pgurl:2}
    # Run a 5% read, 95% write, 400 ops/sec KV workload for this store. 
    roachprod run $CLUSTER:3 "./cockroach workload run kv --db db$DB --duration=30m --display-every 10s --read-percent 5 --concurrency 512 --max-rate 400 --min-block-bytes $MIN_BLOCK_SIZE --max-block-bytes $MAX_BLOCK_SIZE --tolerate-errors" {pgurl:2} &> db$DB-original &
}

for DB in {1..16}; do
    # Run 16 concurrent KV workloads, one for each store.
    # This means a total of 6400 ops/sec.
    run $DB &
done

Below is a comparison of read amp with the compaction pool disabled (top) and enabled (bottom):
[graphs: read amplification, pool disabled (top) vs. enabled (bottom)]
p99 SQL latency:
[graphs: p99 SQL latency, pool disabled (top) vs. enabled (bottom)]
And here is the SQL throughput with the compaction pool enabled. With the pool disabled, the workload was able to maintain the peak 6400 ops/sec more consistently than what's shown in the graph below.
[graph: SQL throughput with the pool enabled]

It looks like read amp and SQL throughput are worse with the compaction pool enabled. After some investigation, I believe this is because the compactions in this workload are primarily I/O bound, unlike the compactions in the DRT cluster investigation above. A CPU profile showed that syscalls like pread made up the majority of compaction time, as opposed to the DRT cluster where block compression was much more significant. CPU usage also capped out at ~85-90% on node 1 while disk throughput was >1 GiB/s per store. I tried rerunning the workload with the global compaction concurrency set to num cpus * 2, assuming that these compactions might be closer to 50% CPU work, but didn't see much improvement:
[graphs omitted]
After performing these tests, I'm curious if there's something specific in snapshot rebalancing or otherwise which caused the compactions in the DRT cluster to be much more CPU-heavy. I tried reproducing the situation we saw there using the following:

# 5 node cluster with 8 CPUs and 8 SSDs each
roachprod create $CLUSTER --nodes 5 --gce-machine-type n2-standard-8 --gce-local-ssd-count 8 --gce-enable-multiple-stores --max-concurrency 0
roachprod stage $CLUSTER release v24.2.0-beta.3


roachprod wipe $CLUSTER

# Nodes 1-3 are initially the only ones in the cluster
roachprod start $CLUSTER:1-3 --store-count 8 --args --cache=0.35

# Increase snapshot rebalancing rate
roachprod sql $CLUSTER:1 -- -e "SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '256 MiB';"

# ~500 GiB of table data
roachprod run $CLUSTER:5 "./cockroach workload fixtures import tpcc --warehouses=10000 {pgurl:1}"

# Use node 5 as a workload node
roachprod run $CLUSTER:5 "./cockroach workload run tpcc --warehouses 10000 --wait false --workers=32 --max-rate=600 --duration 30m --tolerate-errors {pgurl:1-3}" &> tpcc &

sleep 180
# Start node 4 a few minutes later. Check for performance dips during upreplication/snapshot rebalancing.
roachprod start $CLUSTER:4

However, it seemed like snapshot rebalancing was still slow enough that there was almost no drop in performance:
[graph omitted]

I'm open to more ideas for how we can induce a CPU-heavy compaction overload. In general though, from this investigation it seems likely that some compactions will be more CPU-bound than others. It might be worth thinking about introducing a more granular limit on the concurrency to account for this. One idea could be to only limit the number of concurrent compactions that are performing actual CPU work by releasing locks when compactions perform I/O via data block writes. However, I'm not sure if this would cause excessive overhead from locking/unlocking in situations where most blocks are already in the cache.
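
As a very rough sketch of that last idea (purely hypothetical hooks, nothing that exists in Pebble today): each block's CPU-heavy phase takes a slot from a CPU semaphore while reads run without one, which also makes the per-block acquire/release overhead concern concrete.

package main

import "runtime"

// cpuSlots is a counting semaphore sized to the CPU count; only CPU-bound
// compaction work holds a slot.
var cpuSlots = make(chan struct{}, runtime.GOMAXPROCS(0))

// compactBlocks processes one block at a time: the read (or cache lookup)
// runs without holding a CPU slot, and the CPU-heavy work (merging keys,
// compressing the output block) holds one.
func compactBlocks(readNextBlock func() bool, processBlock func()) {
	for readNextBlock() { // I/O or cache hit: no slot held
		cpuSlots <- struct{}{} // acquire a CPU slot
		processBlock()         // CPU-heavy phase
		<-cpuSlots             // release before the next read
	}
}

func main() {
	blocks := 3
	compactBlocks(
		func() bool { blocks--; return blocks >= 0 },
		func() {},
	)
}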

@anish-shanbhag anish-shanbhag marked this pull request as ready for review August 23, 2024 14:22
@anish-shanbhag anish-shanbhag requested a review from a team as a code owner August 23, 2024 14:22
@anish-shanbhag
Contributor Author

Summarizing some suggestions from @itsbilal here:

The fairly large block size of 65536 is likely the main reason why the compactions above are mostly I/O bound. But when we lower the max block size to something more reasonable in the range of 512-4096 B, there isn't enough compaction debt buildup to produce a large increase in the compaction concurrency.

The 16 concurrent workloads are also pretty unwieldy to debug, and pinning the DBs to one node is pretty unusual. I tweaked the KV roachprod test to this:

# Create a 4 node cluster with 16 CPUs + 16 SSDs each.
roachprod create $CLUSTER --nodes 4 --gce-machine-type n2-standard-16 \
    --gce-local-ssd-count 16 --gce-enable-multiple-stores \
    --max-concurrency 0
roachprod put $CLUSTER artifacts/cockroach cockroach

roachprod wipe $CLUSTER
# Increase the cache size so that compactions spend less time in I/O.
# The last node is only used to run the workload.
roachprod start $CLUSTER:1-3 --store-count 16 --args --cache=0.35

roachprod adminui $CLUSTER:1 --open

MIN_BLOCK_SIZE=32
MAX_BLOCK_SIZE=512

roachprod run $CLUSTER:3 "./cockroach workload init kv" {pgurl:1}
roachprod run $CLUSTER:4 "./cockroach workload run kv --duration=30m --display-every 10s --read-percent 5 --concurrency 64 --min-block-bytes $MIN_BLOCK_SIZE --max-block-bytes $MAX_BLOCK_SIZE --tolerate-errors" {pgurl:1-3}

This is a much more realistic case, but even with an unbounded workload rate there aren't enough compactions to cause a buildup of compaction debt. CPU usage for nodes 1-3 also only peaks at ~80%, and a profile shows that most of the CPU time is spent in SQL/KV rather than compactions:
[screenshots and CPU profile (profile.pb.gz) omitted]

I'm unsure why we don't see queries/sec increase even further given the extra CPU headroom here.

I think that reproducing the case of snapshot rebalancing in the DRT cluster might be a better route for testing here. @sumeerbhola any ideas for how we can tweak the KV workload or the snapshot rebalancing test above to better test this?

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewed 7 of 7 files at r1, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)


compaction.go line 1762 at r1 (raw file):

		d.mu.Lock()
		inProgress := d.getInProgressCompactionInfoLocked(nil)
		scores := d.mu.versions.picker.getScores(inProgress)

I was imagining a getHighestScore method that would return a (compaction-kind, level-score) tuple, where the compaction kind can be a score-based compaction (where the level-score will actually be populated), or one of the others we pick in pickAuto (whose scores would be 0). And the kinds would be ordered in the priority we like to run them, e.g. tombstone-density compaction before elision-only etc. We would have a tuple comparison function to pick the best DB and pass back to it what we picked. So we will need to refactor things like pickTombstoneDensityCompaction to be cheap and not return a pickedCompaction (i.e. not do the work in pickedCompactionFromCandidateFile).
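
Roughly, something like the following (names and the kind ordering here are only illustrative):

package main

// compactionKind ordering doubles as priority here: lower value runs first.
type compactionKind int

const (
	kindScoreBased compactionKind = iota // level score is only populated for this kind
	kindTombstoneDensity
	kindElisionOnly
)

// candidate is what a hypothetical getHighestScore would return per DB.
type candidate struct {
	kind       compactionKind
	levelScore float64 // 0 for non-score-based kinds
}

// better reports whether a should run before b when the pool picks the next DB.
func better(a, b candidate) bool {
	if a.kind != b.kind {
		return a.kind < b.kind
	}
	return a.levelScore > b.levelScore
}

func main() {
	_ = better(
		candidate{kind: kindScoreBased, levelScore: 3.2},
		candidate{kind: kindTombstoneDensity},
	)
}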


compaction.go line 1797 at r1 (raw file):

				d.mu.compact.manual[0].retries++
			}
			return

why is this repeated here when it happens in the for loop below?


compaction.go line 1826 at r1 (raw file):

		// be scheduled.
		d.compactionPool.waiting[d] = struct{}{}
		d.compactionPool.maybeScheduleWaitingCompactionLocked()

(I assumed yes) Is every DB on completing a compaction also calling compactionPool.maybeScheduleWaitingCompactionLocked()? Oh, maybe it calls this method since it also needs to check its local MaxConcurrentCompactions limit.


format_major_version.go line 405 at r1 (raw file):

		// Attempt to schedule a compaction to rewrite a file marked for
		// compaction.
		d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction {

does this change mean a rewrite compaction may not be picked?


snapshot.go line 120 at r1 (raw file):

	// disk space by dropping obsolete records that were pinned by s.
	if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
		s.db.maybeScheduleCompactionPicker(pickElisionOnly)

so an elision compaction may no longer get picked?

Collaborator

@sumeerbhola sumeerbhola left a comment

@sumeerbhola any ideas for how we can tweak the KV workload or the snapshot rebalancing test above to better test this?

I think the goal here should be narrower -- is the compaction pool successfully limiting the concurrency. We don't need to care about SQL latency, since we know there are scenarios where without such concurrency limiting, the SQL latency does suffer. And we don't even need roachtests for that narrow goal -- it can be tested via Pebble unit tests with many in-memory stores.

Also, when you do end-to-end testing for this via a roachtest, I would suggest using a single node cluster for it to be easy to understand. For our targeted roachtests that test a feature, we want to eliminate variability whenever possible.

Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)


compaction.go line 1808 at r1 (raw file):

		}

		for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {

I think we want manual compactions also counted against the global limit.


compaction.go line 1901 at r1 (raw file):

	go func() {
		d.compact(c, nil)
		d.compactionPool.mu.Lock()

this should be abstracted into a compactionCompleted method on the pool instead of fiddling with the internal state here. In general, everything in compactionPool should be hidden behind methods.


format_major_version.go line 405 at r1 (raw file):

Previously, sumeerbhola wrote…

does this change mean a rewrite compaction may not be picked?

I looked at our entry points for running a compaction and realized they are unnecessarily confusing. maybeScheduleCompaction is our normal entry point, which we call in various places to trigger a compaction if we are below the concurrency threshold. This includes when a compaction has completed. Two places use maybeScheduleCompactionPicker directly which seems like a premature optimization to have a simpler pickFunc, since those kinds of compactions are also picked under the usual pickAuto path. I assume you realized that and hence these simplifications.

It is also not clear why this loop repeatedly calls maybeScheduleCompaction. It should call it once in the function in case it needs to start one, but subsequent ones will get started when the started one completes.

Contributor Author

@anish-shanbhag anish-shanbhag left a comment

I've made some fairly large changes here based on the discussion with @sumeerbhola. Summarizing some of these below:

  • CompactionPool is now an interface rather than a struct. There are two implementations - UnlimitedCompactionPool, which is intended to match the previous behavior where there was no global compaction concurrency limit, and PrioritizingCompactionPool, which does enforce a limit. PrioritizingCompactionPool now prioritizes compactions from each DB by first comparing their compactionKind, and then breaking ties with pickedCompaction.score. (A rough sketch of the interface shape follows after this list.)
    • We had a discussion around the risk of incurring extra overhead from repeatedly trying to get a pickedCompaction from a DB in the case that other DBs had higher priority and thus the pickedCompaction would never be run. Even though the current implementation uses pickedCompaction directly to determine priority, I don't think this should be a concern here. The reason is that a DB's pickedCompaction is picked only once when we call maybeScheduleWaitingCompactionLocked, and then cached inside PrioritizingCompactionPool.waiting until it's invalidated by a call to maybeScheduleCompaction. In other words, the only time we'll have to pick a new compaction from a given DB is if a call to maybeScheduleCompaction indicates that a new compaction could be picked, which matches the current behavior anyway.
    • The other advantage of using a pickedCompaction instead of a tuple is that the prioritization logic becomes a lot easier when it is guaranteed that a compaction picked from a DB can definitely be run. Creating a pickedCompaction involves a variety of checks to ensure this, e.g. checking if any overlapping sstables are already compacting. If we choose a potential compaction from a DB thinking that it's the best one and then realize that the compaction can't actually be run, then we have to restart and go through all of the DBs again.
  • To increase clarity around the relative priority of compaction kinds, I've extracted all of the score-based compaction picking logic from compactionPickerByScore.pickAuto into its own method, compactionPickerByScore.pickScoreBasedCompaction.
  • Manual compactions are now also included in the compaction concurrency limit.
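
Roughly, the shape is along these lines (a sketch only; the exact method set and signatures in the revision may differ):

package sketch

// DB stands in for pebble's *DB here so the sketch is self-contained.
type DB struct{}

// CompactionPool is the rough shape of the interface described above.
type CompactionPool interface {
	// AddWaitingDB registers d as wanting to run a compaction; the pool may
	// schedule one immediately if the global limit allows.
	AddWaitingDB(d *DB)
	// CompactionFinished is called when one of d's compactions completes,
	// freeing a slot for the highest-priority waiting DB.
	CompactionFinished(d *DB)
}

// UnlimitedCompactionPool would schedule every waiting DB immediately
// (matching the old no-global-limit behavior), while
// PrioritizingCompactionPool would order waiting DBs by compactionKind and
// then pickedCompaction.score before handing out slots.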

Reviewable status: 1 of 8 files reviewed, 7 unresolved discussions (waiting on @itsbilal and @sumeerbhola)


compaction.go line 1797 at r1 (raw file):

Previously, sumeerbhola wrote…

why is this repeated here when it happens in the for loop below?

The retries field here is only used in TestManualCompaction in order to verify that a manual compaction was retried if it doesn't get to run immediately (see #494). Ideally we would remove this, but I'm not too familiar with the way that test works.


compaction.go line 1901 at r1 (raw file):

Previously, sumeerbhola wrote…

this should be abstracted into a compactionCompleted method on the pool instead of fiddling with the internal state here. In general, everything in compactionPool should be hidden behind methods.

This is now abstracted behind the CompactionFinished interface method.


format_major_version.go line 405 at r1 (raw file):

Previously, sumeerbhola wrote…

I looked at our entry points for running a compaction and realized they are unnecessarily confusing. maybeScheduleCompaction is our normal entry point, which we call in various places to trigger a compaction if we are below the concurrency threshold. This includes when a compaction has completed. Two places use maybeScheduleCompactionPicker directly which seems like a premature optimization to have a simpler pickFunc, since those kinds of compactions are also picked under the usual pickAuto path. I assume you realized that and hence these simplifications.

It is also not clear why this loop repeatedly calls maybeScheduleCompaction. It should call it once in the function in case it needs to start one, but subsequent ones will get started when the started one completes.

Yep, maybeScheduleCompactionPicker made the code path a bit more complex to reason about, so I've tried to simplify it a bit.

And yeah, I think a single call to maybeScheduleCompaction is sufficient here - I've updated this.

Collaborator

@sumeerbhola sumeerbhola left a comment

This looks much better.

Reviewed 6 of 7 files at r2.
Reviewable status: 7 of 8 files reviewed, 10 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)


compaction.go line 1901 at r3 (raw file):

			// We've found a higher priority pickedCompaction - first unlock the old
			// selectedDB, and then swap it out for the current DB.
			unlockDB(selectedDB)

On the surface it seems wrong that DB2 is being locked while DB1's locks are held in that the map ordering is arbitrary so the lock ordering can change, and result in a deadlock. It seems to work out here since there is only 1 thread doing this arbitrary ordering (since the pool's mutex is held while doing this). Definitely needs a comment if we stay with this structure.

I am also slightly nervous that these DB locks are held while iterating over all DBs, in terms of the length of the critical section. Though possibly that is ok.

The thing that seems not quite right is to use (a) stale pickedCompactions, (b) do the full work of picking a compaction when it is not going to run. I noticed there is a best-effort attempt in AddWaitingDB to drop an existing pickedCompaction to avoid (a), but there is no guarantee that a DB.maybeScheduleCompaction will be repeatedly called. If the last call to DB.maybeScheduleCompaction queued the DB in the pool and there are no flushes or compactions running for this DB, it won't be called again. Hmm, I had to reread your comment, and now I see why this is fine. If no one is calling DB.maybeScheduleCompaction the cached state is not stale, since nothing has changed in the version and no new manual compaction has been queued. This is worth a code comment.

I can see why (b) can be debated. Say a DB is running 4 compactions, and this will shrink to 0, and then grow back to 4 and back to 0. And there is always a compaction backlog. With 4 compactions running, we have picked another compaction that is not getting to run. Then after each compaction finishes, we will pick again, so 1 (original) + 1 (4 => 3 transition) + 1 + 1 + 1 (1 => 0 transition) = 5. The last one will get to run. Then we grow again to 4. So we picked 8 times, of which 4 got to run. If the worst we can come up with is a 2x multiplier of picked compactions to those that got to run, then that seems ok. See if you can construct anything worse.

I suspect there is still a problem with caching these pickedCompaction. The compactionEnv in which a pickedCompaction was created was reliant on DB.mu and DB.mu.versions.logLock(). Both have been released. This may be subtly working because we reacquire those locks for the pickedCompaction. Though I suspect there is a race where a compaction on this DB has completed and may have installed a new version (or an ingest has completed and installed a new version) and the call to invalidate the pickedCompaction races with some other DB's compaction finishing and calling the pool, and the latter gets ahead and we try to use this old pickedCompaction with a later version. There may be a way to make this work, but I am very wary of fragile code in this area. One solution would be to stop caching pickedCompaction. The pool will still get a pickedCompaction, but if it can't run it, because it has lost out to someone else, the map will only store its score tuple. We will still invalidate the score tuple the same way as the current code invalidates the pickedCompaction. It may slightly worsen the worst case analysis from the previous para, but that may be ok.


compaction.go line 1957 at r3 (raw file):

	// compaction work, so schedule them directly instead of using
	// d.compactionPool.
	d.tryScheduleDeleteOnlyCompaction()

this is unrelated to this PR, but it seems odd that we don't have a for loop around d.tryScheduleDeleteOnlyCompaction() in the existing code (which this code inherited).


compaction.go line 1965 at r3 (raw file):

	// addWaitingDB below.
	d.mu.Unlock()
	d.compactionPool.AddWaitingDB(d)

It seems to me that the limits are being checked top-down. The pool checks its limit and, if that permits, it calls into the DB, which checks its limits. Which is totally reasonable, but we should mention this in a code comment.

Collaborator

@sumeerbhola sumeerbhola left a comment

@anish-shanbhag I didn't look carefully at the refactoring you did of the existing code.
@itsbilal will be giving that a more careful read.

Reviewable status: 7 of 8 files reviewed, 10 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)

Contributor Author

@anish-shanbhag anish-shanbhag left a comment

Reviewable status: 7 of 8 files reviewed, 10 unresolved discussions (waiting on @itsbilal and @sumeerbhola)


compaction.go line 1901 at r3 (raw file):

On the surface it seems wrong that DB2 is being locked while DB1's locks are held in that the map ordering is arbitrary so the lock ordering can change, and result in a deadlock. It seems to work out here since there is only 1 thread doing this arbitrary ordering (since the pool's mutex is held while doing this). Definitely needs a comment if we stay with this structure.

Yeah, this is only valid because we hold compactionPool.mu. I'll add a comment

I can see why (b) can be debated. Say a DB is running 4 compactions, and this will shrink to 0, and then grow back to 4 and back to 0. And there is always a compaction backlog. With 4 compactions running, we have picked another compaction that is not getting to run. Then after each compaction finishes, we will pick again, so 1 (original) + 1 (4 => 3 transition) + 1 + 1 + 1 (1 => 0 transition) = 5. The last one will get to run. Then we grow again to 4. So we picked 8 times and of which 4 got to run. If the worst we can come up with is a 2x multiplier in picked to what got to run, then that seems ok. See if you can construct anything worse.

Just to confirm, I think you're describing a case where the compaction picked after each compaction finishes is lower priority than the compactions from another DB which also has a backlog. For example, we have DB 1 and DB 2 both of which currently have a per-store max concurrency of 4; DB 1 is running 1 compaction and DB 2 is running 4 compactions, and the global compaction concurrency is 5. In the case that all of the future picked compactions from DB 1 are higher priority than those of DB 2, and assuming all of DB 2's compactions finish before DB 1's compactions, we'd pick and discard compactions from DB 2 up to 4 times.

I think the analysis here is right; the worst case seems to be that each call to compactionPool.CompactionFinished results in at most 1 wasted pickedCompaction. This is because for every compaction that finishes, it must have been started via a pickedCompaction that actually got to run.

I suspect there is still a problem with caching these pickedCompaction. The compactionEnv in which a pickedCompaction was created was reliant on DB.mu and DB.mu.versions.logLock(). Both have been released. This may be subtly working because we reacquire those locks for the pickedCompaction. Though I suspect there is a race where a compaction on this DB has completed and may have installed a new version (or an ingest has completed and installed a new version) and the call to invalidate the pickedCompaction races with some other DB's compaction finishing and calling the pool, and the latter gets ahead and we try to use this old pickedCompaction with a later version. There may be a way to make this work, but I am very wary of fragile code in this area. One solution would be to stop caching pickedCompaction. The pool will still get a pickedCompaction, but if it can't run it, because it has lost out to someone else, the map will only store its score tuple. We will still invalidate the score tuple the same way as the current code invalidates the pickedCompaction. It may slightly worsen the worst case analysis from the previous para, but that may be ok.

You're right, I think there's a race here which I missed. We could have the following sequence of events:

At some point, db1 caches a pickedCompaction that doesn't get to run

db1 performs a compaction:
    Acquire db1.mu
    Acquire db1.mu.versions.logLock
    Perform the compaction and install the new version
    Release db1.mu.versions.logLock
    Call db1.maybeScheduleCompaction
        ...
        Release db1.mu
        <--- At this point, no locks are held for db1
        Acquire compactionPool.mu
        Call compactionPool.AddWaitingDB, which invalidates the currently cached compaction

At the point where no locks are held for db1, the following could happen from db2:

db2 finishes a compaction and calls compactionPool.CompactionFinished
    Acquire compactionPool.mu
    Acquire db1.mu
    Acquire db1.mu.versions.logLock
    <--- now we use the invalid cached pickedCompaction from db1

I think this could potentially be fixed by moving the cached pickedCompaction into a field protected under d.mu and atomically invalidate the cached pickedCompaction before we release d.mu inside maybeScheduleCompaction. This should guarantee that picked compactions are valid because invalidation will occur atomically with any compaction/ingestion/etc. that changes the version.
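
A rough sketch of what I mean (field and method names are hypothetical, not actual Pebble code):

package sketch

import "sync"

// pickedCompaction stands in for Pebble's type of the same name.
type pickedCompaction struct{}

type DB struct {
	mu struct {
		sync.Mutex
		// cachedPick is the compaction this DB would like the pool to run
		// next; nil means the pool must ask the DB to pick again.
		cachedPick *pickedCompaction
	}
}

// invalidateCachedPickLocked must be called, with d.mu held, by every path
// that installs a new version (compaction completion, ingestion) or queues a
// new manual compaction, so the pool can never see a pick built against a
// stale version.
func (d *DB) invalidateCachedPickLocked() {
	d.mu.cachedPick = nil
}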

But in general, I agree that this structure is a bit fragile. I think only caching a score tuple would definitely make it easier to ensure we don't accidentally run an invalid compaction. The problem there is that we're still subject to the same data race as above, in that the cached score might be completely different than the score of the compaction that would actually be picked. Maybe the solution here is to both fix the race and cache the score. But if we're assuming that the score is accurate, then I guess there's a tradeoff we'd make by double checking that the picked compaction is valid, at the cost of doing extra work to re-pick the compaction (which should have been cacheable under that assumption).

Do you think the current metamorphic tests would be able to catch a subtle bug with the invalidation, if one exists beyond the data race above?


compaction.go line 1957 at r3 (raw file):

Previously, sumeerbhola wrote…

this is unrelated to this PR, but it seems odd that don't have a for loop around d.tryScheduleDeleteOnlyCompaction() in the existing code (which this code inherited).

From my understanding it looks like this function schedules a single compaction encompassing all tables which currently can be deleted, by combining all of the currently resolvable deletion hints.

Contributor Author

@anish-shanbhag anish-shanbhag left a comment

I've added a data-driven test which checks that the concurrency and prioritization is working as expected. The data race described by @sumeerbhola still exists though.

Reviewable status: 7 of 10 files reviewed, 10 unresolved discussions (waiting on @itsbilal and @sumeerbhola)

Collaborator

@sumeerbhola sumeerbhola left a comment

Reviewable status: 7 of 17 files reviewed, 10 unresolved discussions (waiting on @anish-shanbhag and @itsbilal)


compaction.go line 1901 at r3 (raw file):

I think this could potentially be fixed by moving the cached pickedCompaction into a field protected under d.mu and atomically invalidate the cached pickedCompaction before we release d.mu inside maybeScheduleCompaction. This should guarantee that picked compactions are valid because invalidation will occur atomically with any compaction/ingestion/etc. that changes the version.

I like this idea. With this, the caching is simply a performance optimization inside the DB and is hidden from the pool. The pool simply asks the DB for a score. If the DB happens to have a cached compaction it returns the score. If it doesn't, it picks a compaction, caches it, and returns the score. At all times, the pool is working with a fresh score.
All we need to do is clearly define the situations under which the cache must be invalidated, and ensure that it is done. I'm still wary of holding the DB mutex for the DB with the current best compaction score while looping over all the DBs, so it is possible that the score we pick is stale by the time we go to run the compaction. The staleness probability is low enough that there is no harm in occasionally running a suboptimal compaction.

The alternative of explicitly caching the score inside the pool and not caching the compaction inside the DB is easier to get right (same staleness concern), and has a 2x + 1 worst case for the number of picked compactions (instead of 2x). This may be a good first step, with a TODO to further improve later.

Member

@itsbilal itsbilal left a comment

Good work! Some minor cleanup-y comments, while echoing Sumeer's suggestion to not have the pool grab db mutexes and instead just ask the dbs for a score. It's easier to visualize and reason about the pool as "external" to the DBs instead of having it be intimately aware of the internals of makeCompactionEnv and maybeScheduleCompaction.

//
// DB.mu should NOT be held for any DB (including d) when AddWaitingDB is
// called.
AddWaitingDB(d *DB)

nit: this could be ScheduleCompaction as it's not necessary that the pool will block here, and this method does the compaction scheduling too

a (Outdated)
@@ -0,0 +1,145 @@
=== RUN TestPrioritizingCompactionPool

unintentional commit?

compactionKindElisionOnly
compactionKindRead
compactionKindTombstoneDensity
compactionKindRewrite
compactionKindIngestedFlushable

This would move to being right below compactionKindFlush - as it's basically a flush.

compactionKindElisionOnly
compactionKindRead
compactionKindTombstoneDensity
compactionKindRewrite

Rewrite should probably go above Read and below ElisionOnly. Copy can go below Rewrite.

@@ -192,6 +191,8 @@ type pickedCompaction struct {
	score float64
	// kind indicates the kind of compaction.
	kind compactionKind
	// isManual indicates whether this compaction was manually triggered.
	isManual bool

This can be denoted by the kind too, right? We don't need a separate bool for this.

// addWaitingDB below.
d.mu.Unlock()
d.compactionPool.AddWaitingDB(d)
d.mu.Lock()

nit: let's make this a defer

// Compaction picking needs a coherent view of a Version. In particular, we
// need to exclude concurrent ingestions from making a decision on which level
// to ingest into that conflicts with our compaction
// decision. versionSet.logLock provides the necessary mutual exclusion.
d.mu.versions.logLock()

We should just move the responsibility for the logLock to the caller. It's pretty hard to read the compaction pool code without knowing that makeCompactionEnv does the version set locking.

@sumeerbhola
Collaborator

The alternative of explicitly caching the score inside the pool and not caching the compaction inside the DB is easier to get right (same staleness concern), and has 2x + 1 worst case of number of picked compactions (instead of 2x). This may be a good first step, with a TODO to further improve later.

I think in the case that there is a single queued DB (probably the common case), we don't want to ask it for a score, where it generates a pickedCompaction, then pick that score, and ask it again to run a compaction, which will cause it to have to pick again. And we don't want any complicated locking story with the pool holding DB mutexes. So we could simply have the pool fast-path the case where it realizes that there is a single DB and simply ask it to run. Alternatively, we could hide the locking inside DB by having the pool call the following interface:
func (*DB) GetScoreAndPotentiallyRun(func(score Score) bool) bool

The pool would gather the set of DBs for which it doesn't have a cached score. If none, it picks the highest score and tries to run it. If exactly one doesn't have a cached score (say only one DB is overloaded and it is getting to run most of the time), the pool knows the best other score and calls GetScoreAndPotentiallyRun, and in the func compares the score and returns true if the DB's score is best. Then the DB tries to run, and returns true if it started the compaction. No wasted picking. If there are multiple DBs with no cached scores, it has to cycle through them in arbitrary order. The last one can still use this optimization.
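
A toy sketch of that control flow (all names hypothetical, with the score reduced to a float for brevity):

package main

// toyDB stands in for a Pebble store in this sketch.
type toyDB struct {
	nextScore float64 // score of the compaction this DB would pick next
}

// GetScoreAndPotentiallyRun hands shouldRun this DB's candidate score; if the
// callback says the score wins, the DB starts the compaction itself (under
// its own locks in a real implementation) and reports whether it did.
func (d *toyDB) GetScoreAndPotentiallyRun(shouldRun func(score float64) bool) bool {
	if !shouldRun(d.nextScore) {
		return false
	}
	// ... pick and start the compaction here; no picking is wasted because
	// the decision to run was made before returning ...
	return true
}

func main() {
	bestCachedScore := 2.5 // best score among the DBs that have cached scores
	last := &toyDB{nextScore: 3.1}
	// The one DB without a cached score gets to run only if it beats every
	// cached candidate.
	_ = last.GetScoreAndPotentiallyRun(func(s float64) bool {
		return s > bestCachedScore
	})
}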

@anish-shanbhag anish-shanbhag force-pushed the multi-store-compactions branch 2 times, most recently from 930ad44 to 521a99d on August 30, 2024 18:36
This change adds a new compaction pool which enforces a global max
compaction concurrency in a multi-store configuration. Each Pebble store
(i.e. an instance of *DB) still maintains its own per-store compaction
concurrency which is controlled by `opts.MaxConcurrentCompactions`.
However, in a multi-store configuration, disk I/O is a per-store resource
while CPU is shared across stores. A significant portion of compaction
is CPU-intensive, and so this ensures that excessive compactions don't
interrupt foreground CPU tasks even if the disks are capable of handling
the additional throughput from those compactions.

The shared compaction concurrency only applies to automatic compactions.
This means that delete-only compactions are excluded because they are
expected to be cheap, as are flushes because they should never be
blocked.

Fixes: cockroachdb#3813
Informs: cockroachdb/cockroach#74697