Skip to content

Commit

Permalink
Apply Titan related changes (#388)
Browse files Browse the repository at this point in the history
 

Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar authored Oct 2, 2024
1 parent 56fb1ab commit 528b66b
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 81 deletions.
8 changes: 8 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
}
}

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
const ColumnFamilyHandleImpl& other)
: cfd_(other.cfd_), db_(other.db_), mutex_(other.mutex_) {
if (cfd_ != nullptr) {
cfd_->Ref();
}
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
if (cfd_ != nullptr) {
for (auto& listener : cfd_->ioptions()->listeners) {
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// create while holding the mutex
ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db,
InstrumentedMutex* mutex);
ColumnFamilyHandleImpl(const ColumnFamilyHandleImpl& other);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
Expand Down
8 changes: 2 additions & 6 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "monitoring/statistics_impl.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
Expand Down Expand Up @@ -1161,7 +1162,6 @@ class DelayFilterFactory : public CompactionFilterFactory {
};
} // anonymous namespace


static std::string CompressibleString(Random* rnd, int len) {
std::string r;
test::CompressibleString(rnd, 0.8, len, &r);
Expand Down Expand Up @@ -4351,7 +4351,6 @@ TEST_F(DBTest, ConcurrentMemtableNotSupported) {
ASSERT_NOK(db_->CreateColumnFamily(cf_options, "name", &handle));
}


TEST_F(DBTest, SanitizeNumThreads) {
for (int attempt = 0; attempt < 2; attempt++) {
const size_t kTotalTasks = 8;
Expand Down Expand Up @@ -5721,7 +5720,6 @@ TEST_F(DBTest, FileCreationRandomFailure) {
}
}


TEST_F(DBTest, DynamicMiscOptions) {
// Test max_sequential_skip_in_iterations
Options options;
Expand Down Expand Up @@ -6101,7 +6099,7 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
// CPUMicros() is not supported. See WinClock::CPUMicros().
TEST_P(DBTestWithParam, CompactionTotalTimeTest) {
int record_count = 0;
class TestStatistics : public StatisticsImpl {
class TestStatistics : public StatisticsImpl<> {
public:
explicit TestStatistics(int* record_count)
: StatisticsImpl(nullptr), record_count_(record_count) {}
Expand Down Expand Up @@ -7178,7 +7176,6 @@ TEST_F(DBTest, ReusePinnableSlice) {
1);
}


TEST_F(DBTest, DeletingOldWalAfterDrop) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
Expand Down Expand Up @@ -7303,7 +7300,6 @@ TEST_F(DBTest, LargeBlockSizeTest) {
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}


TEST_F(DBTest, CreationTimeOfOldestFile) {
const int kNumKeysPerFile = 32;
const int kNumLevelFiles = 2;
Expand Down
4 changes: 3 additions & 1 deletion db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace ROCKSDB_NAMESPACE {
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
const ValueType kValueTypeForSeek = kTypeWideColumnEntity;
const ValueType kValueTypeForSeek = kTypeBlobIndex;
const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
const std::string kDisableUserTimestamp("");

Expand All @@ -48,6 +48,8 @@ EntryType GetEntryType(ValueType value_type) {
return kEntryBlobIndex;
case kTypeWideColumnEntity:
return kEntryWideColumnEntity;
case kTypeTitanBlobIndex:
return kEntryBlobIndex;
default:
return kEntryOther;
}
Expand Down
3 changes: 2 additions & 1 deletion db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ enum ValueType : unsigned char {
kTypeColumnFamilyRangeDeletion = 0xE, // WAL only.
kTypeRangeDeletion = 0xF, // meta block
kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only
kTypeBlobIndex = 0x11, // Blob DB only
kTypeTitanBlobIndex = 0x11, // Titan Blob DB only
// When the prepared record is also persisted in db, we use a different
// record. This is to ensure that the WAL that is generated by a WritePolicy
// is not mistakenly read by another, which would result into data
Expand All @@ -68,6 +68,7 @@ enum ValueType : unsigned char {
kTypeCommitXIDAndTimestamp = 0x15, // WAL only
kTypeWideColumnEntity = 0x16,
kTypeColumnFamilyWideColumnEntity = 0x17, // WAL only
kTypeBlobIndex = 0x18, // RocksDB native Blob DB only
kTypeMaxValid, // Should be after the last valid type, only used for
// validation
kMaxValue = 0x7F // Not used for storing records.
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ class Statistics : public Customizable {
};

// Create a concrete DBStatistics object
template <uint32_t TICKER_MAX = TICKER_ENUM_MAX,
uint32_t HISTOGRAM_MAX = HISTOGRAM_ENUM_MAX>
std::shared_ptr<Statistics> CreateDBStatistics();

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion java/rocksjni/statisticsjni.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace ROCKSDB_NAMESPACE {

class StatisticsJni : public StatisticsImpl {
class StatisticsJni : public StatisticsImpl<> {
public:
StatisticsJni(std::shared_ptr<Statistics> stats);
StatisticsJni(std::shared_ptr<Statistics> stats,
Expand Down
108 changes: 68 additions & 40 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,14 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
"rocksdb.table.open.prefetch.tail.read.bytes"},
};

std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>(nullptr);
}

static int RegisterBuiltinStatistics(ObjectLibrary& library,
const std::string& /*arg*/) {
library.AddFactory<Statistics>(
StatisticsImpl::kClassName(),
StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Statistics>* guard,
std::string* /* errmsg */) {
guard->reset(new StatisticsImpl(nullptr));
guard->reset(
new StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>(nullptr));
return guard->get();
});
return 1;
Expand All @@ -367,8 +364,10 @@ Status Statistics::CreateFromString(const ConfigOptions& config_options,
RegisterBuiltinStatistics(*(ObjectLibrary::Default().get()), "");
});
Status s;
if (id == "" || id == StatisticsImpl::kClassName()) {
result->reset(new StatisticsImpl(nullptr));
if (id == "" ||
id == StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>::kClassName()) {
result->reset(
new StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>(nullptr));
} else if (id == kNullptrString) {
result->reset();
} else {
Expand All @@ -383,36 +382,46 @@ static std::unordered_map<std::string, OptionTypeInfo> stats_type_info = {
OptionTypeFlags::kCompareNever)},
};

StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats)
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::StatisticsImpl(
std::shared_ptr<Statistics> stats)
: stats_(std::move(stats)) {
RegisterOptions("StatisticsOptions", &stats_, &stats_type_info);
}

StatisticsImpl::~StatisticsImpl() = default;
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::~StatisticsImpl() = default;

uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCount(
uint32_t tickerType) const {
MutexLock lock(&aggregate_lock_);
return getTickerCountLocked(tickerType);
}

uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
assert(tickerType < TICKER_ENUM_MAX);
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCountLocked(
uint32_t tickerType) const {
assert(tickerType < TICKER_MAX);
uint64_t res = 0;
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
}
return res;
}

void StatisticsImpl::histogramData(uint32_t histogramType,
HistogramData* const data) const {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::histogramData(
uint32_t histogramType, HistogramData* const data) const {
MutexLock lock(&aggregate_lock_);
getHistogramImplLocked(histogramType)->Data(data);
}

std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
std::unique_ptr<HistogramImpl>
StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramImplLocked(
uint32_t histogramType) const {
assert(histogramType < HISTOGRAM_ENUM_MAX);
assert(histogramType < HISTOGRAM_MAX);
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res_hist->Merge(
Expand All @@ -421,23 +430,29 @@ std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
return res_hist;
}

std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramString(
uint32_t histogramType) const {
MutexLock lock(&aggregate_lock_);
return getHistogramImplLocked(histogramType)->ToString();
}

void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCount(
uint32_t tickerType, uint64_t count) {
{
MutexLock lock(&aggregate_lock_);
setTickerCountLocked(tickerType, count);
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
if (stats_ && tickerType < TICKER_MAX) {
stats_->setTickerCount(tickerType, count);
}
}

void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
assert(tickerType < TICKER_ENUM_MAX);
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCountLocked(
uint32_t tickerType, uint64_t count) {
assert(tickerType < TICKER_MAX);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
if (core_idx == 0) {
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
Expand All @@ -447,28 +462,32 @@ void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
}
}

uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getAndResetTickerCount(
uint32_t tickerType) {
uint64_t sum = 0;
{
MutexLock lock(&aggregate_lock_);
assert(tickerType < TICKER_ENUM_MAX);
assert(tickerType < TICKER_MAX);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
sum +=
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
0, std::memory_order_relaxed);
}
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
if (stats_ && tickerType < TICKER_MAX) {
stats_->setTickerCount(tickerType, 0);
}
return sum;
}

void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordTick(uint32_t tickerType,
uint64_t count) {
if (get_stats_level() <= StatsLevel::kExceptTickers) {
return;
}
if (tickerType < TICKER_ENUM_MAX) {
if (tickerType < TICKER_MAX) {
per_core_stats_.Access()->tickers_[tickerType].fetch_add(
count, std::memory_order_relaxed);
if (stats_) {
Expand All @@ -479,23 +498,26 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
}
}

void StatisticsImpl::recordInHistogram(uint32_t histogramType, uint64_t value) {
assert(histogramType < HISTOGRAM_ENUM_MAX);
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordInHistogram(
uint32_t histogramType, uint64_t value) {
assert(histogramType < HISTOGRAM_MAX);
if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) {
return;
}
per_core_stats_.Access()->histograms_[histogramType].Add(value);
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
if (stats_ && histogramType < HISTOGRAM_MAX) {
stats_->recordInHistogram(histogramType, value);
}
}

Status StatisticsImpl::Reset() {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
Status StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::Reset() {
MutexLock lock(&aggregate_lock_);
for (uint32_t i = 0; i < TICKER_ENUM_MAX; ++i) {
for (uint32_t i = 0; i < TICKER_MAX; ++i) {
setTickerCountLocked(i, 0);
}
for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
for (uint32_t i = 0; i < HISTOGRAM_MAX; ++i) {
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
}
Expand All @@ -510,19 +532,20 @@ const int kTmpStrBufferSize = 200;

} // namespace

std::string StatisticsImpl::ToString() const {
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::ToString() const {
MutexLock lock(&aggregate_lock_);
std::string res;
res.reserve(20000);
for (const auto& t : TickersNameMap) {
assert(t.first < TICKER_ENUM_MAX);
assert(t.first < TICKER_MAX);
char buffer[kTmpStrBufferSize];
snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n",
t.second.c_str(), getTickerCountLocked(t.first));
res.append(buffer);
}
for (const auto& h : HistogramsNameMap) {
assert(h.first < HISTOGRAM_ENUM_MAX);
assert(h.first < HISTOGRAM_MAX);
char buffer[kTmpStrBufferSize];
HistogramData hData;
getHistogramImplLocked(h.first)->Data(&hData);
Expand All @@ -544,7 +567,8 @@ std::string StatisticsImpl::ToString() const {
return res;
}

bool StatisticsImpl::getTickerMap(
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerMap(
std::map<std::string, uint64_t>* stats_map) const {
assert(stats_map);
if (!stats_map) {
Expand All @@ -553,14 +577,18 @@ bool StatisticsImpl::getTickerMap(
stats_map->clear();
MutexLock lock(&aggregate_lock_);
for (const auto& t : TickersNameMap) {
assert(t.first < TICKER_ENUM_MAX);
assert(t.first < TICKER_MAX);
(*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
}
return true;
}

bool StatisticsImpl::HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;
template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::HistEnabledForType(
uint32_t type) const {
return type < HISTOGRAM_MAX;
}

template class StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>;

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 528b66b

Please sign in to comment.