Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coroutines+io uring #325

Draft
wants to merge 1 commit into
base: 6.29.tikv
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ else()
endif()

if( NOT DEFINED CMAKE_CXX_STANDARD )
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 20)
endif()

include(CMakeDependentOption)
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ MACHINE ?= $(shell uname -m)
ARFLAGS = ${EXTRA_ARFLAGS} rs
STRIPFLAGS = -S -x

CXXFLAGS += -fcoroutines

# Transform parallel LOG output into something more readable.
perl_command = perl -n \
-e '@a=split("\t",$$_,-1); $$t=$$a[8];' \
Expand All @@ -46,7 +48,7 @@ quoted_perl_command = $(subst ','\'',$(perl_command))
# `make install`

# Set the default DEBUG_LEVEL to 1
DEBUG_LEVEL?=1
DEBUG_LEVEL?=2

# LIB_MODE says whether or not to use/build "shared" or "static" libraries.
# Mode "static" means to link against static libraries (.a)
Expand All @@ -69,7 +71,7 @@ else ifneq ($(filter shared_lib install-shared, $(MAKECMDGOALS)),)
DEBUG_LEVEL=0
LIB_MODE=shared
else ifneq ($(filter static_lib install-static, $(MAKECMDGOALS)),)
DEBUG_LEVEL=0
DEBUG_LEVEL=2
LIB_MODE=static
else ifneq ($(filter jtest rocksdbjava%, $(MAKECMDGOALS)),)
OBJ_DIR=jl
Expand Down Expand Up @@ -145,15 +147,15 @@ ifeq ($(DEBUG_LEVEL),0)
OPT += -DNDEBUG

ifneq ($(USE_RTTI), 1)
CXXFLAGS += -fno-rtti
# CXXFLAGS += -fno-rtti
else
CXXFLAGS += -DROCKSDB_USE_RTTI
endif
else
ifneq ($(USE_RTTI), 0)
CXXFLAGS += -DROCKSDB_USE_RTTI
else
CXXFLAGS += -fno-rtti
# CXXFLAGS += -fno-rtti
endif

ifdef ASSERT_STATUS_CHECKED
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/async_future.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down Expand Up @@ -483,6 +484,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/async_future.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down
4 changes: 2 additions & 2 deletions build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fi
if [ "$ROCKSDB_CXX_STANDARD" ]; then
PLATFORM_CXXFLAGS="-std=$ROCKSDB_CXX_STANDARD"
else
PLATFORM_CXXFLAGS="-std=c++11"
PLATFORM_CXXFLAGS="-std=c++20"
fi

# we currently depend on POSIX platform
Expand Down Expand Up @@ -246,7 +246,7 @@ EOF
Cygwin)
PLATFORM=CYGWIN
PLATFORM_SHARED_CFLAGS=""
PLATFORM_CXXFLAGS="-std=gnu++11"
PLATFORM_CXXFLAGS="-std=gnu++20"
COMMON_FLAGS="$COMMON_FLAGS -DCYGWIN"
if [ -z "$USE_CLANG" ]; then
COMMON_FLAGS="$COMMON_FLAGS -fno-builtin-memcmp"
Expand Down
2 changes: 1 addition & 1 deletion db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Status ArenaWrappedDBIter::Refresh() {
range_del_iter.reset(
sv->mem->NewRangeTombstoneIterator(read_options_, latest_seq));
range_del_agg->AddTombstones(std::move(range_del_iter));
cfd_->ReturnThreadLocalSuperVersion(sv);
cfd_->ReturnThreadLocalSuperVersion(db_impl_, sv);
}
// Refresh latest sequence number
db_iter_->set_sequence(latest_seq);
Expand Down
41 changes: 35 additions & 6 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ SuperVersion* SuperVersion::Ref() {
return this;
}

uint32_t SuperVersion::GetRef() const {
return refs.load();
}

bool SuperVersion::Unref() {
// fetch_sub returns the previous value of ref
uint32_t previous_refs = refs.fetch_sub(1);
Expand Down Expand Up @@ -1200,7 +1204,7 @@ Compaction* ColumnFamilyData::CompactRange(
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
SuperVersion* sv = GetThreadLocalSuperVersion(db);
sv->Ref();
if (!ReturnThreadLocalSuperVersion(sv)) {
if (!ReturnThreadLocalSuperVersion(db, sv)) {
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
// when the thread-local pointer was populated. So, the Ref() earlier in
// this function still prevents the returned SuperVersion* from being
Expand All @@ -1219,7 +1223,7 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value at when returning
// have swapped in kSVObsolete. We re-check the value when returning
// SuperVersion back to thread local, with an atomic compare and swap.
// The superversion will need to be released if detected to be stale.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
Expand All @@ -1228,7 +1232,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
assert(ptr != SuperVersion::kSVInUse);
if (ptr == SuperVersion::kSVInUse) {
// FIXME: Check version number too.
return super_version_->Ref();
}

SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
Expand Down Expand Up @@ -1259,22 +1267,42 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(DBImpl* db, SuperVersion* sv) {
assert(sv != nullptr);

// Put the SuperVersion back
void* expected = SuperVersion::kSVInUse;

if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happened. The
// SuperVersion is still current.
return true;
} else if (expected != nullptr) {
// A scrape has happened, we have to adjust the refs in both
// the super version and CFD that it refers to.
// FIXME: This assumes the SVs "release" are not interleaved.
auto ptr = static_cast<SuperVersion*>(local_sv_->Swap(sv));
if (sv != ptr) {
db->mutex()->Lock();
assert(ptr != super_version_);
bool last_ref __attribute__((__unused__));
last_ref = ptr->Unref();
assert(last_ref);
refs_.fetch_sub(1);
ptr->Cleanup();
db->mutex()->Unlock();
// delete ptr;
} else {
sv->Unref();
}
return true;
} else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
assert(expected == SuperVersion::kSVObsolete);
return false;
}
return false;
}

void ColumnFamilyData::InstallSuperVersion(
Expand Down Expand Up @@ -1329,6 +1357,7 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);

bool was_last_ref __attribute__((__unused__));
was_last_ref = sv->Unref();
// sv couldn't have been the last reference because
Expand Down
9 changes: 7 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ struct SuperVersion {
// should be called outside the mutex
SuperVersion() = default;
~SuperVersion();

uint32_t GetRef() const;

SuperVersion* Ref();
// If Unref() returns true, Cleanup() should be called with mutex held
// before deleting this SuperVersion.
Expand Down Expand Up @@ -274,7 +277,9 @@ class ColumnFamilyData {
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1); }
void Ref() {
refs_.fetch_add(1);
}

// UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only
Expand Down Expand Up @@ -440,7 +445,7 @@ class ColumnFamilyData {
// Try to return SuperVersion back to thread local storage. Return true on
// success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag.
bool ReturnThreadLocalSuperVersion(SuperVersion* sv);
bool ReturnThreadLocalSuperVersion(DBImpl* db, SuperVersion* sv);
// thread-safe
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
Expand Down
Loading