diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index c076008de..e864f7dc7 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -166,10 +166,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_number_(0), log_(NULL), bound_log_size_(0), - bg_compaction_scheduled_(false), - bg_compaction_score_(0), - bg_compaction_timeout_(0), - bg_schedule_id_(0), manual_compaction_(NULL), consecutive_compaction_errors_(0), flush_on_destroy_(false) { @@ -196,10 +192,11 @@ Status DBImpl::Shutdown1() { shutting_down_.Release_Store(this); // Any non-NULL value is ok Log(options_.info_log, "[%s] wait bg compact finish", dbname_.c_str()); - if (bg_compaction_scheduled_) { - env_->ReSchedule(bg_schedule_id_, kDumpMemTableUrgentScore, 0); + std::vector::iterator it = bg_compaction_tasks_.begin(); + for (; it != bg_compaction_tasks_.end(); ++it) { + env_->ReSchedule((*it)->id, kDumpMemTableUrgentScore, 0); } - while (bg_compaction_scheduled_) { + while (bg_compaction_tasks_.size() > 0) { bg_cv_.Wait(); } @@ -315,12 +312,19 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect. return; } + // check filesystem, and then check pending_outputs_ + std::vector filenames; + mutex_.Unlock(); + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + mutex_.Lock(); + // Make a set of all of the live files std::set live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -330,11 +334,6 @@ void DBImpl::DeleteObsoleteFiles() { Log(options_.info_log, "[%s] try DeleteObsoleteFiles, total live file num: %llu\n", dbname_.c_str(), static_cast(live.size())); - - std::vector filenames; - mutex_.Unlock(); - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose - mutex_.Lock(); uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { @@ -672,11 +671,14 @@ Status DBImpl::Recover(VersionEdit* edit) { } Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, - Version* base) { + Version* base, uint64_t* number) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; meta.number = BuildFullFileNumber(dbname_, versions_->NewFileNumber()); + if (number) { + *number = meta.number; + } pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); Log(options_.info_log, "[%s] Level-0 table #%u: started", @@ -724,15 +726,39 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +// multithread safe +Status DBImpl::CompactMemTable(bool* sched_idle) { mutex_.AssertHeld(); assert(imm_ != NULL); + Status s; + if (sched_idle) { + *sched_idle = true; + } + if (imm_->BeingFlushed()) { + //Log(options_.info_log, "[%s] CompactMemTable conflict, seq %lu", + // dbname_.c_str(), GetLastSequence(false)); + return s; + } + imm_->SetBeingFlushed(true); + + if (imm_->ApproximateMemoryUsage() <= 0) { // imm is empty, do nothing + Log(options_.info_log, "[%s] CompactMemTable empty memtable %lu", + dbname_.c_str(), GetLastSequence(false)); + imm_->Unref(); + imm_ = NULL; + has_imm_.Release_Store(NULL); + return s; + } + if (sched_idle) { + *sched_idle = false; + } // Save the contents of the memtable as a new Table VersionEdit edit; + uint64_t number; Version* base = versions_->current(); base->Ref(); - Status s = WriteLevel0Table(imm_, &edit, base); + s = WriteLevel0Table(imm_, &edit, base, &number); base->Unref(); if (s.ok() && shutting_down_.Acquire_Load()) { @@ -741,6 +767,7 @@ Status DBImpl::CompactMemTable() { // Replace immutable memtable with the generated Table if (s.ok()) { + pending_outputs_.insert(number); // LogAndApply donot holds lock, so use pending_outputs_ to make sure new file will not be deleted edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed if (imm_->GetLastSequence()) { @@ -749,6 +776,7 @@ Status DBImpl::CompactMemTable() { Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu", dbname_.c_str(), edit.GetLastSequence()); s = versions_->LogAndApply(&edit, &mutex_); + pending_outputs_.erase(number); } if (s.ok()) { @@ -756,6 +784,9 @@ Status DBImpl::CompactMemTable() { imm_->Unref(); imm_ = NULL; has_imm_.Release_Store(NULL); + } else { + // imm dump fail, reset being flush flag + imm_->SetBeingFlushed(false); } return s; @@ -787,6 +818,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { ManualCompaction manual; manual.level = level; manual.done = false; + manual.being_sched = false; + manual.compaction_conflict = kManualCompactIdle; if (begin == NULL) { manual.begin = NULL; } else { @@ -805,6 +838,9 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { if (manual_compaction_ == NULL) { // Idle manual_compaction_ = &manual; MaybeScheduleCompaction(); + } else if (manual_compaction_->compaction_conflict == kManualCompactWakeup) { + manual_compaction_->compaction_conflict = kManualCompactIdle; + MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. bg_cv_.Wait(); } @@ -877,117 +913,142 @@ void DBImpl::AddInheritedLiveFiles(std::vector >* live) { } Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) { - MutexLock lock(&mutex_); + MutexLock lock(&mutex_); - if (recover_mem_ == NULL) { - recover_mem_ = NewMemTable(); - recover_mem_->Ref(); - } - uint64_t log_sequence = WriteBatchInternal::Sequence(batch); - uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; + if (recover_mem_ == NULL) { + recover_mem_ = NewMemTable(); + recover_mem_->Ref(); + } + uint64_t log_sequence = WriteBatchInternal::Sequence(batch); + uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; - // if duplicate record, ignore - if (log_sequence <= recover_mem_->GetLastSequence()) { - assert (last_sequence <= recover_mem_->GetLastSequence()); - Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", - dbname_.c_str(), log_sequence, last_sequence); - return Status::OK(); - } + // if duplicate record, ignore + if (log_sequence <= recover_mem_->GetLastSequence()) { + assert (last_sequence <= recover_mem_->GetLastSequence()); + Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", + dbname_.c_str(), log_sequence, last_sequence); + return Status::OK(); + } - Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); - MaybeIgnoreError(&status); + Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); + MaybeIgnoreError(&status); + if (!status.ok()) { + return status; + } + if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { + edit->SetLastSequence(recover_mem_->GetLastSequence()); + status = WriteLevel0Table(recover_mem_, edit, NULL); if (!status.ok()) { - return status; - } - if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { - edit->SetLastSequence(recover_mem_->GetLastSequence()); - status = WriteLevel0Table(recover_mem_, edit, NULL); - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - return status; - } - recover_mem_->Unref(); - recover_mem_ = NULL; + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + return status; } - return status; + recover_mem_->Unref(); + recover_mem_ = NULL; + } + return status; } Status DBImpl::RecoverLastDumpToLevel0(VersionEdit* edit) { - MutexLock lock(&mutex_); - Status status; - if (recover_mem_ == NULL) { - return status; - } + MutexLock lock(&mutex_); + Status s; + if (recover_mem_ != NULL) { if (recover_mem_->GetLastSequence() > 0) { - edit->SetLastSequence(recover_mem_->GetLastSequence()); - status = WriteLevel0Table(recover_mem_, edit, NULL); + edit->SetLastSequence(recover_mem_->GetLastSequence()); + s = WriteLevel0Table(recover_mem_, edit, NULL); } recover_mem_->Unref(); recover_mem_ = NULL; - return status; -} + } + assert(recover_mem_ == NULL); + // LogAndApply to lg's manifest + if (s.ok()) { + s = versions_->LogAndApply(edit, &mutex_); + if (s.ok()) { + DeleteObsoleteFiles(); + MaybeScheduleCompaction(); + } else { + Log(options_.info_log, "[%s] Fail to modify manifest", + dbname_.c_str()); + } + } else { + Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str()); + } + return s; +} // end of tera-specific +bool ScoreSortGreater(std::pair i, std::pair j) { + if (i.second != j.second) { + return i.second < j.second; + } else { + return i.first > j.first; + } +} void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - uint64_t timeout = 0; - double score = versions_->CompactionScore(&timeout); - if (manual_compaction_ != NULL) { - score = kManualCompactScore; - timeout = 0; - } - if (imm_ != NULL) { - score = kDumpMemTableScore; - timeout = 0; - } - if (score > 0) { - if (!bg_compaction_scheduled_) { - bg_schedule_id_ = env_->Schedule(&DBImpl::BGWork, this, score, timeout); - Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f, timeout=%lu", - dbname_.c_str(), bg_schedule_id_, score, timeout); - bg_compaction_score_ = score; - bg_compaction_timeout_ = timeout; - bg_compaction_scheduled_ = true; - assert(score <= 1 || timeout == 0); // if score > 1, then timeout MUST be 0 - } else { - // use the same way to compute priority score, like util/thread_pool.h - bool need_resched = false; - if (timeout != bg_compaction_timeout_) { - need_resched = timeout < bg_compaction_timeout_; - } else if (score != bg_compaction_score_) { - need_resched = score > bg_compaction_score_; - } - - if (need_resched) { - env_->ReSchedule(bg_schedule_id_, score, timeout); - Log(options_.info_log, "[%s] ReSchedule Compact[%ld] score= %.2f, timeout=%lu", - dbname_.c_str(), bg_schedule_id_, score, timeout); - bg_compaction_score_ = score; - bg_compaction_timeout_ = timeout; - assert(score <= 1 || timeout == 0); // if score > 1, then timeout MUST be 0 + std::vector > scores; + if (imm_ && !imm_->BeingFlushed()) { + scores.push_back(std::pair(kDumpMemTableScore, 0)); + } + if (manual_compaction_ && !manual_compaction_->being_sched && + (manual_compaction_->compaction_conflict != kManualCompactConflict)) { + scores.push_back(std::pair(kManualCompactScore, 0)); + } + versions_->CompactionScore(&scores); + + size_t qlen = scores.size() > bg_compaction_tasks_.size() ? scores.size(): bg_compaction_tasks_.size(); + for (size_t i = 0; i < bg_compaction_tasks_.size(); i++) { + CompactionTask* task = bg_compaction_tasks_[i]; + scores.push_back(std::pair(task->score, task->timeout)); + } + std::sort(scores.begin(), scores.end(), ScoreSortGreater); + + for (size_t i = 0; i < qlen; i++) { + if (bg_compaction_tasks_.size() < options_.max_background_compactions) { + if (i < bg_compaction_tasks_.size()) { // try reschedule + CompactionTask* task = bg_compaction_tasks_[i]; + if (ScoreSortGreater(scores[i], std::pair(task->score, task->timeout))) { // resched + task->score = scores[i].first; + task->timeout = scores[i].second; + env_->ReSchedule(task->id, task->score, task->timeout); + Log(options_.info_log, "[%s] ReSchedule Compact[%ld] score= %.2f, timeout=%lu, currency %d", + dbname_.c_str(), task->id, task->score, task->timeout, (int)bg_compaction_tasks_.size()); + assert(scores[i].first <= 1 || scores[i].second == 0); // if score > 1, then timeout MUST be 0 + } + } else { // new compact task + CompactionTask* task = new CompactionTask; + task->db = this; + task->score = scores[i].first; + task->timeout = scores[i].second; + bg_compaction_tasks_.push_back(task); + task->id = env_->Schedule(&DBImpl::BGWork, task, task->score, task->timeout); + Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f, timeout=%lu, currency %d", + dbname_.c_str(), task->id, task->score, task->timeout, (int)bg_compaction_tasks_.size()); + assert(scores[i].first <= 1 || scores[i].second == 0); // if score > 1, then timeout MUST be 0 } } - } else { - // No work to be done } } + return; } -void DBImpl::BGWork(void* db) { - reinterpret_cast(db)->BackgroundCall(); +void DBImpl::BGWork(void* task) { + CompactionTask* ctask = reinterpret_cast(task); + reinterpret_cast(ctask->db)->BackgroundCall(ctask); } -void DBImpl::BackgroundCall() { - Log(options_.info_log, "[%s] BackgroundCall", dbname_.c_str()); +void DBImpl::BackgroundCall(CompactionTask* task) { MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); + Log(options_.info_log, "[%s] BackgroundCompact[%ld] score= %.2f currency %d", + dbname_.c_str(), task->id, task->score, (int)bg_compaction_tasks_.size()); + bool sched_idle = false; if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); + Status s = BackgroundCompaction(&sched_idle); if (s.ok()) { // Success consecutive_compaction_errors_ = 0; @@ -1014,36 +1075,53 @@ void DBImpl::BackgroundCall() { env_->SleepForMicroseconds(seconds_to_sleep * 1000000); mutex_.Lock(); } + } else { + sched_idle = true; } - bg_compaction_scheduled_ = false; + std::vector::iterator task_id = std::find(bg_compaction_tasks_.begin(), + bg_compaction_tasks_.end(), + task); + assert(task_id != bg_compaction_tasks_.end()); + bg_compaction_tasks_.erase(task_id); + delete task; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. - MaybeScheduleCompaction(); + if (!sched_idle) { + MaybeScheduleCompaction(); + } bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +Status DBImpl::BackgroundCompaction(bool* sched_idle) { mutex_.AssertHeld(); - if (imm_ != NULL) { - return CompactMemTable(); + *sched_idle = false; + if (imm_ && !imm_->BeingFlushed()) { + return CompactMemTable(sched_idle); } - Compaction* c; + Status status; + Compaction* c = NULL; bool is_manual = (manual_compaction_ != NULL); InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); + if (m->being_sched) { // other thread doing manual compaction or range being compacted + return status; + } + m->being_sched = true; + bool conflict = false; + c = versions_->CompactRange(m->level, m->begin, m->end, &conflict); + m->compaction_conflict = conflict? kManualCompactConflict : kManualCompactIdle; + m->done = (c == NULL && !conflict); if (c != NULL) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } Log(options_.info_log, - "[%s] Manual compaction at level-%d from %s .. %s; will stop at %s\n", - dbname_.c_str(), m->level, + "[%s] Manual compaction, conflit %u, at level-%d from %s .. %s; will stop at %s\n", + dbname_.c_str(), conflict, m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); @@ -1051,9 +1129,9 @@ Status DBImpl::BackgroundCompaction() { c = versions_->PickCompaction(); } - Status status; if (c == NULL) { // Nothing to do + *sched_idle = true; } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -1070,10 +1148,12 @@ Status DBImpl::BackgroundCompaction() { static_cast(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); + versions_->ReleaseCompaction(c, status); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); - CleanupCompaction(compact); + CleanupCompaction(compact); // pop pedning output, which can be deleted in DeleteObSoleteFiles() + versions_->ReleaseCompaction(c, status); // current_version has reference to c->inputs_[0,1] c->ReleaseInputs(); DeleteObsoleteFiles(); } @@ -1094,16 +1174,27 @@ Status DBImpl::BackgroundCompaction() { if (is_manual) { ManualCompaction* m = manual_compaction_; - if (!status.ok()) { - m->done = true; - } - if (!m->done) { - // We only compacted part of the requested range. Update *m - // to the range that is left to be compacted. - m->tmp_storage = manual_end; - m->begin = &m->tmp_storage; + m->being_sched = false; + if (m->compaction_conflict != kManualCompactConflict) { // PickRange success + if (!status.ok()) { + m->done = true; + } + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } + manual_compaction_ = NULL; } - manual_compaction_ = NULL; + } else if (manual_compaction_ != NULL) { // non manual compact + ManualCompaction* m = manual_compaction_; + m->compaction_conflict = kManualCompactWakeup;// Wakeup here, ManualCompact thread check it + Log(options_.info_log, + "[%s] Wakeup Manual compaction at level-%d from %s .. %s", + dbname_.c_str(), m->level, + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)")); } return status; } @@ -1121,6 +1212,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; pending_outputs_.erase(BuildFullFileNumber(dbname_, out.number)); + Log(options_.info_log, "[%s] erase pending_output #%lu", dbname_.c_str(), out.number); } delete compact; } @@ -1138,6 +1230,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.smallest.Clear(); out.largest.Clear(); compact->outputs.push_back(out); + + Log(options_.info_log, "[%s] insert pending_output #%lu", dbname_.c_str(), file_number); mutex_.Unlock(); } @@ -1253,6 +1347,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } +// *MUST* multi thread safe Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions @@ -1302,8 +1397,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (has_imm_.NoBarrier_Load() != NULL) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_ != NULL) { - CompactMemTable(); + if (imm_ && !imm_->BeingFlushed()) { + CompactMemTable(); // no need check failure, because imm_ not null if dump fail. bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); @@ -1471,7 +1566,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, - "[%s] compacted to: %s", dbname_.c_str(), versions_->LevelSummary(&tmp)); + "[%s] compacted to: %s, compacte stat %s", dbname_.c_str(), versions_->LevelSummary(&tmp), status.ToString().c_str()); return status; } @@ -1652,8 +1747,9 @@ bool DBImpl::BusyWrite() { void DBImpl::Workload(double* write_workload) { MutexLock l(&mutex_); - uint64_t timeout = 0; - double wwl = versions_->CompactionScore(&timeout); + std::vector > scores; + versions_->CompactionScore(&scores); + double wwl = scores.size() > 0? scores[0].first: 0; if (wwl >= 0) { *write_workload = wwl; } else { diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index 05b1ae623..db9d3f332 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -96,6 +96,12 @@ class DBImpl : public DB { friend class DBTable; struct CompactionState; struct Writer; + struct CompactionTask { + int64_t id; // compaction thread id + double score; // compaction score + uint64_t timeout; // compaction task delay time + DBImpl* db; + }; Iterator* NewInternalIterator(const ReadOptions&, SequenceNumber* latest_snapshot); @@ -110,10 +116,10 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable() + Status CompactMemTable(bool* sched_idle = NULL) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) + Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status MakeRoomForWrite(bool force /* compact even if there is room? */) @@ -121,8 +127,8 @@ class DBImpl : public DB { void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); - void BackgroundCall(); - Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCall(CompactionTask* task); + Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -196,18 +202,24 @@ class DBImpl : public DB { std::set pending_outputs_; // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; - double bg_compaction_score_; - uint64_t bg_compaction_timeout_; - int64_t bg_schedule_id_; + std::vector bg_compaction_tasks_; + std::vector bg_compaction_score_; + std::vector bg_schedule_id_; // Information for a manual compaction + enum ManualCompactState { + kManualCompactIdle, // manual compact inited + kManualCompactConflict, // manual compact run simultaneously + kManualCompactWakeup, // restart delay compact task + }; struct ManualCompaction { int level; bool done; + bool being_sched; const InternalKey* begin; // NULL means beginning of key range const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress + ManualCompactState compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake }; ManualCompaction* manual_compaction_; diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc index d9a4a725c..b247cd112 100644 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -311,22 +311,6 @@ Status DBTable::Init() { uint32_t i = *it; DBImpl* impl = lg_list_[i]; s = impl->RecoverLastDumpToLevel0(lg_edits[i]); - - // LogAndApply to lg's manifest - if (s.ok()) { - MutexLock lock(&impl->mutex_); - s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_); - if (s.ok()) { - impl->DeleteObsoleteFiles(); - impl->MaybeScheduleCompaction(); - } else { - Log(options_.info_log, "[%s] Fail to modify manifest of lg %d", - dbname_.c_str(), - i); - } - } else { - Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str()); - } delete lg_edits[i]; } @@ -936,7 +920,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, } } delete file; - return status; } diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc index c9f284110..ddee41b1d 100644 --- a/src/leveldb/db/memtable.cc +++ b/src/leveldb/db/memtable.cc @@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com : last_seq_(0), comparator_(cmp), refs_(0), + being_flushed_(false), table_(comparator_, &arena_), empty_(true), compact_strategy_factory_(compact_strategy_factory) { diff --git a/src/leveldb/db/memtable.h b/src/leveldb/db/memtable.h index ba608550e..a2a1a073a 100644 --- a/src/leveldb/db/memtable.h +++ b/src/leveldb/db/memtable.h @@ -79,6 +79,13 @@ class MemTable { empty_ = false; } + bool BeingFlushed() { return being_flushed_;} + void SetBeingFlushed(bool flag) { + assert(flag ? !being_flushed_ + : being_flushed_); + being_flushed_ = flag; + } + virtual ~MemTable(); protected: @@ -97,6 +104,7 @@ class MemTable { KeyComparator comparator_; int refs_; + bool being_flushed_; Arena arena_; Table table_; diff --git a/src/leveldb/db/version_edit.h b/src/leveldb/db/version_edit.h index 0c64728d0..603ad41ac 100644 --- a/src/leveldb/db/version_edit.h +++ b/src/leveldb/db/version_edit.h @@ -33,6 +33,7 @@ struct FileMetaData { InternalKey largest; // Largest internal key served by table bool smallest_fake; // smallest is not real, have out-of-range keys bool largest_fake; // largest is not real, have out-of-range keys + bool being_compacted; // Is this file undergoing compaction? FileMetaData() : refs(0), @@ -44,7 +45,8 @@ struct FileMetaData { file_size(0), data_size(0), smallest_fake(false), - largest_fake(false) { } + largest_fake(false), + being_compacted(false) { } }; class VersionEdit { diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 088acd090..73a6e42c9 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -70,6 +70,15 @@ static int64_t TotalFileSize(const std::vector& files) { } return sum; } +static int64_t TotalFileSizeNotBeingCompacted(const std::vector& files) { + int64_t sum = 0; + for (size_t i = 0; i < files.size(); i++) { + if (!files[i]->being_compacted) { + sum += files[i]->file_size; + } + } + return sum; +} Version::~Version() { assert(refs_ == 0); @@ -818,6 +827,7 @@ class VersionSetBuilder { FileMetaData* f = new FileMetaData(f_new); f->refs = 1; + f->being_compacted = false; if (f->data_size == 0 && !f->smallest_fake && !f->largest_fake) { // Make sure this is a new file generated by compaction. @@ -1014,7 +1024,18 @@ void VersionSet::AppendVersion(Version* v) { v->next_->prev_ = v; } -Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { +// multi thread safe +// Information kept for every waiting manifest writer +struct VersionSet::ManifestWriter { + Status status; + VersionEdit* edit; + bool done; + port::CondVar cv; + + explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { } +}; +void VersionSet::LogAndApplyHelper(VersionSetBuilder* builder, + VersionEdit* edit) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -1036,13 +1057,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetLastSequence(last_sequence_); } + builder->Apply(edit); +} +Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { + mu->AssertHeld(); + // multi write control, do not batch edit write, but multi thread safety + ManifestWriter w(mu); + w.edit = edit; + manifest_writers_.push_back(&w); + while (!w.done && &w != manifest_writers_.front()) { + w.cv.Wait(); + } + assert(manifest_writers_.front() == &w); + + // first manifest writer, batch edit Version* v = new Version(this); { VersionSetBuilder builder(this, current_); - builder.Apply(edit); + LogAndApplyHelper(&builder, w.edit); builder.SaveTo(v); } - Finalize(v); + Finalize(v); // recalculate new version score const uint64_t switch_interval = options_->manifest_switch_interval * 1000000UL; if (descriptor_log_ != NULL && @@ -1155,6 +1190,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { Log(options_->info_log, "[%s][dfs error] set force_switch_manifest", dbname_.c_str()); } + // TODO: batch write manifest finish + manifest_writers_.pop_front(); + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } return s; } @@ -1523,8 +1563,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction - int best_level = -1; - double best_score = -1; int best_del_level = -1; int best_del_idx = -1; int best_ttl_level = -1; @@ -1532,8 +1570,8 @@ void VersionSet::Finalize(Version* v) { int base_level = -1; for (int level = config::kNumLevels - 1; level >= 0; level--) { - double score; - if (level == 0) { + double score = 0; + if (level == 0 && level0_compactions_in_progress_.empty()) { // We treat level-0 specially by bounding the number of files // instead of number of bytes for two reasons: // @@ -1548,11 +1586,16 @@ void VersionSet::Finalize(Version* v) { // // (3) More level0 files means write hotspot. // We give lower score to avoid too much level0 compaction. - score = sqrt(v->files_[level].size() / - static_cast(config::kL0_CompactionTrigger)); - } else { + if (v->files_[level].size() <= (size_t)options_->slow_down_level0_score_limit) { + score = v->files_[level].size() / + static_cast(config::kL0_CompactionTrigger); + } else { + score = sqrt(v->files_[level].size() / + static_cast(config::kL0_CompactionTrigger)); + } + } else if (level > 0) { // Compute the ratio of current size to size limit. - const uint64_t level_bytes = TotalFileSize(v->files_[level]); + const uint64_t level_bytes = TotalFileSizeNotBeingCompacted(v->files_[level]); score = static_cast(level_bytes) / MaxBytesForLevel(level, options_->sst_size); } @@ -1562,16 +1605,15 @@ void VersionSet::Finalize(Version* v) { base_level = level; } - // size compaction does not allow trigger by base level - if ((score > best_score) && (level < config::kNumLevels - 1)) { - best_level = level; - best_score = score; + if (level < config::kNumLevels - 1) { + v->compaction_level_[level] = level; + v->compaction_score_[level] = (score < 1.0) ? 0: score; } for (size_t i = 0; i < v->files_[level].size(); i++) { FileMetaData* f = v->files_[level][i]; // del compaction does not allow trigger by base level - if ((level > 0) && (level < base_level) && + if ((!f->being_compacted) && (level > 0) && (level < base_level) && (f->del_percentage > options_->del_percentage) && (best_del_level < 0 || v->files_[best_del_level][best_del_idx]->del_percentage < f->del_percentage)) { @@ -1580,7 +1622,7 @@ void VersionSet::Finalize(Version* v) { } // ttl compaction can trigger in base level - if ((f->check_ttl_ts > 0) && + if ((!f->being_compacted) && (f->check_ttl_ts > 0) && (best_ttl_level < 0 || v->files_[best_ttl_level][best_ttl_idx]->check_ttl_ts > f->check_ttl_ts)) { best_ttl_level = level; @@ -1589,30 +1631,44 @@ void VersionSet::Finalize(Version* v) { } } - v->compaction_level_ = best_level; - v->compaction_score_ = best_score; + // sort all the levels based on their score. Higher scores get listed + // first. Use bubble sort because the number of entries are small. + for (int i = 0; i < config::kNumLevels - 2; i++) { + for (int j = i + 1; j < config::kNumLevels - 1; j++) { + if (v->compaction_score_[i] < v->compaction_score_[j]) { + int level = v->compaction_level_[i]; + double score = v->compaction_score_[i]; + v->compaction_level_[i] = v->compaction_level_[j]; + v->compaction_score_[i] = v->compaction_score_[j]; + v->compaction_level_[j] = level; + v->compaction_score_[j] = score; + } + } + } + if (best_del_level >= 0) { v->del_trigger_compact_ = v->files_[best_del_level][best_del_idx]; v->del_trigger_compact_level_ = best_del_level; Log(options_->info_log, - "[%s] del_strategy(current), level %d, num #%lu, file_size %lu, del_p %lu\n", - dbname_.c_str(), - v->del_trigger_compact_level_, - (v->del_trigger_compact_->number) & 0xffffffff, - v->del_trigger_compact_->file_size, - v->del_trigger_compact_->del_percentage); + "[%s] del_strategy(current), level %d, num #%lu, file_size %lu, del_p %lu\n", + dbname_.c_str(), + v->del_trigger_compact_level_, + (v->del_trigger_compact_->number) & 0xffffffff, + v->del_trigger_compact_->file_size, + v->del_trigger_compact_->del_percentage); } + if (best_ttl_level >= 0) { v->ttl_trigger_compact_ = v->files_[best_ttl_level][best_ttl_idx]; v->ttl_trigger_compact_level_ = best_ttl_level; Log(options_->info_log, - "[%s] ttl_strategy(current), level %d, num #%lu, file_size %lu, ttl_p %lu, check_ts %lu\n", - dbname_.c_str(), - v->ttl_trigger_compact_level_, - (v->ttl_trigger_compact_->number) & 0xffffffff, - v->ttl_trigger_compact_->file_size, - v->ttl_trigger_compact_->ttl_percentage, - v->ttl_trigger_compact_->check_ttl_ts); + "[%s] ttl_strategy(current), level %d, num #%lu, file_size %lu, ttl_p %lu, check_ts %lu\n", + dbname_.c_str(), + v->ttl_trigger_compact_level_, + (v->ttl_trigger_compact_->number) & 0xffffffff, + v->ttl_trigger_compact_->file_size, + v->ttl_trigger_compact_->ttl_percentage, + v->ttl_trigger_compact_->check_ttl_ts); } } @@ -1854,97 +1910,217 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { return result; } +void VersionSet::PrintFilesInCompaction(const std::vector& inputs) { + char buf[30]; + std::string fstr = "file: "; + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (f->being_compacted) { + snprintf(buf, sizeof(buf), "%lu ", f->number); + fstr.append(buf); + break; + } + } + Log(options_->info_log, "[%s] test mark level [%s] bening compact.", dbname_.c_str(), + fstr.c_str()); + return; +} +bool VersionSet::FilesInCompaction(const std::vector& inputs) { + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (f->being_compacted) { + return true; + } + } + return false; +} +void VersionSet::PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) { + std::vector inputs; + assert(level < config::kNumLevels); + current_->GetOverlappingInputs(level, smallest, largest, &inputs); + PrintFilesInCompaction(inputs); + return; +} +bool VersionSet::RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) { + std::vector inputs; + assert(level < config::kNumLevels); + current_->GetOverlappingInputs(level, smallest, largest, &inputs); + return FilesInCompaction(inputs); +} +// Note: +// 1) if f in level1 being compacted, level0 may be blocked; +// 2) compacting pointer may cause other f in the same level to be blocked. +bool VersionSet::PickCompactionBySize(int level, std::vector* inputs) { + inputs->clear(); + std::vector candidate; + // Pick the first file that comes after compact_pointer_[level] + for (size_t i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + if (f->being_compacted) { + //Log(options_->info_log, "[%s] PickCompactionBySize, level %d, f[%s, %s] being_compact %d\n", + // dbname_.c_str(), level, + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + continue; + } + if (!compact_pointer_[level].empty() && + icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) <= 0) { + //Log(options_->info_log, "[%s] PickCompactionBySize, skip by compact_pointer_[%d] %s, f[%s, %s] being_compacted %d\n", + // dbname_.c_str(), level, compact_pointer_[level].c_str(), + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + if (!RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + candidate.push_back(f); + } + continue; + } + if (RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + //PrintRangeInCompaction(&f->smallest, &f->largest, level + 1); + continue; + } + inputs->push_back(f); + break; + } + if (inputs->empty()) { + // Wrap-around to the beginning of the key space + FileMetaData* f = current_->files_[level][0]; + if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + inputs->push_back(f); + } + //Log(options_->info_log, "[%s] PickCompactBySize, wrap-arroud level %d, f[%s, %s] being_compacted %d\n", + // dbname_.c_str(), level, + // f->smallest.Encode().ToString().c_str(), f->largest.Encode().ToString().c_str(), + // f->being_compacted); + //PrintRangeInCompaction(&f->smallest, &f->largest, level + 1); + } + if (inputs->empty() && candidate.size() > 0) { + inputs->push_back(candidate[candidate.size() - 1]); + } + return !inputs->empty(); +} + // timeout for micro_second -double VersionSet::CompactionScore(uint64_t* timeout) const { - *timeout = 0; +void VersionSet::CompactionScore(std::vector >* scores) { uint64_t ts = env_->NowMicros(); Version* v = current_; - if (v->compaction_score_ >= 1) { - return v->compaction_score_; - } else if (v->del_trigger_compact_ != NULL && - v->del_trigger_compact_->del_percentage > options_->del_percentage) { - return (double)(v->del_trigger_compact_->del_percentage / 100.0); - } else if (v->ttl_trigger_compact_ != NULL && - ts >= v->ttl_trigger_compact_->check_ttl_ts) { - return (double)((v->ttl_trigger_compact_->ttl_percentage + 1) / 100.0); - } else if (v->file_to_compact_ != NULL) { - return 0.1f; + for (size_t i = 0; i < v->compaction_score_.size(); i++) { + if (v->compaction_score_[i] >= 1) { + scores->push_back(std::pair(v->compaction_score_[i], 0)); + } + } + if (v->del_trigger_compact_ != NULL && + !v->del_trigger_compact_->being_compacted && + v->del_trigger_compact_->del_percentage > options_->del_percentage) { + scores->push_back(std::pair( + (double)(v->del_trigger_compact_->del_percentage / 100.0), 0)); + } + if (v->ttl_trigger_compact_ != NULL && + !v->ttl_trigger_compact_->being_compacted && + ts >= v->ttl_trigger_compact_->check_ttl_ts) { + scores->push_back(std::pair( + (double)((v->ttl_trigger_compact_->ttl_percentage + 1) / 100.0), 0)); + } + if (v->file_to_compact_ != NULL && + !v->file_to_compact_->being_compacted) { + scores->push_back(std::pair(0.1, 0)); } // delay task if (v->ttl_trigger_compact_ != NULL && + !v->ttl_trigger_compact_->being_compacted && ts < v->ttl_trigger_compact_->check_ttl_ts) { - *timeout = (v->ttl_trigger_compact_->check_ttl_ts - ts + 1000000) / 1000; - return (double)((v->ttl_trigger_compact_->ttl_percentage + 1) / 100.0); + scores->push_back(std::pair( + (double)((v->ttl_trigger_compact_->ttl_percentage + 1) / 100.0), + ((v->ttl_trigger_compact_->check_ttl_ts - ts + 1000000) / 1000))); } - - // nothing to do - return -1.0; } Compaction* VersionSet::PickCompaction() { - Compaction* c; - int level; + int level = -1; + std::vector inputs; + bool set_non_trivial = false; // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. - const bool size_compaction = (current_->compaction_score_ >= 1); + const bool size_compaction = (current_->compaction_score_[0] >= 1); const bool seek_compaction = (current_->file_to_compact_ != NULL); const bool del_compaction = (current_->del_trigger_compact_ != NULL); const bool ttl_compaction = (current_->ttl_trigger_compact_ != NULL); - if (size_compaction) { - level = current_->compaction_level_; - assert(level >= 0); - assert(level+1 < config::kNumLevels); - c = new Compaction(level); - // Pick the first file that comes after compact_pointer_[level] - for (size_t i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; - if (compact_pointer_[level].empty() || - icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { - c->inputs_[0].push_back(f); + // check size compaction + assert(level0_compactions_in_progress_.size() <= 1); + bool skipped_l0 = false; + for (size_t li = 0; size_compaction && li < current_->compaction_score_.size(); li++) { + double score = current_->compaction_score_[li]; + level = current_->compaction_level_[li]; + assert(li == 0 || score <= current_->compaction_score_[li - 1]); + if (score >= 1) { + assert(level >= 0); + assert(level+1 < config::kNumLevels); + if (skipped_l0 && level <= 1) { + // level0 in progress and level 0 will not directly compact to level > 1 + //Log(options_->info_log, "[%s] lock level %d, conflict, score %.2f\n", + // dbname_.c_str(), level, score); + continue; + } + if (level == 0 && !level0_compactions_in_progress_.empty()) { + skipped_l0 = true; + //Log(options_->info_log, "[%s] level %d in progress, conflict, score %.2f\n", + // dbname_.c_str(), level, score); + continue; + } + if (PickCompactionBySize(level, &inputs)) { break; } + //Log(options_->info_log, "[%s] pick level %d, conflict, score %.2f\n", + // dbname_.c_str(), level, score); } - if (c->inputs_[0].empty()) { - // Wrap-around to the beginning of the key space - c->inputs_[0].push_back(current_->files_[level][0]); - } - } else if (seek_compaction) { - // compaction trigger by seek percentage - // TODO: multithread should lock it + } + + // check seek compaction + if (inputs.empty() && seek_compaction) { level = current_->file_to_compact_level_; - c = new Compaction(level); - c->inputs_[0].push_back(current_->file_to_compact_); - } else if (del_compaction) { + assert(level >= 0); + assert(level+1 < config::kNumLevels); + FileMetaData* f = current_->file_to_compact_; + if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + inputs.push_back(f); + } + } + + // check del compaction + if (inputs.empty() && del_compaction) { // compaction trigger by delete tags percentage; // TODO: multithread should lock it level = current_->del_trigger_compact_level_; assert(level >= 0); assert(level+1 < config::kNumLevels); - c = new Compaction(level); - c->SetNonTrivial(true); - c->inputs_[0].push_back(current_->del_trigger_compact_); - Log(options_->info_log, + FileMetaData* f = current_->del_trigger_compact_; + if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) { + inputs.push_back(f); + set_non_trivial = true; + Log(options_->info_log, "[%s] compact trigger by del stragety, level %d, num #%lu, file_size %lu, del_p %lu\n", dbname_.c_str(), current_->del_trigger_compact_level_, (current_->del_trigger_compact_->number) & 0xffffffff, current_->del_trigger_compact_->file_size, current_->del_trigger_compact_->del_percentage); - } else if (ttl_compaction) { + } + } + + // check ttl compaction + if (inputs.empty() && ttl_compaction) { // compaction trigger by ttl tags percentage // TODO: multithread should lock it level = current_->ttl_trigger_compact_level_; assert(level >= 0); - c = new Compaction(level); - c->SetNonTrivial(true); - c->inputs_[0].push_back(current_->ttl_trigger_compact_); - if (level == config::kNumLevels - 1) {// level in last level - c->set_output_level(level); - } - Log(options_->info_log, + FileMetaData* f = current_->ttl_trigger_compact_; + if (!f->being_compacted && + (level+1 == config::kNumLevels || !RangeInCompaction(&f->smallest, &f->largest, level + 1))) { + inputs.push_back(f); + set_non_trivial = true; + Log(options_->info_log, "[%s] compact trigger by ttl stragety, level %d, num #%lu, file_size %lu, ttl_p %lu, check_ts %lu\n", dbname_.c_str(), current_->ttl_trigger_compact_level_, @@ -1952,32 +2128,57 @@ Compaction* VersionSet::PickCompaction() { current_->ttl_trigger_compact_->file_size, current_->ttl_trigger_compact_->ttl_percentage, current_->ttl_trigger_compact_->check_ttl_ts); - } else { + } + } + if (inputs.empty()) { return NULL; } - c->input_version_ = current_; - c->input_version_->Ref(); - c->max_output_file_size_ = - MaxFileSizeForLevel(c->output_level(), current_->vset_->options_->sst_size); - + assert(inputs.size() == 1); + assert(level >= 0); // Files in level 0 may overlap each other, so pick up all overlapping ones if (level == 0) { + assert(level0_compactions_in_progress_.size() == 0); InternalKey smallest, largest; - GetRange(c->inputs_[0], &smallest, &largest); + GetRange(inputs, &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. - current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); - assert(!c->inputs_[0].empty()); + current_->GetOverlappingInputs(level, &smallest, &largest, &inputs); + GetRange(inputs, &smallest, &largest); + if (RangeInCompaction(&smallest, &largest, level + 1)) { // make sure level1 not in compaction + Log(options_->info_log, "[%s] level1 in compacting, level0 conflict\n", + dbname_.c_str()); + return NULL; + } + assert(!inputs.empty()); + assert(!FilesInCompaction(inputs)); } + + // expand inputs + Compaction* c = new Compaction(level); + c->SetNonTrivial(set_non_trivial); + c->input_version_ = current_; + c->input_version_->Ref(); // make sure compacting version will not delete + if (level == config::kNumLevels - 1) {// level in last level + c->set_output_level(level); + } + c->max_output_file_size_ = + MaxFileSizeForLevel(c->output_level(), current_->vset_->options_->sst_size); + c->inputs_[0] = inputs; SetupOtherInputs(c); // tera-specific: calculate the smallest rowkey which overlap with file not // in this compaction. SetupCompactionBoundary(c); + + // mark being compacted + c->MarkBeingCompacted(true); + if (level == 0) { + level0_compactions_in_progress_.push_back(c); + } + Finalize(current_); // reculate level score return c; } - void VersionSet::SetupOtherInputs(Compaction* c) { if (c->level() == c->output_level()) { // self level compaction, should select next level return; @@ -2008,7 +2209,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded1; current_->GetOverlappingInputs(c->output_level(), &new_start, &new_limit, &expanded1); - if (expanded1.size() == c->inputs_[1].size()) { + // check expanded file wether in compacting + if ((expanded1.size() == c->inputs_[1].size()) && + !RangeInCompaction(&new_start, &new_limit, level) && + !RangeInCompaction(&new_start, &new_limit, c->output_level())) { Log(options_->info_log, "[%s] Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", dbname_.c_str(), @@ -2084,11 +2288,18 @@ void VersionSet::SetupCompactionBoundary(Compaction* c) { Compaction* VersionSet::CompactRange( int level, const InternalKey* begin, - const InternalKey* end) { + const InternalKey* end, bool* being_compacted) { + *being_compacted = false; std::vector inputs; current_->GetOverlappingInputs(level, begin, end, &inputs); if (inputs.empty()) { - return NULL; + return NULL; + } + + // check level0 wether in compaction + if (level == 0 && !level0_compactions_in_progress_.empty()) { + *being_compacted = true; + return NULL; } // Avoid compacting too much in one shot in case the range is large. @@ -2109,6 +2320,18 @@ Compaction* VersionSet::CompactRange( } } + // check being compacting + InternalKey smallest, largest; + GetRange(inputs, &smallest, &largest); + if (FilesInCompaction(inputs) || RangeInCompaction(&smallest, &largest, level + 1)) { + PrintFilesInCompaction(inputs); + PrintRangeInCompaction(&smallest, &largest, level + 1); + Log(options_->info_log, "[%s] RangeCompaction : %s...%s, level: %d or %d, in compaction", + dbname_.c_str(), smallest.DebugString().c_str(), largest.DebugString().c_str(), level, level + 1); + *being_compacted = true; + return NULL; + } + Compaction* c = new Compaction(level); c->input_version_ = current_; c->input_version_->Ref(); @@ -2119,9 +2342,28 @@ Compaction* VersionSet::CompactRange( // tera-specific: calculate the smallest rowkey which overlap with file not // in this compaction. SetupCompactionBoundary(c); + + // mark being compacted + c->MarkBeingCompacted(true); + if (level == 0) { + level0_compactions_in_progress_.push_back(c); + } + Finalize(current_); // reculate level score return c; } +void VersionSet::ReleaseCompaction(Compaction* c, Status& s) { + c->MarkBeingCompacted(false); + assert(level0_compactions_in_progress_.size() <= 1); + if (c->level() == 0 && level0_compactions_in_progress_[0] == c) { + level0_compactions_in_progress_.resize(0); + } + if (!s.ok()) { + Finalize(current_); + } + return; +} + Compaction::Compaction(int level) : level_(level), output_level_(level + 1), @@ -2209,6 +2451,16 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) { } } +void Compaction::MarkBeingCompacted(bool flag) { + for (size_t i = 0; i < 2; i++) { + for (size_t j = 0; j < inputs_[i].size(); j++) { + assert(flag ? !inputs_[i][j]->being_compacted + : inputs_[i][j]->being_compacted); + inputs_[i][j]->being_compacted = flag; + } + } +} + void Compaction::ReleaseInputs() { if (input_version_ != NULL) { input_version_->Unref(); diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 5a01d8dba..069311c4f 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -19,6 +19,7 @@ #ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_ #define STORAGE_LEVELDB_DB_VERSION_SET_H_ +#include #include #include #include @@ -147,8 +148,8 @@ class Version { // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). - double compaction_score_; - int compaction_level_; + std::vector compaction_score_; + std::vector compaction_level_; explicit Version(VersionSet* vset) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -157,9 +158,13 @@ class Version { ttl_trigger_compact_(NULL), ttl_trigger_compact_level_(-1), del_trigger_compact_(NULL), - del_trigger_compact_level_(-1), - compaction_score_(-1), - compaction_level_(-1) { + del_trigger_compact_level_(-1) { + compaction_score_.resize(config::kNumLevels - 1); + compaction_level_.resize(config::kNumLevels - 1); + for (size_t i = 0; i < config::kNumLevels - 1; i++) { + compaction_score_[i] = -1.0; + compaction_level_[i] = -1; + } } ~Version(); @@ -182,6 +187,8 @@ class VersionSet { // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() + void LogAndApplyHelper(VersionSetBuilder* builder, + VersionEdit* edit); Status LogAndApply(VersionEdit* edit, port::Mutex* mu) EXCLUSIVE_LOCKS_REQUIRED(mu); @@ -231,7 +238,8 @@ class VersionSet { // being compacted, or zero if there is no such log file. uint64_t PrevLogNumber() const { return prev_log_number_; } - double CompactionScore(uint64_t* timeout) const; + // + void CompactionScore(std::vector >* scores); // Pick level and inputs for a new compaction. // Returns NULL if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that @@ -245,7 +253,10 @@ class VersionSet { Compaction* CompactRange( int level, const InternalKey* begin, - const InternalKey* end); + const InternalKey* end, bool* being_compacted); + + // release file's being_compacted flag, and release level0's lock + void ReleaseCompaction(Compaction* c, Status& s); // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. @@ -275,6 +286,7 @@ class VersionSet { friend class Compaction; friend class Version; friend class VersionSetBuilder; + struct ManifestWriter; void Finalize(Version* v); @@ -301,6 +313,13 @@ class VersionSet { bool ModifyFileSize(FileMetaData* f); + // milti thread compaction relatively + void PrintFilesInCompaction(const std::vector& inputs); + bool FilesInCompaction(const std::vector& inputs); + void PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level); + bool RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level); + bool PickCompactionBySize(int level, std::vector* inputs); + Env* const env_; const std::string dbname_; const Options* const options_; @@ -316,6 +335,8 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + std::deque manifest_writers_; + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_; @@ -325,6 +346,7 @@ class VersionSet { // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. std::string compact_pointer_[config::kNumLevels]; + std::vector level0_compactions_in_progress_; // No copying allowed VersionSet(const VersionSet&); @@ -372,6 +394,8 @@ class Compaction { // before processing "internal_key". bool ShouldStopBefore(const Slice& internal_key); + void MarkBeingCompacted(bool flag); + // Release the input version for the compaction, once the compaction // is successful. void ReleaseInputs(); @@ -384,6 +408,7 @@ class Compaction { private: friend class Version; friend class VersionSet; + friend class DBImpl; explicit Compaction(int level); diff --git a/src/leveldb/include/leveldb/options.h b/src/leveldb/include/leveldb/options.h index be78d0d30..adfcb44ad 100644 --- a/src/leveldb/include/leveldb/options.h +++ b/src/leveldb/include/leveldb/options.h @@ -310,13 +310,21 @@ struct Options { bool ignore_corruption_in_open; // Statistic: By default, if 10% entry timeout, will trigger compaction - // Default: 10 % + // Default: 99 % uint64_t ttl_percentage; // Statistic: delete tag's percentage in sst - // Default: 10 % + // Default: 20 % uint64_t del_percentage; + // Max thread alloc for lg's compaction + // Default: 5 + uint32_t max_background_compactions; + + // if level0's file num >= limit, use sqrt slow down level score + // Default: 30 + int slow_down_level0_score_limit; + // Create an Options object with default values for all fields. Options(); }; diff --git a/src/leveldb/util/options.cc b/src/leveldb/util/options.cc index ecd11b57e..e54bd447c 100644 --- a/src/leveldb/util/options.cc +++ b/src/leveldb/util/options.cc @@ -53,7 +53,9 @@ Options::Options() disable_wal(false), ignore_corruption_in_open(false), ttl_percentage(99), - del_percentage(20) { + del_percentage(20), + max_background_compactions(5), + slow_down_level0_score_limit(30) { } } // namespace leveldb