Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make WriteAmpBasedRateLimiter's auto tune smooth window configurable #348

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/rocksdb/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ extern RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
bool auto_tuned = false);
bool auto_tuned = false, int tune_per_sec = 1,
size_t smooth_window_size = 300, size_t recent_window_size = 30);

} // namespace ROCKSDB_NAMESPACE
48 changes: 29 additions & 19 deletions utilities/rate_limiters/write_amp_based_rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ struct WriteAmpBasedRateLimiter::Req {
};

namespace {
constexpr int kSecondsPerTune = 1;
constexpr int kMillisPerTune = 1000 * kSecondsPerTune;
constexpr int kMicrosPerTune = 1000 * 1000 * kSecondsPerTune;

// Due to the execution model of compaction, large waves of pending compactions
// could possibly be hidden behind a constant rate of I/O requests. It's then
// wise to raise the threshold slightly above estimation to ensure those
Expand All @@ -45,11 +41,10 @@ int64_t CalculatePadding(int64_t base) {
}
} // unnamed namespace

WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness,
RateLimiter::Mode mode,
Env* env, bool auto_tuned)
WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
RateLimiter::Mode mode, Env* env, bool auto_tuned, int secs_per_tune,
size_t smooth_window_size, size_t recent_window_size)
: RateLimiter(mode),
refill_period_us_(refill_period_us),
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
Expand All @@ -66,10 +61,14 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
rnd_((uint32_t)time(nullptr)),
leader_(nullptr),
auto_tuned_(auto_tuned),
secs_per_tune_(secs_per_tune == 0 ? 1 : secs_per_tune),
max_bytes_per_sec_(rate_bytes_per_sec),
tuned_time_(NowMicrosMonotonic(env_)),
duration_highpri_bytes_through_(0),
duration_bytes_through_(0),
bytes_sampler_(smooth_window_size, recent_window_size),
highpri_bytes_sampler_(smooth_window_size, recent_window_size),
limit_bytes_sampler_(recent_window_size, recent_window_size),
critical_pace_up_(false),
normal_pace_up_(false),
percent_delta_(0) {
Expand Down Expand Up @@ -139,10 +138,9 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
&rate_bytes_per_sec_);
if (auto_tuned_.load(std::memory_order_acquire) && pri == Env::IO_HIGH &&
duration_highpri_bytes_through_ + duration_bytes_through_ + bytes <=
max_bytes_per_sec_.load(std::memory_order_relaxed) *
kSecondsPerTune) {
max_bytes_per_sec_.load(std::memory_order_relaxed) * secs_per_tune_) {
// In the case where low-priority request is absent, actual time elapsed
// will be larger than kSecondsPerTune, making the limit even tighter.
// will be larger than secs_per_tune_, making the limit even tighter.
total_bytes_through_[Env::IO_HIGH] += bytes;
++total_requests_[Env::IO_HIGH];
duration_highpri_bytes_through_ += bytes;
Expand All @@ -153,7 +151,8 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,

if (auto_tuned_.load(std::memory_order_acquire)) {
std::chrono::microseconds now(NowMicrosMonotonic(env_));
if (now - tuned_time_ >= std::chrono::microseconds(kMicrosPerTune)) {
auto micros_per_tune = 1000 * 1000 * secs_per_tune_;
if (now - tuned_time_ >= std::chrono::microseconds(micros_per_tune)) {
Tune();
}
}
Expand Down Expand Up @@ -315,7 +314,7 @@ int64_t WriteAmpBasedRateLimiter::CalculateRefillBytesPerPeriod(
}

// The core function used to dynamically adjust the compaction rate limit,
// called **at most** once every `kSecondsPerTune`.
// called **at most** once every `secs_per_tune`.
// I/O throughput threshold is automatically tuned based on history samples of
// compaction and flush flow. This algorithm excels by taking into account the
// limiter's inability to estimate the pressure of pending compactions, and the
Expand All @@ -340,7 +339,8 @@ Status WriteAmpBasedRateLimiter::Tune() {
// This function can be called less frequent than we anticipate when
// compaction rate is low. Loop through the actual time slice to correct
// the estimation.
for (uint32_t i = 0; i < duration_ms / kMillisPerTune; i++) {
auto millis_per_tune = 1000 * secs_per_tune_;
for (uint32_t i = 0; i < duration_ms / millis_per_tune; i++) {
bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms);
highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 /
duration_ms);
Expand Down Expand Up @@ -412,13 +412,23 @@ RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
bool auto_tuned /* = false */) {
bool auto_tuned /* = false */, int tune_per_sec /* = 1 */,
size_t smooth_window_size /* = 300 */,
size_t recent_window_size /* = 30 */) {
assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0);
assert(fairness > 0);
return new WriteAmpBasedRateLimiter(rate_bytes_per_sec, refill_period_us,
fairness, mode, Env::Default(),
auto_tuned);
assert(tune_per_sec >= 0);
assert(smooth_window_size >= recent_window_size);
if (smooth_window_size == 0) {
smooth_window_size = 300;
}
if (recent_window_size == 0) {
recent_window_size = 30;
}
return new WriteAmpBasedRateLimiter(
rate_bytes_per_sec, refill_period_us, fairness, mode, Env::Default(),
auto_tuned, tune_per_sec, smooth_window_size, recent_window_size);
}

} // namespace ROCKSDB_NAMESPACE
40 changes: 19 additions & 21 deletions utilities/rate_limiters/write_amp_based_rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
public:
WriteAmpBasedRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
int32_t fairness, RateLimiter::Mode mode, Env* env,
bool auto_tuned);
bool auto_tuned, int secs_per_tune,
size_t auto_tune_smooth_window,
size_t auto_tune_recent_window);

virtual ~WriteAmpBasedRateLimiter();

Expand Down Expand Up @@ -119,47 +121,43 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
port::Mutex auto_tuned_mutex_;

std::atomic<bool> auto_tuned_;
int secs_per_tune_;
std::atomic<int64_t> max_bytes_per_sec_;
std::chrono::microseconds tuned_time_;
int64_t duration_highpri_bytes_through_;
int64_t duration_bytes_through_;

template <size_t kWindowSize, size_t kRecentWindowSize = 1>
class WindowSmoother {
public:
WindowSmoother() {
static_assert(kWindowSize >= kRecentWindowSize,
"Expect recent window no larger than full window");
static_assert(kRecentWindowSize >= 1, "Expect window size larger than 0");
memset(data_, 0, sizeof(int64_t) * kWindowSize);
}
WindowSmoother(size_t smooth_window_size, size_t recent_window_size)
: smooth_window_size_(smooth_window_size),
recent_window_size_(recent_window_size),
data_(0, smooth_window_size) {}
void AddSample(int64_t v) {
auto recent_cursor =
(cursor_ + 1 + kWindowSize - kRecentWindowSize) % kWindowSize;
cursor_ = (cursor_ + 1) % kWindowSize;
(cursor_ + 1 + smooth_window_size_ - recent_window_size_) %
smooth_window_size_;
cursor_ = (cursor_ + 1) % smooth_window_size_;
full_sum_ += v - data_[cursor_];
recent_sum_ += v - data_[recent_cursor];
data_[cursor_] = v;
}
int64_t GetFullValue() { return full_sum_ / kWindowSize; }
int64_t GetRecentValue() { return recent_sum_ / kRecentWindowSize; }
int64_t GetFullValue() { return full_sum_ / smooth_window_size_; }
int64_t GetRecentValue() { return recent_sum_ / recent_window_size_; }
bool AtTimePoint() const { return cursor_ == 0; }

private:
uint32_t cursor_{0}; // point to the most recent sample
int64_t data_[kWindowSize];
size_t smooth_window_size_;
size_t recent_window_size_;
std::vector<size_t> data_;
int64_t full_sum_{0};
int64_t recent_sum_{0};
};

static constexpr size_t kSmoothWindowSize = 300; // 300 * 1s = 5m
static constexpr size_t kRecentSmoothWindowSize = 30; // 30 * 1s = 30s

WindowSmoother<kSmoothWindowSize, kRecentSmoothWindowSize> bytes_sampler_;
WindowSmoother<kSmoothWindowSize, kRecentSmoothWindowSize>
highpri_bytes_sampler_;
WindowSmoother<kRecentSmoothWindowSize, kRecentSmoothWindowSize>
limit_bytes_sampler_;
WindowSmoother bytes_sampler_;
WindowSmoother highpri_bytes_sampler_;
WindowSmoother limit_bytes_sampler_;
std::atomic<bool> critical_pace_up_;
std::atomic<bool> normal_pace_up_;
uint32_t percent_delta_;
Expand Down
9 changes: 5 additions & 4 deletions utilities/rate_limiters/write_amp_based_rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ namespace ROCKSDB_NAMESPACE {
class WriteAmpBasedRateLimiterTest : public testing::Test {};

TEST_F(WriteAmpBasedRateLimiterTest, OverflowRate) {
WriteAmpBasedRateLimiter limiter(port::kMaxInt64, 1000, 10,
RateLimiter::Mode::kWritesOnly,
Env::Default(), false /* auto_tuned */);
WriteAmpBasedRateLimiter limiter(
port::kMaxInt64, 1000, 10, RateLimiter::Mode::kWritesOnly, Env::Default(),
false /* auto_tuned */, 1, 100, 10);
ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}

Expand All @@ -42,7 +42,8 @@ TEST_F(WriteAmpBasedRateLimiterTest, Modes) {
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
WriteAmpBasedRateLimiter limiter(
2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
10 /* fairness */, mode, Env::Default(), false /* auto_tuned */,
1 /* secs_per_tune */, 100 /* smooth_window */, 10 /* recent_window */);
limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (mode == RateLimiter::Mode::kWritesOnly) {
Expand Down
Loading