
Commit

update
Signed-off-by: SpadeA-Tang <[email protected]>
SpadeA-Tang committed Aug 14, 2023
1 parent c47e7d8 commit 004be9b
Showing 1 changed file with 127 additions and 1 deletion.
128 changes: 127 additions & 1 deletion db/db_write_buffer_manager_test.cc
@@ -62,7 +62,7 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
ASSERT_OK(Put(0, Key(2), DummyString(1), wo));
}

-// Test Single DB with multiple writer threads get blocked when
+// Test Single DB with a single WriteBufferManager where multiple writer
+// threads get blocked when
// WriteBufferManager exceeds buffer_size_ and flush is waiting to be
// finished.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
@@ -182,6 +182,132 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test Single DB with multiple WriteBufferManagers where multiple writer
// threads get blocked when a WriteBufferManager exceeds buffer_size_ and
// flush is waiting to be finished.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs3) {
Options options = CurrentOptions();
options.arena_block_size = 4096;
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.push_back(
std::make_shared<WriteBufferManager>(100000, cache, 1.0));
options.write_buffer_manager.push_back(
std::make_shared<WriteBufferManager>(100000, cache, 1.0));
} else {
options.write_buffer_manager.push_back(
std::make_shared<WriteBufferManager>(100000, nullptr, 1.0));
options.write_buffer_manager.push_back(
std::make_shared<WriteBufferManager>(100000, nullptr, 1.0));
}
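// Map "default" and cf1-cf3 to the first WriteBufferManager and cf4/cf5 to
// the second, so the two managers are charged and stall independently.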
options.write_buffer_manager_map = {{"default", 0}, {"cf1", 0}, {"cf2", 0},
{"cf3", 0}, {"cf4", 1}, {"cf5", 1}};

WriteOptions wo;
wo.disableWAL = true;

CreateAndReopenWithCF({"cf1", "cf2", "cf3", "cf4", "cf5"}, options);
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
ASSERT_OK(Flush(3));
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
ASSERT_OK(Flush(0));

// Write to "Default", "cf2" and "cf3". No flush will be triggered.
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
// The first WriteBufferManager::buffer_size_ has been exceeded after the
// previous write completed.
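// The writes above charge manager 0 roughly 30000 + 40000 + 40000 bytes plus
// arena overhead, which is above its 100000-byte buffer_size_.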

std::unordered_set<WriteThread::Writer*> w_set;
std::vector<port::Thread> threads;
int wait_count_db = 0;
int num_writers_total = 6;
int num_writers1 = 4;
InstrumentedMutex mutex;
InstrumentedCondVar cv(&mutex);
std::atomic<int> thread_num(0);

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
"DBImpl::BackgroundCallFlush:start"}});

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WBMStallInterface::BlockDB", [&](void*) {
InstrumentedMutexLock lock(&mutex);
wait_count_db++;
cv.SignalAll();
});
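// WBMStallInterface::BlockDB fires when the DB write path stalls on the
// exhausted manager; the main thread waits on wait_count_db before spawning
// the remaining writers.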
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::WriteStall::Wait", [&](void* arg) {
InstrumentedMutexLock lock(&mutex);
WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
w_set.insert(w);
// Allow the flush to continue if all writer threads are blocked.
if (w_set.size() == (unsigned long)num_writers1) {
TEST_SYNC_POINT(
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

bool s = true;

std::function<void(int)> writer = [&](int cf) {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
Status tmp = Put(cf, Slice(key), DummyString(1), wo);
InstrumentedMutexLock lock(&mutex);
s = s && tmp.ok();
};
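// Each writer thread puts a unique small key into its assigned column family
// so every thread enters the write path independently.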

// Flow:
// The main_writer thread will write but will be blocked (flush is on hold
// and buffer_size_ has been exceeded, so a stall is in effect).
// |
// |
// Multiple writer threads will be created to write across multiple column
// families and they will be blocked.
// |
// |
// The last writer thread will write, and once it is blocked it will signal
// the flush to continue and clear the stall.

threads.emplace_back(writer, 1);
// Wait until the first thread (main_writer) writing to the DB is blocked and
// then create the multiple writers, which will be blocked from getting added
// to the write queue because the stall is in effect.
{
InstrumentedMutexLock lock(&mutex);
while (wait_count_db != 1) {
cv.Wait();
}
}
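// Spawn one writer per column family (i % 6 covers "default" through "cf5").
// The four writers on CFs backed by the stalled first manager are expected
// to block, while the cf4/cf5 writers go through the second manager.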
for (int i = 0; i < num_writers_total; i++) {
threads.emplace_back(writer, i % 6);
}
for (auto& t : threads) {
t.join();
}

ASSERT_TRUE(s);

// Number of DBs blocked.
ASSERT_EQ(wait_count_db, 1);
// Number of writer threads blocked: only the four writers on CFs mapped to
// the exhausted first WriteBufferManager are expected to stall.
ASSERT_EQ(w_set.size(), (unsigned long)num_writers1);

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBWriteBufferManagerTest, FreeMemoryOnDestroy) {
Options options = CurrentOptions();
options.arena_block_size = 4096;

