Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue #934: multi-thread compaction support #976

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 218 additions & 122 deletions src/leveldb/db/db_impl.cc

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions src/leveldb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class DBImpl : public DB {
friend class DBTable;
struct CompactionState;
struct Writer;
struct CompactionTask {
int64_t id; // compaction thread id
double score; // compaction score
uint64_t timeout; // compaction task delay time
DBImpl* db;
};

Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Expand All @@ -110,19 +116,19 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
Status CompactMemTable(bool* sched_idle = NULL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

函数的参数求注释啊,下同

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

描述改函数是否真正干活,避免空调度

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

防止空调度

EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundCall(CompactionTask* task);
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
Expand Down Expand Up @@ -196,18 +202,24 @@ class DBImpl : public DB {
std::set<uint64_t> pending_outputs_;

// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
double bg_compaction_score_;
uint64_t bg_compaction_timeout_;
int64_t bg_schedule_id_;
std::vector<CompactionTask*> bg_compaction_tasks_;
std::vector<double> bg_compaction_score_;
std::vector<int64_t> bg_schedule_id_;

// Information for a manual compaction
enum ManualCompactState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

跪求注释,这几种状态的定义

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已改

kManualCompactIdle, // manual compact inited
kManualCompactConflict, // manual compact run simultaneously
kManualCompactWakeup, // restart delay compact task
};
struct ManualCompaction {
int level;
bool done;
bool being_sched;
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
ManualCompactState compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compaction_conflict变量命名有点奇怪? 猜不出什么含义?

Copy link
Collaborator Author

@caijieming-ng caijieming-ng Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

就是手动compact并发冲突

};
ManualCompaction* manual_compaction_;

Expand Down
17 changes: 0 additions & 17 deletions src/leveldb/db/db_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,6 @@ Status DBTable::Init() {
uint32_t i = *it;
DBImpl* impl = lg_list_[i];
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);

// LogAndApply to lg's manifest
if (s.ok()) {
MutexLock lock(&impl->mutex_);
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
} else {
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
dbname_.c_str(),
i);
}
} else {
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
}
delete lg_edits[i];
}

Expand Down Expand Up @@ -936,7 +920,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
}
}
delete file;

return status;
}

Expand Down
1 change: 1 addition & 0 deletions src/leveldb/db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
: last_seq_(0),
comparator_(cmp),
refs_(0),
being_flushed_(false),
table_(comparator_, &arena_),
empty_(true),
compact_strategy_factory_(compact_strategy_factory) {
Expand Down
8 changes: 8 additions & 0 deletions src/leveldb/db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class MemTable {
empty_ = false;
}

bool BeingFlushed() { return being_flushed_;}
void SetBeingFlushed(bool flag) {
assert(flag ? !being_flushed_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么要加assert?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放置后续同学改次块代码改错

: being_flushed_);
being_flushed_ = flag;
}

virtual ~MemTable();

protected:
Expand All @@ -97,6 +104,7 @@ class MemTable {

KeyComparator comparator_;
int refs_;
bool being_flushed_;

Arena arena_;
Table table_;
Expand Down
4 changes: 3 additions & 1 deletion src/leveldb/db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table
bool smallest_fake; // smallest is not real, have out-of-range keys
bool largest_fake; // largest is not real, have out-of-range keys
bool being_compacted; // Is this file undergoing compaction?

FileMetaData() :
refs(0),
Expand All @@ -44,7 +45,8 @@ struct FileMetaData {
file_size(0),
data_size(0),
smallest_fake(false),
largest_fake(false) { }
largest_fake(false),
being_compacted(false) { }
};

class VersionEdit {
Expand Down
Loading