Skip to content

Commit

Permalink
issue #934: multithread compaction support
Browse files Browse the repository at this point in the history
  • Loading branch information
caijieming committed Sep 7, 2016
1 parent b66c065 commit e61e91b
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 214 deletions.
284 changes: 179 additions & 105 deletions src/leveldb/db/db_impl.cc

Large diffs are not rendered by default.

19 changes: 13 additions & 6 deletions src/leveldb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
Status CompactMemTable(bool* sched_idle = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status MakeRoomForWrite(bool force /* compact even if there is room? */)
Expand All @@ -124,7 +124,7 @@ class DBImpl : public DB {
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
Expand Down Expand Up @@ -193,17 +193,24 @@ class DBImpl : public DB {
std::set<uint64_t> pending_outputs_;

// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
double bg_compaction_score_;
int64_t bg_schedule_id_;
int bg_compaction_scheduled_;
std::vector<double> bg_compaction_score_;
std::vector<int64_t> bg_schedule_id_;

// Information for a manual compaction
// Conflict-handling state of a manual compaction. The enumerators take the
// default values 0, 1 and 2, which line up with the "0 == idle, 1 == conflict,
// 2 == wake" encoding noted on ManualCompaction::compaction_conflict.
// NOTE(review): presumably that field stores one of these values -- confirm.
enum ManualCompactState {
kManualCompactIdle,      // 0
kManualCompactConflict,  // 1
kManualCompactWakeup,    // 2
};
// Bookkeeping for one manual compaction: the level and key range to compact,
// plus progress and scheduling state.
struct ManualCompaction {
int level;
bool done;
// NOTE(review): presumably true while this request has been handed to a
// background compaction -- confirm against db_impl.cc.
bool being_sched;
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
// The numeric values below match the ManualCompactState enumerators
// (kManualCompactIdle, kManualCompactConflict, kManualCompactWakeup).
int compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
};
ManualCompaction* manual_compaction_;

Expand Down
52 changes: 17 additions & 35 deletions src/leveldb/db/db_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ Status DBTable::Init() {
std::vector<uint64_t> snapshot_sequence = options_.snapshots_sequence;
std::map<uint64_t, uint64_t> rollbacks = options_.rollbacks;
for (std::set<uint32_t>::iterator it = options_.exist_lg_list->begin();
it != options_.exist_lg_list->end() && s.ok(); ++it) {
it != options_.exist_lg_list->end() && s.ok(); ++it) {
uint32_t i = *it;
DBImpl* impl = new DBImpl(InitOptionsLG(options_, i),
dbname_ + "/" + Uint64ToString(i));
dbname_ + "/" + Uint64ToString(i));
lg_list_.push_back(impl);
lg_edits.push_back(new VersionEdit);
for (uint32_t i = 0; i < snapshot_sequence.size(); ++i) {
Expand All @@ -235,15 +235,15 @@ Status DBTable::Init() {

// recover SST
Log(options_.info_log, "[%s] start Recover lg%d, last_seq= %lu",
dbname_.c_str(), i, impl->GetLastSequence());
dbname_.c_str(), i, impl->GetLastSequence());
s = impl->Recover(lg_edits[i]);
Log(options_.info_log, "[%s] end Recover lg%d, last_seq= %lu",
dbname_.c_str(), i, impl->GetLastSequence());
dbname_.c_str(), i, impl->GetLastSequence());
if (s.ok()) {
uint64_t last_seq = impl->GetLastSequence();

Log(options_.info_log,
"[%s] Recover lg %d last_log_seq= %lu", dbname_.c_str(), i, last_seq);
"[%s] Recover lg %d last_log_seq= %lu", dbname_.c_str(), i, last_seq);
if (min_log_sequence > last_seq) {
min_log_sequence = last_seq;
}
Expand Down Expand Up @@ -279,7 +279,7 @@ Status DBTable::Init() {
s = RecoverLogFile(logfiles[i], recover_limit, &lg_edits);
if (!s.ok()) {
Log(options_.info_log, "[%s] Fail to RecoverLogFile %ld",
dbname_.c_str(), logfiles[i]);
dbname_.c_str(), logfiles[i]);
}
}
} else {
Expand All @@ -292,22 +292,6 @@ Status DBTable::Init() {
uint32_t i = *it;
DBImpl* impl = lg_list_[i];
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);

// LogAndApply to lg's manifest
if (s.ok()) {
MutexLock lock(&impl->mutex_);
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
} else {
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
dbname_.c_str(),
i);
}
} else {
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
}
delete lg_edits[i];
}

Expand All @@ -325,18 +309,18 @@ Status DBTable::Init() {
log_ = new log::AsyncWriter(logfile_, options_.log_async_mode);
} else {
Log(options_.info_log, "[%s] fail to open logfile %s",
dbname_.c_str(), log_file_name.c_str());
dbname_.c_str(), log_file_name.c_str());
}
}

if (s.ok()) {
state_ = kOpened;
Log(options_.info_log, "[%s] custom compact strategy: %s, flush trigger %lu",
dbname_.c_str(), options_.compact_strategy_factory->Name(),
options_.flush_triggered_log_num);
dbname_.c_str(), options_.compact_strategy_factory->Name(),
options_.flush_triggered_log_num);

Log(options_.info_log, "[%s] Init() done, last_seq=%llu", dbname_.c_str(),
static_cast<unsigned long long>(last_sequence_));
static_cast<unsigned long long>(last_sequence_));
} else {
for (uint32_t i = 0; i != lg_list_.size(); ++i) {
delete lg_list_[i];
Expand Down Expand Up @@ -815,12 +799,11 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};

mutex_.AssertHeld();

// Open the log file
Expand All @@ -839,9 +822,9 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : NULL);
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
0/*initial_offset*/);
Log(options_.info_log, "[%s] Recovering log #%lx, sequence limit %lu",
dbname_.c_str(), log_number, recover_limit);
dbname_.c_str(), log_number, recover_limit);

// Read all the records and add to a memtable
std::string scratch;
Expand All @@ -850,7 +833,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
Expand All @@ -860,7 +843,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
// dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
if (last_seq >= recover_limit) {
Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
dbname_.c_str(), recover_limit, first_seq, last_seq);
dbname_.c_str(), recover_limit, first_seq, last_seq);
continue;
}

Expand Down Expand Up @@ -899,7 +882,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]);
if (!lg_s.ok()) {
Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n",
dbname_.c_str(), first, last);
dbname_.c_str(), first, last);
status = lg_s;
}
}
Expand All @@ -915,7 +898,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
}
}
delete file;

return status;
}

Expand Down
1 change: 1 addition & 0 deletions src/leveldb/db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
: last_seq_(0),
comparator_(cmp),
refs_(0),
being_flushed_(false),
table_(comparator_, &arena_),
empty_(true),
compact_strategy_factory_(compact_strategy_factory) {
Expand Down
8 changes: 8 additions & 0 deletions src/leveldb/db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class MemTable {
empty_ = false;
}

bool BeingFlushed() { return being_flushed_;}
// Updates the being-flushed flag. Every call must actually toggle the flag:
// setting it while it is already set, or clearing it while it is already
// clear, trips the assertion in debug builds.
void SetBeingFlushed(bool flag) {
assert(flag != being_flushed_);
being_flushed_ = flag;
}

virtual ~MemTable();

protected:
Expand All @@ -97,6 +104,7 @@ class MemTable {

KeyComparator comparator_;
int refs_;
bool being_flushed_;

Arena arena_;
Table table_;
Expand Down
4 changes: 3 additions & 1 deletion src/leveldb/db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table
bool smallest_fake; // smallest is not real, have out-of-range keys
bool largest_fake; // largest is not real, have out-of-range keys
bool being_compacted; // Is this file undergoing compaction?

FileMetaData() :
refs(0),
allowed_seeks(1 << 30),
file_size(0),
data_size(0),
smallest_fake(false),
largest_fake(false) { }
largest_fake(false),
being_compacted(false) { }
};

class VersionEdit {
Expand Down
Loading

0 comments on commit e61e91b

Please sign in to comment.