Skip to content

Commit

Permalink
issue #934: multithread compactiong support
Browse files Browse the repository at this point in the history
  • Loading branch information
caijieming committed Nov 21, 2016
1 parent b66c065 commit 5ec0c93
Show file tree
Hide file tree
Showing 10 changed files with 552 additions and 205 deletions.
310 changes: 199 additions & 111 deletions src/leveldb/db/db_impl.cc

Large diffs are not rendered by default.

27 changes: 20 additions & 7 deletions src/leveldb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class DBImpl : public DB {
friend class DBTable;
struct CompactionState;
struct Writer;
struct CompactionTask {
int64_t id;
double score;
DBImpl* db;
};

Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Expand All @@ -111,10 +116,10 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
Status CompactMemTable(bool* sched_idle = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status MakeRoomForWrite(bool force /* compact even if there is room? */)
Expand All @@ -123,8 +128,8 @@ class DBImpl : public DB {

void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundCall(CompactionTask* task);
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
Expand Down Expand Up @@ -193,17 +198,25 @@ class DBImpl : public DB {
std::set<uint64_t> pending_outputs_;

// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
double bg_compaction_score_;
int64_t bg_schedule_id_;
std::set<CompactionTask*> bg_compaction_tasks_;
int bg_compaction_scheduled_;
std::vector<double> bg_compaction_score_;
std::vector<int64_t> bg_schedule_id_;

// Information for a manual compaction
enum ManualCompactState {
kManualCompactIdle,
kManualCompactConflict,
kManualCompactWakeup,
};
struct ManualCompaction {
int level;
bool done;
bool being_sched;
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
int compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
};
ManualCompaction* manual_compaction_;

Expand Down
20 changes: 1 addition & 19 deletions src/leveldb/db/db_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,6 @@ Status DBTable::Init() {
uint32_t i = *it;
DBImpl* impl = lg_list_[i];
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);

// LogAndApply to lg's manifest
if (s.ok()) {
MutexLock lock(&impl->mutex_);
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
} else {
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
dbname_.c_str(),
i);
}
} else {
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
}
delete lg_edits[i];
}

Expand Down Expand Up @@ -820,7 +804,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};

mutex_.AssertHeld();

// Open the log file
Expand Down Expand Up @@ -860,7 +843,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
// dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
if (last_seq >= recover_limit) {
Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
dbname_.c_str(), recover_limit, first_seq, last_seq);
dbname_.c_str(), recover_limit, first_seq, last_seq);
continue;
}

Expand Down Expand Up @@ -915,7 +898,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
}
}
delete file;

return status;
}

Expand Down
1 change: 1 addition & 0 deletions src/leveldb/db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
: last_seq_(0),
comparator_(cmp),
refs_(0),
being_flushed_(false),
table_(comparator_, &arena_),
empty_(true),
compact_strategy_factory_(compact_strategy_factory) {
Expand Down
8 changes: 8 additions & 0 deletions src/leveldb/db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class MemTable {
empty_ = false;
}

bool BeingFlushed() { return being_flushed_;}
void SetBeingFlushed(bool flag) {
assert(flag ? !being_flushed_
: being_flushed_);
being_flushed_ = flag;
}

virtual ~MemTable();

protected:
Expand All @@ -97,6 +104,7 @@ class MemTable {

KeyComparator comparator_;
int refs_;
bool being_flushed_;

Arena arena_;
Table table_;
Expand Down
4 changes: 3 additions & 1 deletion src/leveldb/db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table
bool smallest_fake; // smallest is not real, have out-of-range keys
bool largest_fake; // largest is not real, have out-of-range keys
bool being_compacted; // Is this file undergoing compaction?

FileMetaData() :
refs(0),
allowed_seeks(1 << 30),
file_size(0),
data_size(0),
smallest_fake(false),
largest_fake(false) { }
largest_fake(false),
being_compacted(false) { }
};

class VersionEdit {
Expand Down
Loading

0 comments on commit 5ec0c93

Please sign in to comment.