diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 2d00657ee..852495df4 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -146,9 +146,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) log_(NULL), bound_log_size_(0), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - bg_compaction_score_(0), - bg_schedule_id_(0), + bg_compaction_scheduled_(0), manual_compaction_(NULL), consecutive_compaction_errors_(0), flush_on_destroy_(false) { @@ -173,9 +171,6 @@ Status DBImpl::Shutdown1() { shutting_down_.Release_Store(this); // Any non-NULL value is ok Log(options_.info_log, "[%s] wait bg compact finish", dbname_.c_str()); - if (bg_compaction_scheduled_) { - env_->ReSchedule(bg_schedule_id_, kDumpMemTableUrgentScore); - } while (bg_compaction_scheduled_) { bg_cv_.Wait(); } @@ -293,12 +288,19 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect. return; } + // check filesystem, and then check pending_outputs_ + std::vector filenames; + mutex_.Unlock(); + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + mutex_.Lock(); + // Make a set of all of the live files std::set live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -308,11 +310,6 @@ void DBImpl::DeleteObsoleteFiles() { Log(options_.info_log, "[%s] try DeleteObsoleteFiles, total live file num: %llu\n", dbname_.c_str(), static_cast(live.size())); - - std::vector filenames; - mutex_.Unlock(); - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose - mutex_.Lock(); uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { @@ -387,8 +384,8 @@ bool DBImpl::IsDbExist() { if (ParseFileName(files[i], &number, &type) && type == kDescriptorFile) { std::string dscname = dbname_ + "/" + files[i]; uint64_t fsize = 0; - env_->GetFileSize(dscname, &fsize); - if (fsize == 0) { + Status s = env_->GetFileSize(dscname, &fsize); + if (s.ok() && fsize == 0) { // if CURRENT file not exist, empty MANIFEST is dangerous, delete it Log(options_.info_log, "[%s] delete empty manifest: %s.", dbname_.c_str(), dscname.c_str()); @@ -524,11 +521,16 @@ Status DBImpl::Recover(VersionEdit* edit) { } Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, - Version* base) { + Version* base, uint64_t* number) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.number = BuildFullFileNumber(dbname_, versions_->NewFileNumber()); + if (number) { + *number = BuildFullFileNumber(dbname_, versions_->NewFileNumber()); + meta.number = *number; + } else { + meta.number = BuildFullFileNumber(dbname_, versions_->NewFileNumber()); + } pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); Log(options_.info_log, "[%s] Level-0 table #%u: started", @@ -577,15 +579,35 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +// multithread safe +Status DBImpl::CompactMemTable(bool* sched_idle) { mutex_.AssertHeld(); assert(imm_ != NULL); + Status s; + if (sched_idle) *sched_idle = true; + if (imm_->BeingFlushed()) { + //Log(options_.info_log, "[%s] CompactMemTable conflict, seq %lu", + // dbname_.c_str(), GetLastSequence(false)); + return s; + } + imm_->SetBeingFlushed(true); + + if (imm_->ApproximateMemoryUsage() <= 0) { // imm is empty, do nothing + Log(options_.info_log, "[%s] CompactMemTable empty memtable %lu", + dbname_.c_str(), GetLastSequence(false)); + imm_->Unref(); + imm_ = NULL; + has_imm_.Release_Store(NULL); + return s; + } + if (sched_idle) *sched_idle = false; // Save the contents of the memtable as a new Table VersionEdit edit; + uint64_t number; Version* base = versions_->current(); base->Ref(); - Status s = WriteLevel0Table(imm_, &edit, base); + s = WriteLevel0Table(imm_, &edit, base, &number); base->Unref(); if (s.ok() && shutting_down_.Acquire_Load()) { @@ -594,6 +616,7 @@ Status DBImpl::CompactMemTable() { // Replace immutable memtable with the generated Table if (s.ok()) { + pending_outputs_.insert(number); // LogAndApply donot holds lock, so use pending_outputs_ to make sure new file will not be deleted edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed if (imm_->GetLastSequence()) { @@ -602,6 +625,9 @@ Status DBImpl::CompactMemTable() { Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu", dbname_.c_str(), edit.GetLastSequence()); s = versions_->LogAndApply(&edit, &mutex_); + pending_outputs_.erase(number); + } else { + // TODO: dump memtable error, delete temp file } if (s.ok()) { @@ -609,6 +635,9 @@ Status DBImpl::CompactMemTable() { imm_->Unref(); imm_ = NULL; has_imm_.Release_Store(NULL); + } else { + // imm dump fail, reset being flush flag + imm_->SetBeingFlushed(false); } return s; @@ -640,6 +669,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { ManualCompaction manual; manual.level = level; manual.done = false; + manual.being_sched = false; + manual.compaction_conflict = kManualCompactIdle; if (begin == NULL) { manual.begin = NULL; } else { @@ -658,6 +689,9 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { if (manual_compaction_ == NULL) { // Idle manual_compaction_ = &manual; MaybeScheduleCompaction(); + } else if (manual_compaction_->compaction_conflict == kManualCompactWakeup) { + manual_compaction_->compaction_conflict = kManualCompactIdle; + MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. bg_cv_.Wait(); } @@ -730,55 +764,71 @@ void DBImpl::AddInheritedLiveFiles(std::vector >* live) { } Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) { - MutexLock lock(&mutex_); + MutexLock lock(&mutex_); - if (recover_mem_ == NULL) { - recover_mem_ = NewMemTable(); - recover_mem_->Ref(); - } - uint64_t log_sequence = WriteBatchInternal::Sequence(batch); - uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; - - // if duplicate record, ignore - if (log_sequence <= recover_mem_->GetLastSequence()) { - assert (last_sequence <= recover_mem_->GetLastSequence()); - Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", - dbname_.c_str(), log_sequence, last_sequence); - return Status::OK(); - } + if (recover_mem_ == NULL) { + recover_mem_ = NewMemTable(); + recover_mem_->Ref(); + } + uint64_t log_sequence = WriteBatchInternal::Sequence(batch); + uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; + + // if duplicate record, ignore + if (log_sequence <= recover_mem_->GetLastSequence()) { + assert (last_sequence <= recover_mem_->GetLastSequence()); + Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", + dbname_.c_str(), log_sequence, last_sequence); + return Status::OK(); + } - Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); - MaybeIgnoreError(&status); + Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); + MaybeIgnoreError(&status); + if (!status.ok()) { + return status; + } + if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { + edit->SetLastSequence(recover_mem_->GetLastSequence()); + status = WriteLevel0Table(recover_mem_, edit, NULL); if (!status.ok()) { - return status; - } - if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { - edit->SetLastSequence(recover_mem_->GetLastSequence()); - status = WriteLevel0Table(recover_mem_, edit, NULL); - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - return status; - } - recover_mem_->Unref(); - recover_mem_ = NULL; + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + return status; } - return status; + recover_mem_->Unref(); + recover_mem_ = NULL; + } + return status; } Status DBImpl::RecoverLastDumpToLevel0(VersionEdit* edit) { - MutexLock lock(&mutex_); - Status status; - if (recover_mem_ == NULL) { - return status; - } - if (recover_mem_->GetLastSequence() > 0) { - edit->SetLastSequence(recover_mem_->GetLastSequence()); - status = WriteLevel0Table(recover_mem_, edit, NULL); + MutexLock lock(&mutex_); + Status s; + if (recover_mem_ == NULL) { + DeleteObsoleteFiles(); + MaybeScheduleCompaction(); + return s; + } + if (recover_mem_->GetLastSequence() > 0) { + edit->SetLastSequence(recover_mem_->GetLastSequence()); + s = WriteLevel0Table(recover_mem_, edit, NULL); + } + recover_mem_->Unref(); + recover_mem_ = NULL; + + // LogAndApply to lg's manifest + if (s.ok()) { + s = versions_->LogAndApply(edit, &mutex_); + if (s.ok()) { + DeleteObsoleteFiles(); + MaybeScheduleCompaction(); + } else { + Log(options_.info_log, "[%s] Fail to modify manifest", + dbname_.c_str()); } - recover_mem_->Unref(); - recover_mem_ = NULL; - return status; + } else { + Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str()); + } + return s; } // end of tera-specific @@ -788,30 +838,26 @@ void DBImpl::MaybeScheduleCompaction() { if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - double score = versions_->CompactionScore(); - if (manual_compaction_ != NULL) { - score = kManualCompactScore; + std::vector scores; + if (imm_ && !imm_->BeingFlushed()) { + scores.push_back(kDumpMemTableScore); } - if (imm_ != NULL) { - score = kDumpMemTableScore; + if (manual_compaction_ && !manual_compaction_->being_sched && + (manual_compaction_->compaction_conflict != kManualCompactConflict)) { + scores.push_back(kManualCompactScore); } - if (score > 0) { - if (bg_compaction_scheduled_ && score <= bg_compaction_score_) { - // Already scheduled - } else if (bg_compaction_scheduled_) { - env_->ReSchedule(bg_schedule_id_, score); - Log(options_.info_log, "[%s] ReSchedule Compact[%ld] score= %.2f", - dbname_.c_str(), bg_schedule_id_, score); - bg_compaction_score_ = score; - } else { - bg_schedule_id_ = env_->Schedule(&DBImpl::BGWork, this, score); - Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f", - dbname_.c_str(), bg_schedule_id_, score); - bg_compaction_score_ = score; - bg_compaction_scheduled_ = true; - } - } else { - // No work to be done + double level_score = versions_->CompactionScore(); + if (level_score > 0) { + scores.push_back(level_score); // Get Max score + } + for (size_t i = 0; i < scores.size(); i++) { + if (bg_compaction_scheduled_ < (int)options_.max_background_compactions) { + bg_compaction_scheduled_++; + + int64_t id = env_->Schedule(&DBImpl::BGWork,this, scores[i]); + Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f currency %d", + dbname_.c_str(), id, scores[i], bg_compaction_scheduled_); + } } } } @@ -821,11 +867,12 @@ void DBImpl::BGWork(void* db) { } void DBImpl::BackgroundCall() { - Log(options_.info_log, "[%s] BackgroundCall", dbname_.c_str()); MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); + Log(options_.info_log, "[%s] BackgroundCall Compact %d", dbname_.c_str(), bg_compaction_scheduled_); + assert(bg_compaction_scheduled_ > 0); + bool sched_idle = false; if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); + Status s = BackgroundCompaction(&sched_idle); if (s.ok()) { // Success consecutive_compaction_errors_ = 0; @@ -854,34 +901,44 @@ void DBImpl::BackgroundCall() { } } - bg_compaction_scheduled_ = false; + bg_compaction_scheduled_--; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. - MaybeScheduleCompaction(); + if (!sched_idle) { + MaybeScheduleCompaction(); + } bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +Status DBImpl::BackgroundCompaction(bool* sched_idle) { mutex_.AssertHeld(); - if (imm_ != NULL) { - return CompactMemTable(); + *sched_idle = false; + if (imm_ && !imm_->BeingFlushed()) { + return CompactMemTable(sched_idle); } - Compaction* c; + Status status; + Compaction* c = NULL; bool is_manual = (manual_compaction_ != NULL); InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); + if (m->being_sched) { // other thread doing manual compaction or range being compacted + return status; + } + m->being_sched = true; + bool conflict = false; + c = versions_->CompactRange(m->level, m->begin, m->end, &conflict); + m->compaction_conflict = conflict? kManualCompactConflict : kManualCompactIdle; + m->done = (c == NULL && !conflict); if (c != NULL) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } Log(options_.info_log, - "[%s] Manual compaction at level-%d from %s .. %s; will stop at %s\n", - dbname_.c_str(), m->level, + "[%s] Manual compaction, conflit %u, at level-%d from %s .. %s; will stop at %s\n", + dbname_.c_str(), conflict, m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); @@ -889,9 +946,9 @@ Status DBImpl::BackgroundCompaction() { c = versions_->PickCompaction(); } - Status status; if (c == NULL) { // Nothing to do + *sched_idle = true; } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -908,12 +965,14 @@ Status DBImpl::BackgroundCompaction() { static_cast(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); + versions_->ReleaseCompaction(c, status); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); - CleanupCompaction(compact); + CleanupCompaction(compact); // pop pedning output, which can be deleted in DeleteObSoleteFiles() + versions_->ReleaseCompaction(c, status); // current_version has reference to c->inputs_[0,1] c->ReleaseInputs(); - DeleteObsoleteFiles(); + DeleteObsoleteFiles();// delete any sst not in version set. if comapct and flush mem handle in multi-thread, may delete new data } delete c; @@ -932,16 +991,27 @@ Status DBImpl::BackgroundCompaction() { if (is_manual) { ManualCompaction* m = manual_compaction_; - if (!status.ok()) { - m->done = true; - } - if (!m->done) { - // We only compacted part of the requested range. Update *m - // to the range that is left to be compacted. - m->tmp_storage = manual_end; - m->begin = &m->tmp_storage; + m->being_sched = false; + if (m->compaction_conflict != kManualCompactConflict) { // PickRange success + if (!status.ok()) { + m->done = true; + } + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } + manual_compaction_ = NULL; } - manual_compaction_ = NULL; + } else if (manual_compaction_ != NULL) { // non manual compact + ManualCompaction* m = manual_compaction_; + m->compaction_conflict = kManualCompactWakeup;// Wakeup here, ManualCompact thread check it + Log(options_.info_log, + "[%s] Wakeup Manual compaction at level-%d from %s .. %s", + dbname_.c_str(), m->level, + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)")); } return status; } @@ -959,6 +1029,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; pending_outputs_.erase(BuildFullFileNumber(dbname_, out.number)); + Log(options_.info_log, "[%s] erase pending_output #%lu", dbname_.c_str(), out.number); } delete compact; } @@ -976,6 +1047,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.smallest.Clear(); out.largest.Clear(); compact->outputs.push_back(out); + + Log(options_.info_log, "[%s] insert pending_output #%lu", dbname_.c_str(), file_number); mutex_.Unlock(); } @@ -1072,6 +1145,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } +// *MUST* multi thread safe Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions @@ -1122,8 +1196,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (has_imm_.NoBarrier_Load() != NULL) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_ != NULL) { - CompactMemTable(); + if (imm_ && !imm_->BeingFlushed()) { + CompactMemTable(); // no need check failure, because imm_ not null if dump fail. bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); @@ -1276,7 +1350,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, - "[%s] compacted to: %s", dbname_.c_str(), versions_->LevelSummary(&tmp)); + "[%s] compacted to: %s, compacte stat %s", dbname_.c_str(), versions_->LevelSummary(&tmp), status.ToString().c_str()); return status; } diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index 2036de199..ffbb5ed62 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -111,10 +111,10 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable() + Status CompactMemTable(bool* sched_idle = NULL) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) + Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status MakeRoomForWrite(bool force /* compact even if there is room? */) @@ -124,7 +124,7 @@ class DBImpl : public DB { void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -193,17 +193,24 @@ class DBImpl : public DB { std::set pending_outputs_; // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; - double bg_compaction_score_; - int64_t bg_schedule_id_; + int bg_compaction_scheduled_; + std::vector bg_compaction_score_; + std::vector bg_schedule_id_; // Information for a manual compaction + enum ManualCompactState { + kManualCompactIdle, + kManualCompactConflict, + kManualCompactWakeup, + }; struct ManualCompaction { int level; bool done; + bool being_sched; const InternalKey* begin; // NULL means beginning of key range const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress + int compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake }; ManualCompaction* manual_compaction_; diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc index 4eb16b52e..eedc353ae 100644 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -292,22 +292,6 @@ Status DBTable::Init() { uint32_t i = *it; DBImpl* impl = lg_list_[i]; s = impl->RecoverLastDumpToLevel0(lg_edits[i]); - - // LogAndApply to lg's manifest - if (s.ok()) { - MutexLock lock(&impl->mutex_); - s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_); - if (s.ok()) { - impl->DeleteObsoleteFiles(); - impl->MaybeScheduleCompaction(); - } else { - Log(options_.info_log, "[%s] Fail to modify manifest of lg %d", - dbname_.c_str(), - i); - } - } else { - Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str()); - } delete lg_edits[i]; } @@ -820,7 +804,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, if (this->status != NULL && this->status->ok()) *this->status = s; } }; - mutex_.AssertHeld(); // Open the log file @@ -860,7 +843,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, // dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch)); if (last_seq >= recover_limit) { Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu", - dbname_.c_str(), recover_limit, first_seq, last_seq); + dbname_.c_str(), recover_limit, first_seq, last_seq); continue; } @@ -915,7 +898,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, } } delete file; - return status; } diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc index c9f284110..ddee41b1d 100644 --- a/src/leveldb/db/memtable.cc +++ b/src/leveldb/db/memtable.cc @@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com : last_seq_(0), comparator_(cmp), refs_(0), + being_flushed_(false), table_(comparator_, &arena_), empty_(true), compact_strategy_factory_(compact_strategy_factory) { diff --git a/src/leveldb/db/memtable.h b/src/leveldb/db/memtable.h index ba608550e..a2a1a073a 100644 --- a/src/leveldb/db/memtable.h +++ b/src/leveldb/db/memtable.h @@ -79,6 +79,13 @@ class MemTable { empty_ = false; } + bool BeingFlushed() { return being_flushed_;} + void SetBeingFlushed(bool flag) { + assert(flag ? !being_flushed_ + : being_flushed_); + being_flushed_ = flag; + } + virtual ~MemTable(); protected: @@ -97,6 +104,7 @@ class MemTable { KeyComparator comparator_; int refs_; + bool being_flushed_; Arena arena_; Table table_; diff --git a/src/leveldb/db/version_edit.h b/src/leveldb/db/version_edit.h index c01d11ff8..4d7e3e9ac 100644 --- a/src/leveldb/db/version_edit.h +++ b/src/leveldb/db/version_edit.h @@ -30,6 +30,7 @@ struct FileMetaData { InternalKey largest; // Largest internal key served by table bool smallest_fake; // smallest is not real, have out-of-range keys bool largest_fake; // largest is not real, have out-of-range keys + bool being_compacted; // Is this file undergoing compaction? FileMetaData() : refs(0), @@ -37,7 +38,8 @@ struct FileMetaData { file_size(0), data_size(0), smallest_fake(false), - largest_fake(false) { } + largest_fake(false), + being_compacted(false) { } }; class VersionEdit { diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 7d94c69ed..b856ad2f6 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -70,6 +70,15 @@ static int64_t TotalFileSize(const std::vector& files) { } return sum; } +static int64_t TotalFileSizeNotBeingCompacted(const std::vector& files) { + int64_t sum = 0; + for (size_t i = 0; i < files.size(); i++) { + if (!files[i]->being_compacted) { + sum += files[i]->file_size; + } + } + return sum; +} Version::~Version() { assert(refs_ == 0); @@ -790,6 +799,7 @@ class VersionSetBuilder { FileMetaData* f = new FileMetaData(f_new); f->refs = 1; + f->being_compacted = false; if (f->data_size == 0 && !f->smallest_fake && !f->largest_fake) { // Make sure this is a new file generated by compaction. @@ -986,7 +996,18 @@ void VersionSet::AppendVersion(Version* v) { v->next_->prev_ = v; } -Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { +// multi thread safe +// Information kept for every waiting manifest writer +struct VersionSet::ManifestWriter { + Status status; + VersionEdit* edit; + bool done; + port::CondVar cv; + + explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { } +}; +void VersionSet::LogAndApplyHelper(VersionSetBuilder* builder, + VersionEdit* edit) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -1008,13 +1029,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetLastSequence(last_sequence_); } + builder->Apply(edit); +} +Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { + mu->AssertHeld(); + // multi write control, do not batch edit write, but multi thread safety + ManifestWriter w(mu); + w.edit = edit; + manifest_writers_.push_back(&w); + while (!w.done && &w != manifest_writers_.front()) { + w.cv.Wait(); + } + assert(manifest_writers_.front() == &w); + + // first manifest writer, batch edit Version* v = new Version(this); { VersionSetBuilder builder(this, current_); - builder.Apply(edit); + LogAndApplyHelper(&builder, w.edit); builder.SaveTo(v); } - Finalize(v); + Finalize(v); // recalculate new version score const uint64_t switch_interval = options_->manifest_switch_interval * 1000000UL; if (descriptor_log_ != NULL && @@ -1112,6 +1147,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { Log(options_->info_log, "[%s][dfs error] set force_switch_manifest", dbname_.c_str()); } + // TODO: batch write manifest finish + manifest_writers_.pop_front(); + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } return s; } @@ -1421,12 +1461,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction - int best_level = -1; - double best_score = -1; - for (int level = 0; level < config::kNumLevels-1; level++) { - double score; - if (level == 0) { + double score = 0; + if (level == 0 && level0_compactions_in_progress_.empty()) { // We treat level-0 specially by bounding the number of files // instead of number of bytes for two reasons: // @@ -1441,23 +1478,37 @@ void VersionSet::Finalize(Version* v) { // // (3) More level0 files means write hotspot. // We give lower score to avoid too much level0 compaction. - score = sqrt(v->files_[level].size() / - static_cast(config::kL0_CompactionTrigger)); - } else { + if (v->files_[level].size() <= (size_t)options_->slow_down_level0_score_limit) { + score = v->files_[level].size() / + static_cast(config::kL0_CompactionTrigger); + } else { + score = sqrt(v->files_[level].size() / + static_cast(config::kL0_CompactionTrigger)); + } + } else if (level > 0) { // Compute the ratio of current size to size limit. - const uint64_t level_bytes = TotalFileSize(v->files_[level]); + const uint64_t level_bytes = TotalFileSizeNotBeingCompacted(v->files_[level]); score = static_cast(level_bytes) / MaxBytesForLevel(level, options_->sst_size); } - - if (score > best_score) { - best_level = level; - best_score = score; + v->compaction_level_[level] = level; + v->compaction_score_[level] = (score < 1.0) ? 0: score; + } + + // sort all the levels based on their score. Higher scores get listed + // first. Use bubble sort because the number of entries are small. + for (int i = 0; i < config::kNumLevels - 2; i++) { + for (int j = i + 1; j < config::kNumLevels - 1; j++) { + if (v->compaction_score_[i] < v->compaction_score_[j]) { + int level = v->compaction_level_[i]; + double score = v->compaction_score_[i]; + v->compaction_level_[i] = v->compaction_level_[j]; + v->compaction_score_[i] = v->compaction_score_[j]; + v->compaction_level_[j] = level; + v->compaction_score_[j] = score; + } } } - - v->compaction_level_ = best_level; - v->compaction_score_ = best_score; } Status VersionSet::WriteSnapshot(log::Writer* log) { @@ -1698,62 +1749,177 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { return result; } +void VersionSet::PrintFilesInCompaction(const std::vector& inputs) { + char buf[30]; + std::string fstr = "file: "; + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (f->being_compacted) { + snprintf(buf, sizeof(buf), "%lu ", f->number); + fstr.append(buf); + break; + } + } + Log(options_->info_log, "[%s] test mark level [%s] bening compact.", dbname_.c_str(), + fstr.c_str()); + return; +} +bool VersionSet::FilesInCompaction(const std::vector& inputs) { + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (f->being_compacted) { + return true; + } + } + return false; +} +void VersionSet::PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) { + std::vector inputs; + assert(level < config::kNumLevels); + current_->GetOverlappingInputs(level, smallest, largest, &inputs); + PrintFilesInCompaction(inputs); + return; +} +bool VersionSet::RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) { + std::vector inputs; + assert(level < config::kNumLevels); + current_->GetOverlappingInputs(level, smallest, largest, &inputs); + return FilesInCompaction(inputs); +} +// Note: +// 1) if f in leve1 being compacted, level0 may be blocked; +// 2) compacting pointer may cause other f in the same level to be blocked. +bool VersionSet::PickCompactionBySize(int level, std::vector* inputs) { + inputs->clear(); + std::vector candidate; + // Pick the first file that comes after compact_pointer_[level] + for (size_t i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + if (f->being_compacted) { + //Log(options_->info_log, "[%s] PickCompactionBySize, level %d, f[%s, %s] being_compact %d\n", + // dbname_.c_str(), level, + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + continue; + } + if (!compact_pointer_[level].empty() && + icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) <= 0) { + //Log(options_->info_log, "[%s] PickCompactionBySize, skip by compact_pointer_[%d] %s, f[%s, %s] being_compacted %d\n", + // dbname_.c_str(), level, compact_pointer_[level].c_str(), + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + if (!RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + candidate.push_back(f); + } + continue; + } + if (RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + //PrintRangeInCompaction(&f->smallest, &f->largest, level + 1); + continue; + } + inputs->push_back(f); + break; + } + if (inputs->empty()) { + // Wrap-around to the beginning of the key space + FileMetaData* f = current_->files_[level][0]; + if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + inputs->push_back(f); + } + //Log(options_->info_log, "[%s] PickCompactBySize, wrap-arroud level %d, f[%s, %s] being_compacted %d\n", + // dbname_.c_str(), level, + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + //PrintRangeInCompaction(&f->smallest, &f->largest, level + 1); + } + if (inputs->empty() && candidate.size() > 0) { + inputs->push_back(candidate[candidate.size() - 1]); + } + return !inputs->empty(); +} Compaction* VersionSet::PickCompaction() { - Compaction* c; - int level; + int level = -1; + std::vector inputs; // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. - const bool size_compaction = (current_->compaction_score_ >= 1); - const bool seek_compaction = (current_->file_to_compact_ != NULL); - if (size_compaction) { - level = current_->compaction_level_; - assert(level >= 0); - assert(level+1 < config::kNumLevels); - c = new Compaction(level); - - // Pick the first file that comes after compact_pointer_[level] - for (size_t i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; - if (compact_pointer_[level].empty() || - icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { - c->inputs_[0].push_back(f); + assert(level0_compactions_in_progress_.size() <= 1); + bool skipped_l0 = false; + for (size_t li = 0; li < current_->compaction_score_.size(); li++) { + double score = current_->compaction_score_[li]; + level = current_->compaction_level_[li]; + assert(li == 0 || score <= current_->compaction_score_[li - 1]); + if (score >= 1) { + if (skipped_l0 && level <= 1) { + // level0 in progress and level 0 will not directly compact to level > 1 + //Log(options_->info_log, "[%s] lock level %d, conflict, score %.2f\n", + // dbname_.c_str(), level, score); + continue; + } + if (level == 0 && !level0_compactions_in_progress_.empty()) { + skipped_l0 = true; + //Log(options_->info_log, "[%s] level %d in progress, conflict, score %.2f\n", + // dbname_.c_str(), level, score); + continue; + } + if (PickCompactionBySize(level, &inputs)) { break; } + //Log(options_->info_log, "[%s] pick level %d, conflict, score %.2f\n", + // dbname_.c_str(), level, score); } - if (c->inputs_[0].empty()) { - // Wrap-around to the beginning of the key space - c->inputs_[0].push_back(current_->files_[level][0]); - } - } else if (seek_compaction) { + } + // check seek compaction + if (inputs.empty() && + (current_->file_to_compact_ != NULL)) { level = current_->file_to_compact_level_; - c = new Compaction(level); - c->inputs_[0].push_back(current_->file_to_compact_); - } else { + FileMetaData* f = current_->file_to_compact_; + if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + inputs.push_back(f); + } + } + if (inputs.empty()) { return NULL; } - - c->input_version_ = current_; - c->input_version_->Ref(); - c->max_output_file_size_ = - MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size); + assert(inputs.size() == 1); + assert(level >= 0); // Files in level 0 may overlap each other, so pick up all overlapping ones if (level == 0) { + assert(level0_compactions_in_progress_.size() == 0); InternalKey smallest, largest; - GetRange(c->inputs_[0], &smallest, &largest); + GetRange(inputs, &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. - current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); - assert(!c->inputs_[0].empty()); + current_->GetOverlappingInputs(level, &smallest, &largest, &inputs); + GetRange(inputs, &smallest, &largest); + if (RangeInCompaction(&smallest, &largest, level + 1)) { // make sure level1 not in compaction + Log(options_->info_log, "[%s] level1 in compacting, level0 conflict\n", + dbname_.c_str()); + return NULL; + } + assert(!inputs.empty()); + assert(!FilesInCompaction(inputs)); } + // expand inputs + Compaction* c = new Compaction(level); + c->input_version_ = current_; + c->input_version_->Ref(); // make sure compacting version will not delete + c->max_output_file_size_ = + MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size); + c->inputs_[0] = inputs; SetupOtherInputs(c); + // mark being compacted + c->MarkBeingCompacted(true); + if (level == 0) { + level0_compactions_in_progress_.push_back(c); + } + Finalize(current_); // reculate level score return c; } - void VersionSet::SetupOtherInputs(Compaction* c) { const int level = c->level(); InternalKey smallest, largest; @@ -1781,7 +1947,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded1; current_->GetOverlappingInputs(level+1, &new_start, &new_limit, &expanded1); - if (expanded1.size() == c->inputs_[1].size()) { + // check expanded file wether in compacting + if ((expanded1.size() == c->inputs_[1].size()) && + !RangeInCompaction(&new_start, &new_limit, level) && + !RangeInCompaction(&new_start, &new_limit, level + 1)) { Log(options_->info_log, "[%s] Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", dbname_.c_str(), @@ -1855,11 +2024,18 @@ void VersionSet::SetupCompactionBoundary(Compaction* c) { Compaction* VersionSet::CompactRange( int level, const InternalKey* begin, - const InternalKey* end) { + const InternalKey* end, bool* being_compacted) { + *being_compacted = false; std::vector inputs; current_->GetOverlappingInputs(level, begin, end, &inputs); if (inputs.empty()) { - return NULL; + return NULL; + } + + // check level0 wether in compaction + if (level == 0 && !level0_compactions_in_progress_.empty()) { + *being_compacted = true; + return NULL; } // Avoid compacting too much in one shot in case the range is large. @@ -1880,6 +2056,18 @@ Compaction* VersionSet::CompactRange( } } + // check being compacting + InternalKey smallest, largest; + GetRange(inputs, &smallest, &largest); + if (FilesInCompaction(inputs) || RangeInCompaction(&smallest, &largest, level + 1)) { + PrintFilesInCompaction(inputs); + PrintRangeInCompaction(&smallest, &largest, level + 1); + Log(options_->info_log, "[%s] RangeCompaction : %s...%s, level: %d or %d, in compaction", + dbname_.c_str(), smallest.DebugString().c_str(), largest.DebugString().c_str(), level, level + 1); + *being_compacted = true; + return NULL; + } + Compaction* c = new Compaction(level); c->input_version_ = current_; c->input_version_->Ref(); @@ -1887,9 +2075,28 @@ Compaction* VersionSet::CompactRange( MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size); c->inputs_[0] = inputs; SetupOtherInputs(c); + + // mark being compacted + c->MarkBeingCompacted(true); + if (level == 0) { + level0_compactions_in_progress_.push_back(c); + } + Finalize(current_); // reculate level score return c; } +void VersionSet::ReleaseCompaction(Compaction* c, Status& s) { + c->MarkBeingCompacted(false); + assert(level0_compactions_in_progress_.size() <= 1); + if (c->level() == 0 && level0_compactions_in_progress_[0] == c) { + level0_compactions_in_progress_.resize(0); + } + if (!s.ok()) { + Finalize(current_); + } + return; +} + Compaction::Compaction(int level) : level_(level), max_output_file_size_(0), @@ -1969,6 +2176,16 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) { } } +void Compaction::MarkBeingCompacted(bool flag) { + for (size_t i = 0; i < 2; i++) { + for (size_t j = 0; j < inputs_[i].size(); j++) { + assert(flag ? !inputs_[i][j]->being_compacted + : inputs_[i][j]->being_compacted); + inputs_[i][j]->being_compacted = flag; + } + } +} + void Compaction::ReleaseInputs() { if (input_version_ != NULL) { input_version_->Unref(); diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 676cc64ee..2276b0a10 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -19,6 +19,7 @@ #ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_ #define STORAGE_LEVELDB_DB_VERSION_SET_H_ +#include #include #include #include @@ -140,15 +141,19 @@ class Version { // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). - double compaction_score_; - int compaction_level_; + std::vector compaction_score_; + std::vector compaction_level_; explicit Version(VersionSet* vset) : vset_(vset), next_(this), prev_(this), refs_(0), file_to_compact_(NULL), - file_to_compact_level_(-1), - compaction_score_(-1), - compaction_level_(-1) { + file_to_compact_level_(-1) { + compaction_score_.resize(config::kNumLevels - 1); + compaction_level_.resize(config::kNumLevels - 1); + for (size_t i = 0; i < config::kNumLevels - 1; i++) { + compaction_score_[i] = -1.0; + compaction_level_[i] = -1; + } } ~Version(); @@ -171,6 +176,8 @@ class VersionSet { // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() + void LogAndApplyHelper(VersionSetBuilder* builder, + VersionEdit* edit); Status LogAndApply(VersionEdit* edit, port::Mutex* mu) EXCLUSIVE_LOCKS_REQUIRED(mu); @@ -233,7 +240,10 @@ class VersionSet { Compaction* CompactRange( int level, const InternalKey* begin, - const InternalKey* end); + const InternalKey* end, bool* being_compacted); + + // release file's being_compacted flag, and release level0's lock + void ReleaseCompaction(Compaction* c, Status& s); // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. @@ -246,17 +256,19 @@ class VersionSet { // Returns true iff some level needs a compaction. bool NeedsCompaction() const { Version* v = current_; - return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); + return (v->compaction_score_[0] >= 1) || (v->file_to_compact_ != NULL); } double CompactionScore() const { Version* v = current_; - if (v->compaction_score_ >= 1) { - return v->compaction_score_; - } else if (v->file_to_compact_ != NULL) { + double score = v->compaction_score_[0]; + if (score <= 0) { + if (v->file_to_compact_ && !v->file_to_compact_->being_compacted) { return 0.1f; + } + return -1.0; } - return -1.0; + return score; } // Add all files listed in any live version to *live. @@ -279,6 +291,7 @@ class VersionSet { friend class Compaction; friend class Version; friend class VersionSetBuilder; + struct ManifestWriter; void Finalize(Version* v); @@ -305,6 +318,13 @@ class VersionSet { bool ModifyFileSize(FileMetaData* f); + // milti thread compaction relatively + void PrintFilesInCompaction(const std::vector& inputs); + bool FilesInCompaction(const std::vector& inputs); + void PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level); + bool RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level); + bool PickCompactionBySize(int level, std::vector* inputs); + Env* const env_; const std::string dbname_; const Options* const options_; @@ -320,6 +340,8 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + std::deque manifest_writers_; + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_; @@ -329,6 +351,7 @@ class VersionSet { // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. std::string compact_pointer_[config::kNumLevels]; + std::vector level0_compactions_in_progress_; // No copying allowed VersionSet(const VersionSet&); @@ -373,6 +396,8 @@ class Compaction { // before processing "internal_key". bool ShouldStopBefore(const Slice& internal_key); + void MarkBeingCompacted(bool flag = true); + // Release the input version for the compaction, once the compaction // is successful. void ReleaseInputs(); @@ -385,6 +410,7 @@ class Compaction { private: friend class Version; friend class VersionSet; + friend class DBimpl; explicit Compaction(int level); diff --git a/src/leveldb/include/leveldb/options.h b/src/leveldb/include/leveldb/options.h index 1d36a69b1..1868e8cba 100644 --- a/src/leveldb/include/leveldb/options.h +++ b/src/leveldb/include/leveldb/options.h @@ -290,6 +290,14 @@ struct Options { // disable write-ahead-log bool disable_wal; + // Max thread alloc for lg's compaction + // Default: 1 + uint32_t max_background_compactions; + + // if level0's file num >= limit, use sqrt slow down level score + // Default: 30 + int slow_down_level0_score_limit; + // Create an Options object with default values for all fields. Options(); }; diff --git a/src/leveldb/util/options.cc b/src/leveldb/util/options.cc index 4d62b5039..6ac432334 100644 --- a/src/leveldb/util/options.cc +++ b/src/leveldb/util/options.cc @@ -49,7 +49,9 @@ Options::Options() sst_size(kDefaultSstSize), verify_checksums_in_compaction(false), ignore_corruption_in_compaction(false), - disable_wal(false) { + disable_wal(false), + max_background_compactions(5), + slow_down_level0_score_limit(30) { } } // namespace leveldb