diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc
index 2d00657ee..fdfd965bb 100644
--- a/src/leveldb/db/db_impl.cc
+++ b/src/leveldb/db/db_impl.cc
@@ -146,9 +146,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       log_(NULL),
       bound_log_size_(0),
       tmp_batch_(new WriteBatch),
-      bg_compaction_scheduled_(false),
-      bg_compaction_score_(0),
-      bg_schedule_id_(0),
+      bg_compaction_scheduled_(0),
       manual_compaction_(NULL),
       consecutive_compaction_errors_(0),
       flush_on_destroy_(false) {
@@ -173,9 +171,6 @@ Status DBImpl::Shutdown1() {
   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
   Log(options_.info_log, "[%s] wait bg compact finish", dbname_.c_str());
-  if (bg_compaction_scheduled_) {
-    env_->ReSchedule(bg_schedule_id_, kDumpMemTableUrgentScore);
-  }
   while (bg_compaction_scheduled_) {
     bg_cv_.Wait();
   }
@@ -293,12 +288,19 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
 }
 
 void DBImpl::DeleteObsoleteFiles() {
+  mutex_.AssertHeld();
   if (!bg_error_.ok()) {
     // After a background error, we don't know whether a new version may
     // or may not have been committed, so we cannot safely garbage collect.
     return;
   }
 
+  // Check the filesystem first, then check pending_outputs_
+  std::vector<std::string> filenames;
+  mutex_.Unlock();
+  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
+  mutex_.Lock();
+
   // Make a set of all of the live files
   std::set<uint64_t> live = pending_outputs_;
   versions_->AddLiveFiles(&live);
@@ -308,11 +310,6 @@ void DBImpl::DeleteObsoleteFiles() {
   Log(options_.info_log, "[%s] try DeleteObsoleteFiles, total live file num: %llu\n",
       dbname_.c_str(), static_cast<unsigned long long>(live.size()));
-
-  std::vector<std::string> filenames;
-  mutex_.Unlock();
-  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
-  mutex_.Lock();
   uint64_t number;
   FileType type;
   for (size_t i = 0; i < filenames.size(); i++) {
@@ -524,11 +521,16 @@ Status DBImpl::Recover(VersionEdit* edit) {
 }
 
 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
-                                Version* base) {
+                                Version* base, uint64_t* number) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
-  meta.number = BuildFullFileNumber(dbname_, versions_->NewFileNumber());
+  if (number) {
+    *number = BuildFullFileNumber(dbname_, versions_->NewFileNumber());
+    meta.number = *number;
+  } else {
+    meta.number = BuildFullFileNumber(dbname_, versions_->NewFileNumber());
+  }
   pending_outputs_.insert(meta.number);
   Iterator* iter = mem->NewIterator();
   Log(options_.info_log, "[%s] Level-0 table #%u: started",
@@ -577,15 +579,22 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
   return s;
 }
 
+// multi-thread safe
 Status DBImpl::CompactMemTable() {
   mutex_.AssertHeld();
   assert(imm_ != NULL);
+  Status s;
+  if (imm_->BeingFlushed()) {
+    return s;
+  }
+  imm_->SetBeingFlushed(true);
 
   // Save the contents of the memtable as a new Table
   VersionEdit edit;
+  uint64_t number;
   Version* base = versions_->current();
   base->Ref();
-  Status s = WriteLevel0Table(imm_, &edit, base);
+  s = WriteLevel0Table(imm_, &edit, base, &number);
   base->Unref();
 
   if (s.ok() && shutting_down_.Acquire_Load()) {
@@ -594,6 +603,7 @@ Status DBImpl::CompactMemTable() {
 
   // Replace immutable memtable with the generated Table
   if (s.ok()) {
+    pending_outputs_.insert(number);  // LogAndApply does not hold the lock, so use pending_outputs_ to make sure the new file will not be deleted
     edit.SetPrevLogNumber(0);
     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
     if (imm_->GetLastSequence()) {
@@ -602,6 +612,9 @@ Status DBImpl::CompactMemTable() {
     Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu",
         dbname_.c_str(), edit.GetLastSequence());
     s = versions_->LogAndApply(&edit, &mutex_);
+    pending_outputs_.erase(number);
+  } else {
+    // TODO: dump memtable error, delete temp file
   }
 
   if (s.ok()) {
@@ -609,6 +622,9 @@ Status DBImpl::CompactMemTable() {
     imm_->Unref();
     imm_ = NULL;
     has_imm_.Release_Store(NULL);
+  } else {
+    // imm dump failed, reset the being-flushed flag
+    imm_->SetBeingFlushed(false);
   }
 
   return s;
@@ -640,6 +656,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   ManualCompaction manual;
   manual.level = level;
   manual.done = false;
+  manual.being_sched = false;
+  manual.compaction_conflict = kManualCompactIdle;
   if (begin == NULL) {
     manual.begin = NULL;
   } else {
@@ -658,6 +676,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
     if (manual_compaction_ == NULL) {  // Idle
       manual_compaction_ = &manual;
       MaybeScheduleCompaction();
+    } else if (manual_compaction_->compaction_conflict == kManualCompactWakeup) {
+      MaybeScheduleCompaction();
     } else {  // Running either my compaction or another compaction.
       bg_cv_.Wait();
     }
@@ -730,55 +750,69 @@ void DBImpl::AddInheritedLiveFiles(std::vector<std::set<uint64_t> >* live) {
 }
 
 Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) {
-    MutexLock lock(&mutex_);
+  MutexLock lock(&mutex_);
 
-    if (recover_mem_ == NULL) {
-        recover_mem_ = NewMemTable();
-        recover_mem_->Ref();
-    }
-    uint64_t log_sequence = WriteBatchInternal::Sequence(batch);
-    uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1;
-
-    // if duplicate record, ignore
-    if (log_sequence <= recover_mem_->GetLastSequence()) {
-        assert (last_sequence <= recover_mem_->GetLastSequence());
-        Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu",
-            dbname_.c_str(), log_sequence, last_sequence);
-        return Status::OK();
-    }
+  if (recover_mem_ == NULL) {
+    recover_mem_ = NewMemTable();
+    recover_mem_->Ref();
+  }
+  uint64_t log_sequence = WriteBatchInternal::Sequence(batch);
+  uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1;
+
+  // if duplicate record, ignore
+  if (log_sequence <= recover_mem_->GetLastSequence()) {
+    assert (last_sequence <= recover_mem_->GetLastSequence());
+    Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu",
+        dbname_.c_str(), log_sequence, last_sequence);
+    return Status::OK();
+  }
 
-    Status status = WriteBatchInternal::InsertInto(batch, recover_mem_);
-    MaybeIgnoreError(&status);
-    if (!status.ok()) {
-        return status;
-    }
-    if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
-        edit->SetLastSequence(recover_mem_->GetLastSequence());
-        status = WriteLevel0Table(recover_mem_, edit, NULL);
-        if (!status.ok()) {
-            // Reflect errors immediately so that conditions like full
-            // file-systems cause the DB::Open() to fail.
-            return status;
-        }
-        recover_mem_->Unref();
-        recover_mem_ = NULL;
-    }
-    return status;
+  Status status = WriteBatchInternal::InsertInto(batch, recover_mem_);
+  MaybeIgnoreError(&status);
+  if (!status.ok()) {
+    return status;
+  }
+  if (recover_mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
+    edit->SetLastSequence(recover_mem_->GetLastSequence());
+    status = WriteLevel0Table(recover_mem_, edit, NULL);
+    if (!status.ok()) {
+      // Reflect errors immediately so that conditions like full
+      // file-systems cause the DB::Open() to fail.
+      return status;
+    }
+    recover_mem_->Unref();
+    recover_mem_ = NULL;
+  }
+  return status;
 }
 
 Status DBImpl::RecoverLastDumpToLevel0(VersionEdit* edit) {
-    MutexLock lock(&mutex_);
-    Status status;
-    if (recover_mem_ == NULL) {
-        return status;
-    }
-    if (recover_mem_->GetLastSequence() > 0) {
-        edit->SetLastSequence(recover_mem_->GetLastSequence());
-        status = WriteLevel0Table(recover_mem_, edit, NULL);
-    }
-    recover_mem_->Unref();
-    recover_mem_ = NULL;
-    return status;
+  MutexLock lock(&mutex_);
+  Status s;
+  if (recover_mem_ == NULL) {
+    return s;
+  }
+  if (recover_mem_->GetLastSequence() > 0) {
+    edit->SetLastSequence(recover_mem_->GetLastSequence());
+    s = WriteLevel0Table(recover_mem_, edit, NULL);
+  }
+  recover_mem_->Unref();
+  recover_mem_ = NULL;
+
+  // LogAndApply to lg's manifest
+  if (s.ok()) {
+    s = versions_->LogAndApply(edit, &mutex_);
+    if (s.ok()) {
+      DeleteObsoleteFiles();
+      MaybeScheduleCompaction();
+    } else {
+      Log(options_.info_log, "[%s] Fail to modify manifest",
+          dbname_.c_str());
+    }
+  } else {
+    Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
+  }
+  return s;
 }
 
 // end of tera-specific
@@ -788,30 +822,26 @@ void DBImpl::MaybeScheduleCompaction() {
   if (shutting_down_.Acquire_Load()) {
     // DB is being deleted; no more background compactions
   } else {
-    double score = versions_->CompactionScore();
-    if (manual_compaction_ != NULL) {
-      score = kManualCompactScore;
+    std::vector<double> scores;
+    if (imm_ && !imm_->BeingFlushed()) {
+      scores.push_back(kDumpMemTableScore);
     }
-    if (imm_ != NULL) {
-      score = kDumpMemTableScore;
+    if (manual_compaction_ && !manual_compaction_->being_sched &&
+        (manual_compaction_->compaction_conflict != kManualCompactConflict)) {
+      scores.push_back(kManualCompactScore);
     }
-    if (score > 0) {
-      if (bg_compaction_scheduled_ && score <= bg_compaction_score_) {
-        // Already scheduled
-      } else if (bg_compaction_scheduled_) {
-        env_->ReSchedule(bg_schedule_id_, score);
-        Log(options_.info_log, "[%s] ReSchedule Compact[%ld] score= %.2f",
-            dbname_.c_str(), bg_schedule_id_, score);
-        bg_compaction_score_ = score;
-      } else {
-        bg_schedule_id_ = env_->Schedule(&DBImpl::BGWork, this, score);
-        Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f",
-            dbname_.c_str(), bg_schedule_id_, score);
-        bg_compaction_score_ = score;
-        bg_compaction_scheduled_ = true;
-      }
-    } else {
-      // No work to be done
+    double level_score = versions_->CompactionScore();
+    if (level_score > 0) {
+      scores.push_back(level_score);  // already the max level score
+    }
+    for (size_t i = 0; i < scores.size(); i++) {
+      if (bg_compaction_scheduled_ < (int)options_.max_background_compactions) {
+        bg_compaction_scheduled_++;
+
+        int64_t id = env_->Schedule(&DBImpl::BGWork, this, scores[i]);
+        Log(options_.info_log, "[%s] Schedule Compact[%ld] score= %.2f concurrency %d",
+            dbname_.c_str(), id, scores[i], bg_compaction_scheduled_);
+      }
     }
   }
 }
@@ -821,9 +851,9 @@ void DBImpl::BGWork(void* db) {
 }
 
 void DBImpl::BackgroundCall() {
-  Log(options_.info_log, "[%s] BackgroundCall", dbname_.c_str());
   MutexLock l(&mutex_);
-  assert(bg_compaction_scheduled_);
+  Log(options_.info_log, "[%s] BackgroundCall Compact %d", dbname_.c_str(), bg_compaction_scheduled_);
+  assert(bg_compaction_scheduled_ > 0);
   if (!shutting_down_.Acquire_Load()) {
     Status s = BackgroundCompaction();
     if (s.ok()) {
@@ -854,7 +884,7 @@ void DBImpl::BackgroundCall() {
     }
   }
 
-  bg_compaction_scheduled_ = false;
+  bg_compaction_scheduled_--;
 
   // Previous compaction may have produced too many files in a level,
   // so reschedule another compaction if needed.
@@ -869,13 +899,20 @@ Status DBImpl::BackgroundCompaction() {
     return CompactMemTable();
   }
 
-  Compaction* c;
+  Status status;
+  Compaction* c = NULL;
   bool is_manual = (manual_compaction_ != NULL);
   InternalKey manual_end;
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
-    c = versions_->CompactRange(m->level, m->begin, m->end);
-    m->done = (c == NULL);
+    if (m->being_sched) {  // another thread is running this manual compaction, or the range is being compacted
+      return status;
+    }
+    m->being_sched = true;
+    bool conflict = false;
+    c = versions_->CompactRange(m->level, m->begin, m->end, &conflict);
+    m->compaction_conflict = conflict ? kManualCompactConflict : kManualCompactIdle;
+    m->done = (c == NULL && !conflict);
     if (c != NULL) {
       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
     }
@@ -889,7 +926,6 @@ Status DBImpl::BackgroundCompaction() {
     c = versions_->PickCompaction();
   }
 
-  Status status;
   if (c == NULL) {
     // Nothing to do
   } else if (!is_manual && c->IsTrivialMove()) {
@@ -908,12 +944,14 @@ Status DBImpl::BackgroundCompaction() {
         static_cast<unsigned long long>(f->file_size),
         status.ToString().c_str(),
         versions_->LevelSummary(&tmp));
+    versions_->ReleaseCompaction(c, status);
   } else {
     CompactionState* compact = new CompactionState(c);
     status = DoCompactionWork(compact);
-    CleanupCompaction(compact);
+    CleanupCompaction(compact);  // pop pending outputs, which can then be deleted in DeleteObsoleteFiles()
+    versions_->ReleaseCompaction(c, status);  // the current version holds references to c->inputs_[0,1]
     c->ReleaseInputs();
-    DeleteObsoleteFiles();
+    DeleteObsoleteFiles();  // delete any sst not in the version set; with compactions and memtable flushes in multiple threads, this may delete new data
   }
   delete c;
 
@@ -932,16 +970,27 @@ Status DBImpl::BackgroundCompaction() {
 
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
-    if (!status.ok()) {
-      m->done = true;
-    }
-    if (!m->done) {
-      // We only compacted part of the requested range.  Update *m
-      // to the range that is left to be compacted.
-      m->tmp_storage = manual_end;
-      m->begin = &m->tmp_storage;
+    m->being_sched = false;
+    if (m->compaction_conflict != kManualCompactConflict) {  // PickRange succeeded
+      if (!status.ok()) {
+        m->done = true;
+      }
+      if (!m->done) {
+        // We only compacted part of the requested range.  Update *m
+        // to the range that is left to be compacted.
+        m->tmp_storage = manual_end;
+        m->begin = &m->tmp_storage;
+      }
+      manual_compaction_ = NULL;
     }
-    manual_compaction_ = NULL;
+  } else if (manual_compaction_ != NULL) {  // non-manual compaction
+    ManualCompaction* m = manual_compaction_;
+    m->compaction_conflict = kManualCompactWakeup;  // wake up here; the manual-compaction thread checks this flag
+    Log(options_.info_log,
+        "[%s] Wakeup Manual compaction at level-%d from %s .. %s",
+        dbname_.c_str(), m->level,
+        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+        (m->end ? m->end->DebugString().c_str() : "(end)"));
   }
   return status;
 }
@@ -959,6 +1008,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     pending_outputs_.erase(BuildFullFileNumber(dbname_, out.number));
+    Log(options_.info_log, "[%s] erase pending_output #%lu", dbname_.c_str(), out.number);
   }
   delete compact;
 }
@@ -976,6 +1026,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     out.smallest.Clear();
     out.largest.Clear();
     compact->outputs.push_back(out);
+
+    Log(options_.info_log, "[%s] insert pending_output #%lu", dbname_.c_str(), file_number);
     mutex_.Unlock();
   }
 
@@ -1072,6 +1124,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
 
+// *MUST* be multi-thread safe
 Status DBImpl::DoCompactionWork(CompactionState* compact) {
   const uint64_t start_micros = env_->NowMicros();
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
@@ -1276,7 +1329,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   }
   VersionSet::LevelSummaryStorage tmp;
   Log(options_.info_log,
-      "[%s] compacted to: %s", dbname_.c_str(), versions_->LevelSummary(&tmp));
+      "[%s] compacted to: %s, compact status %s", dbname_.c_str(), versions_->LevelSummary(&tmp), status.ToString().c_str());
   return status;
 }
diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h
index 2036de199..55d30c571 100644
--- a/src/leveldb/db/db_impl.h
+++ b/src/leveldb/db/db_impl.h
@@ -114,7 +114,7 @@ class DBImpl : public DB {
   Status CompactMemTable()
       EXCLUSIVE_LOCKS_REQUIRED(mutex_);
 
-  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
+  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
       EXCLUSIVE_LOCKS_REQUIRED(mutex_);
 
   Status MakeRoomForWrite(bool force /* compact even if there is room? */)
@@ -193,17 +193,24 @@ class DBImpl : public DB {
   std::set<uint64_t> pending_outputs_;
 
   // Has a background compaction been scheduled or is running?
-  bool bg_compaction_scheduled_;
-  double bg_compaction_score_;
-  int64_t bg_schedule_id_;
+  int bg_compaction_scheduled_;
+  std::vector<double> bg_compaction_score_;
+  std::vector<int64_t> bg_schedule_id_;
 
   // Information for a manual compaction
+  enum ManualCompactState {
+    kManualCompactIdle,
+    kManualCompactConflict,
+    kManualCompactWakeup,
+  };
   struct ManualCompaction {
     int level;
     bool done;
+    bool being_sched;
    const InternalKey* begin;   // NULL means beginning of key range
    const InternalKey* end;     // NULL means end of key range
    InternalKey tmp_storage;    // Used to keep track of compaction progress
+    int compaction_conflict;   // 0 == idle, 1 == conflict, 2 == wakeup
   };
   ManualCompaction* manual_compaction_;
diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc
index 4eb16b52e..85005334f 100644
--- a/src/leveldb/db/db_table.cc
+++ b/src/leveldb/db/db_table.cc
@@ -210,140 +210,124 @@ DBTable::~DBTable() {
 }
 
 Status DBTable::Init() {
-    std::vector<VersionEdit*> lg_edits;
-    Status s;
-    Log(options_.info_log, "[%s] start Init()", dbname_.c_str());
-    MutexLock lock(&mutex_);
-
-    uint64_t min_log_sequence = kMaxSequenceNumber;
-    std::vector<uint64_t> snapshot_sequence = options_.snapshots_sequence;
-    std::map<uint64_t, uint64_t> rollbacks = options_.rollbacks;
-    for (std::set<uint32_t>::iterator it = options_.exist_lg_list->begin();
-         it != options_.exist_lg_list->end() && s.ok(); ++it) {
-        uint32_t i = *it;
-        DBImpl* impl = new DBImpl(InitOptionsLG(options_, i),
-                                  dbname_ + "/" + Uint64ToString(i));
-        lg_list_.push_back(impl);
-        lg_edits.push_back(new VersionEdit);
-        for (uint32_t i = 0; i < snapshot_sequence.size(); ++i) {
-            impl->GetSnapshot(snapshot_sequence[i]);
-        }
-        std::map<uint64_t, uint64_t>::iterator rollback_it = rollbacks.begin();
-        for (; rollback_it != rollbacks.end(); ++rollback_it) {
-            impl->Rollback(rollback_it->first, rollback_it->second);
-        }
-
-        // recover SST
-        Log(options_.info_log, "[%s] start Recover lg%d, last_seq= %lu",
-            dbname_.c_str(), i, impl->GetLastSequence());
-        s = impl->Recover(lg_edits[i]);
-        Log(options_.info_log, "[%s] end Recover lg%d, last_seq= %lu",
-            dbname_.c_str(), i, impl->GetLastSequence());
-        if (s.ok()) {
-            uint64_t last_seq = impl->GetLastSequence();
-
-            Log(options_.info_log,
-                "[%s] Recover lg %d last_log_seq= %lu", dbname_.c_str(), i, last_seq);
-            if (min_log_sequence > last_seq) {
-                min_log_sequence = last_seq;
-            }
-            if (last_sequence_ < last_seq) {
-                last_sequence_ = last_seq;
-            }
-        } else {
-            Log(options_.info_log, "[%s] fail to recover lg %d", dbname_.c_str(), i);
-            break;
-        }
-    }
-    if (!s.ok()) {
-        Log(options_.info_log, "[%s] fail to recover table.", dbname_.c_str());
-        for (uint32_t i = 0; i != lg_list_.size(); ++i) {
-            delete lg_list_[i];
-        }
-        lg_list_.clear();
-        return s;
-    }
-
-    Log(options_.info_log, "[%s] start GatherLogFile", dbname_.c_str());
-    // recover log files
-    std::vector<uint64_t> logfiles;
-    s = GatherLogFile(min_log_sequence + 1, &logfiles);
-    if (s.ok()) {
-        for (uint32_t i = 0; i < logfiles.size(); ++i) {
-            // If two log files have overlap sequence id, ignore records
-            // from old log.
-            uint64_t recover_limit = kMaxSequenceNumber;
-            if (i < logfiles.size() - 1) {
-                recover_limit = logfiles[i + 1];
-            }
-            s = RecoverLogFile(logfiles[i], recover_limit, &lg_edits);
-            if (!s.ok()) {
-                Log(options_.info_log, "[%s] Fail to RecoverLogFile %ld",
-                    dbname_.c_str(), logfiles[i]);
-            }
-        }
-    } else {
-        Log(options_.info_log, "[%s] Fail to GatherLogFile", dbname_.c_str());
-    }
-
-    Log(options_.info_log, "[%s] start RecoverLogToLevel0Table", dbname_.c_str());
-    std::set<uint32_t>::iterator it = options_.exist_lg_list->begin();
-    for (; it != options_.exist_lg_list->end(); ++it) {
-        uint32_t i = *it;
-        DBImpl* impl = lg_list_[i];
-        s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
-
-        // LogAndApply to lg's manifest
-        if (s.ok()) {
-            MutexLock lock(&impl->mutex_);
-            s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
-            if (s.ok()) {
-                impl->DeleteObsoleteFiles();
-                impl->MaybeScheduleCompaction();
-            } else {
-                Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
-                    dbname_.c_str(),
-                    i);
-            }
-        } else {
-            Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
-        }
-        delete lg_edits[i];
-    }
-
-    if (s.ok()) {
-        Log(options_.info_log, "[%s] start DeleteLogFile", dbname_.c_str());
-        s = DeleteLogFile(logfiles);
-    }
-
-    if (s.ok() && !options_.disable_wal) {
-        std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1);
-        s = options_.env->NewWritableFile(log_file_name, &logfile_);
-        if (s.ok()) {
-            //Log(options_.info_log, "[%s] open logfile %s",
-            //    dbname_.c_str(), log_file_name.c_str());
-            log_ = new log::AsyncWriter(logfile_, options_.log_async_mode);
-        } else {
-            Log(options_.info_log, "[%s] fail to open logfile %s",
-                dbname_.c_str(), log_file_name.c_str());
-        }
-    }
-
-    if (s.ok()) {
-        state_ = kOpened;
-        Log(options_.info_log, "[%s] custom compact strategy: %s, flush trigger %lu",
-            dbname_.c_str(), options_.compact_strategy_factory->Name(),
-            options_.flush_triggered_log_num);
-
-        Log(options_.info_log, "[%s] Init() done, last_seq=%llu", dbname_.c_str(),
-            static_cast<unsigned long long>(last_sequence_));
-    } else {
-        for (uint32_t i = 0; i != lg_list_.size(); ++i) {
-            delete lg_list_[i];
-        }
-        lg_list_.clear();
-    }
-    return s;
+  std::vector<VersionEdit*> lg_edits;
+  Status s;
+  Log(options_.info_log, "[%s] start Init()", dbname_.c_str());
+  MutexLock lock(&mutex_);
+
+  uint64_t min_log_sequence = kMaxSequenceNumber;
+  std::vector<uint64_t> snapshot_sequence = options_.snapshots_sequence;
+  std::map<uint64_t, uint64_t> rollbacks = options_.rollbacks;
+  for (std::set<uint32_t>::iterator it = options_.exist_lg_list->begin();
+       it != options_.exist_lg_list->end() && s.ok(); ++it) {
+    uint32_t i = *it;
+    DBImpl* impl = new DBImpl(InitOptionsLG(options_, i),
+                              dbname_ + "/" + Uint64ToString(i));
+    lg_list_.push_back(impl);
+    lg_edits.push_back(new VersionEdit);
+    for (uint32_t i = 0; i < snapshot_sequence.size(); ++i) {
+      impl->GetSnapshot(snapshot_sequence[i]);
+    }
+    std::map<uint64_t, uint64_t>::iterator rollback_it = rollbacks.begin();
+    for (; rollback_it != rollbacks.end(); ++rollback_it) {
+      impl->Rollback(rollback_it->first, rollback_it->second);
+    }
+
+    // recover SST
+    Log(options_.info_log, "[%s] start Recover lg%d, last_seq= %lu",
+        dbname_.c_str(), i, impl->GetLastSequence());
+    s = impl->Recover(lg_edits[i]);
+    Log(options_.info_log, "[%s] end Recover lg%d, last_seq= %lu",
+        dbname_.c_str(), i, impl->GetLastSequence());
+    if (s.ok()) {
+      uint64_t last_seq = impl->GetLastSequence();
+
+      Log(options_.info_log,
+          "[%s] Recover lg %d last_log_seq= %lu", dbname_.c_str(), i, last_seq);
+      if (min_log_sequence > last_seq) {
+        min_log_sequence = last_seq;
+      }
+      if (last_sequence_ < last_seq) {
+        last_sequence_ = last_seq;
+      }
+    } else {
+      Log(options_.info_log, "[%s] fail to recover lg %d", dbname_.c_str(), i);
+      break;
+    }
+  }
+  if (!s.ok()) {
+    Log(options_.info_log, "[%s] fail to recover table.", dbname_.c_str());
+    for (uint32_t i = 0; i != lg_list_.size(); ++i) {
+      delete lg_list_[i];
+    }
+    lg_list_.clear();
+    return s;
+  }
+
+  Log(options_.info_log, "[%s] start GatherLogFile", dbname_.c_str());
+  // recover log files
+  std::vector<uint64_t> logfiles;
+  s = GatherLogFile(min_log_sequence + 1, &logfiles);
+  if (s.ok()) {
+    for (uint32_t i = 0; i < logfiles.size(); ++i) {
+      // If two log files have overlap sequence id, ignore records
+      // from old log.
+      uint64_t recover_limit = kMaxSequenceNumber;
+      if (i < logfiles.size() - 1) {
+        recover_limit = logfiles[i + 1];
+      }
+      s = RecoverLogFile(logfiles[i], recover_limit, &lg_edits);
+      if (!s.ok()) {
+        Log(options_.info_log, "[%s] Fail to RecoverLogFile %ld",
+            dbname_.c_str(), logfiles[i]);
+      }
+    }
+  } else {
+    Log(options_.info_log, "[%s] Fail to GatherLogFile", dbname_.c_str());
+  }
+
+  Log(options_.info_log, "[%s] start RecoverLogToLevel0Table", dbname_.c_str());
+  std::set<uint32_t>::iterator it = options_.exist_lg_list->begin();
+  for (; it != options_.exist_lg_list->end(); ++it) {
+    uint32_t i = *it;
+    DBImpl* impl = lg_list_[i];
+    s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
+    delete lg_edits[i];
+  }
+
+  if (s.ok()) {
+    Log(options_.info_log, "[%s] start DeleteLogFile", dbname_.c_str());
+    s = DeleteLogFile(logfiles);
+  }
+
+  if (s.ok() && !options_.disable_wal) {
+    std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1);
+    s = options_.env->NewWritableFile(log_file_name, &logfile_);
+    if (s.ok()) {
+      //Log(options_.info_log, "[%s] open logfile %s",
+      //    dbname_.c_str(), log_file_name.c_str());
+      log_ = new log::AsyncWriter(logfile_, options_.log_async_mode);
+    } else {
+      Log(options_.info_log, "[%s] fail to open logfile %s",
+          dbname_.c_str(), log_file_name.c_str());
+    }
+  }
+
+  if (s.ok()) {
+    state_ = kOpened;
+    Log(options_.info_log, "[%s] custom compact strategy: %s, flush trigger %lu",
+        dbname_.c_str(), options_.compact_strategy_factory->Name(),
+        options_.flush_triggered_log_num);
+
+    Log(options_.info_log, "[%s] Init() done, last_seq=%llu", dbname_.c_str(),
+        static_cast<unsigned long long>(last_sequence_));
+  } else {
+    for (uint32_t i = 0; i != lg_list_.size(); ++i) {
+      delete lg_list_[i];
+    }
+    lg_list_.clear();
+  }
+  return s;
 }
 
 Status DBTable::Put(const WriteOptions& options,
@@ -808,115 +792,113 @@ Status DBTable::GatherLogFile(uint64_t begin_num,
 
 Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
                                std::vector<VersionEdit*>* edit_list) {
-    struct LogReporter : public log::Reader::Reporter {
-        Env* env;
-        Logger* info_log;
-        const char* fname;
-        Status* status;  // NULL if options_.paranoid_checks==false
-        virtual void Corruption(size_t bytes, const Status& s) {
-            Log(info_log, "%s%s: dropping %d bytes; %s",
-                (this->status == NULL ? "(ignoring error) " : ""),
-                fname, static_cast<int>(bytes), s.ToString().c_str());
-            if (this->status != NULL && this->status->ok()) *this->status = s;
-        }
-    };
-
-    mutex_.AssertHeld();
-
-    // Open the log file
-    std::string fname = LogHexFileName(dbname_, log_number);
-    SequentialFile* file;
-    Status status = env_->NewSequentialFile(fname, &file);
-    if (!status.ok()) {
-        MaybeIgnoreError(&status);
-        return status;
-    }
-
-    // Create the log reader.
-    LogReporter reporter;
-    reporter.env = env_;
-    reporter.info_log = options_.info_log;
-    reporter.fname = fname.c_str();
-    reporter.status = (options_.paranoid_checks ? &status : NULL);
-    log::Reader reader(file, &reporter, true/*checksum*/,
-                       0/*initial_offset*/);
-    Log(options_.info_log, "[%s] Recovering log #%lx, sequence limit %lu",
-        dbname_.c_str(), log_number, recover_limit);
-
-    // Read all the records and add to a memtable
-    std::string scratch;
-    Slice record;
-    WriteBatch batch;
-    while (reader.ReadRecord(&record, &scratch) && status.ok()) {
-        if (record.size() < 12) {
-            reporter.Corruption(record.size(),
-                                Status::Corruption("log record too small"));
-            continue;
-        }
-        WriteBatchInternal::SetContents(&batch, record);
-        uint64_t first_seq = WriteBatchInternal::Sequence(&batch);
-        uint64_t last_seq = first_seq + WriteBatchInternal::Count(&batch) - 1;
-        //Log(options_.info_log, "[%s] batch_seq= %lu, last_seq= %lu, count=%d",
-        //    dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
-        if (last_seq >= recover_limit) {
-            Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
-                dbname_.c_str(), recover_limit, first_seq, last_seq);
-            continue;
-        }
-
-        if (last_seq > last_sequence_) {
-            last_sequence_ = last_seq;
-        }
-
-        std::vector<WriteBatch*> lg_updates;
-        lg_updates.resize(lg_list_.size());
-        std::fill(lg_updates.begin(), lg_updates.end(), (WriteBatch*)0);
-        bool created_new_wb = false;
-        if (lg_list_.size() > 1) {
-            status = batch.SeperateLocalityGroup(&lg_updates);
-            created_new_wb = true;
-            if (!status.ok()) {
-                return status;
-            }
-        } else {
-            lg_updates[0] = (&batch);
-        }
-
-        if (status.ok()) {
-            //TODO: should be distributed across multiple threads
-            for (uint32_t i = 0; i < lg_updates.size(); ++i) {
-                if (lg_updates[i] == NULL) {
-                    continue;
-                }
-                if (last_seq <= lg_list_[i]->GetLastSequence()) {
-                    continue;
-                }
-                uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]);
-                uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1;
-                // Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n",
-                //     dbname_.c_str(), first, last);
-
-                Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]);
-                if (!lg_s.ok()) {
-                    Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n",
-                        dbname_.c_str(), first, last);
-                    status = lg_s;
-                }
-            }
-        }
-
-        if (created_new_wb) {
-            for (uint32_t i = 0; i < lg_updates.size(); ++i) {
-                if (lg_updates[i] != NULL) {
-                    delete lg_updates[i];
-                    lg_updates[i] = NULL;
-                }
-            }
-        }
-    }
-    delete file;
-
-    return status;
+  struct LogReporter : public log::Reader::Reporter {
+    Env* env;
+    Logger* info_log;
+    const char* fname;
+    Status* status;  // NULL if options_.paranoid_checks==false
+    virtual void Corruption(size_t bytes, const Status& s) {
+      Log(info_log, "%s%s: dropping %d bytes; %s",
+          (this->status == NULL ? "(ignoring error) " : ""),
+          fname, static_cast<int>(bytes), s.ToString().c_str());
+      if (this->status != NULL && this->status->ok()) *this->status = s;
+    }
+  };
+  mutex_.AssertHeld();
+
+  // Open the log file
+  std::string fname = LogHexFileName(dbname_, log_number);
+  SequentialFile* file;
+  Status status = env_->NewSequentialFile(fname, &file);
+  if (!status.ok()) {
+    MaybeIgnoreError(&status);
+    return status;
+  }
+
+  // Create the log reader.
+  LogReporter reporter;
+  reporter.env = env_;
+  reporter.info_log = options_.info_log;
+  reporter.fname = fname.c_str();
+  reporter.status = (options_.paranoid_checks ? &status : NULL);
+  log::Reader reader(file, &reporter, true/*checksum*/,
+                     0/*initial_offset*/);
+  Log(options_.info_log, "[%s] Recovering log #%lx, sequence limit %lu",
+      dbname_.c_str(), log_number, recover_limit);
+
+  // Read all the records and add to a memtable
+  std::string scratch;
+  Slice record;
+  WriteBatch batch;
+  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
+    if (record.size() < 12) {
+      reporter.Corruption(record.size(),
+                          Status::Corruption("log record too small"));
+      continue;
+    }
+    WriteBatchInternal::SetContents(&batch, record);
+    uint64_t first_seq = WriteBatchInternal::Sequence(&batch);
+    uint64_t last_seq = first_seq + WriteBatchInternal::Count(&batch) - 1;
+    //Log(options_.info_log, "[%s] batch_seq= %lu, last_seq= %lu, count=%d",
+    //    dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
+    if (last_seq >= recover_limit) {
+      Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
+          dbname_.c_str(), recover_limit, first_seq, last_seq);
+      continue;
+    }
+
+    if (last_seq > last_sequence_) {
+      last_sequence_ = last_seq;
+    }
+
+    std::vector<WriteBatch*> lg_updates;
+    lg_updates.resize(lg_list_.size());
+    std::fill(lg_updates.begin(), lg_updates.end(), (WriteBatch*)0);
+    bool created_new_wb = false;
+    if (lg_list_.size() > 1) {
+      status = batch.SeperateLocalityGroup(&lg_updates);
+      created_new_wb = true;
+      if (!status.ok()) {
+        return status;
+      }
+    } else {
+      lg_updates[0] = (&batch);
+    }
+
+    if (status.ok()) {
+      //TODO: should be distributed across multiple threads
+      for (uint32_t i = 0; i < lg_updates.size(); ++i) {
+        if (lg_updates[i] == NULL) {
+          continue;
+        }
+        if (last_seq <= lg_list_[i]->GetLastSequence()) {
+          continue;
+        }
+        uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]);
+        uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1;
+        // Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n",
+        //     dbname_.c_str(), first, last);
+
+        Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]);
+        if (!lg_s.ok()) {
+          Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n",
+              dbname_.c_str(), first, last);
+          status = lg_s;
+        }
+      }
+    }
+
+    if (created_new_wb) {
+      for (uint32_t i = 0; i < lg_updates.size(); ++i) {
+        if (lg_updates[i] != NULL) {
+          delete lg_updates[i];
+          lg_updates[i] = NULL;
+        }
+      }
+    }
+  }
+  delete file;
+  return status;
 }
 
 void DBTable::MaybeIgnoreError(Status* s) const {
diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc
index c9f284110..ddee41b1d 100644
--- a/src/leveldb/db/memtable.cc
+++ b/src/leveldb/db/memtable.cc
@@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* compact_strategy_factory)
   : last_seq_(0),
     comparator_(cmp),
     refs_(0),
+    being_flushed_(false),
     table_(comparator_, &arena_),
     empty_(true),
     compact_strategy_factory_(compact_strategy_factory) {
diff --git a/src/leveldb/db/memtable.h b/src/leveldb/db/memtable.h
index ba608550e..a2a1a073a 100644
--- a/src/leveldb/db/memtable.h
+++ b/src/leveldb/db/memtable.h
@@ -79,6 +79,13 @@ class MemTable {
     empty_ = false;
   }
 
+  bool BeingFlushed() { return being_flushed_; }
+  void SetBeingFlushed(bool flag) {
+    assert(flag ? !being_flushed_
+                : being_flushed_);
+    being_flushed_ = flag;
+  }
+
   virtual ~MemTable();
 
  protected:
@@ -97,6 +104,7 @@ class MemTable {
   KeyComparator comparator_;
   int refs_;
+  bool being_flushed_;
   Arena arena_;
   Table table_;
diff --git a/src/leveldb/db/version_edit.h b/src/leveldb/db/version_edit.h
index c01d11ff8..4d7e3e9ac 100644
--- a/src/leveldb/db/version_edit.h
+++ b/src/leveldb/db/version_edit.h
@@ -30,6 +30,7 @@ struct FileMetaData {
   InternalKey largest;    // Largest internal key served by table
   bool smallest_fake;     // smallest is not real, have out-of-range keys
   bool largest_fake;      // largest is not real, have out-of-range keys
+  bool being_compacted;   // Is this file undergoing compaction?
 
   FileMetaData() :
       refs(0),
@@ -37,7 +38,8 @@ struct FileMetaData {
       file_size(0),
       data_size(0),
       smallest_fake(false),
-      largest_fake(false) { }
+      largest_fake(false),
+      being_compacted(false) { }
 };
 
 class VersionEdit {
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index 7d94c69ed..77837233f 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -70,6 +70,15 @@ static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
   }
   return sum;
 }
+static int64_t TotalFileSizeNotBeingCompacted(const std::vector<FileMetaData*>& files) {
+  int64_t sum = 0;
+  for (size_t i = 0; i < files.size(); i++) {
+    if (!files[i]->being_compacted) {
+      sum += files[i]->file_size;
+    }
+  }
+  return sum;
+}
 
 Version::~Version() {
   assert(refs_ == 0);
@@ -986,7 +995,18 @@ void VersionSet::AppendVersion(Version* v) {
   v->next_->prev_ = v;
 }
 
-Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
+// multi-thread safe
+// Information kept for every waiting manifest writer
+struct VersionSet::ManifestWriter {
+  Status status;
+  VersionEdit* edit;
+  bool done;
+  port::CondVar cv;
+
+  explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { }
+};
+void VersionSet::LogAndApplyHelper(VersionSetBuilder* builder,
+                                   VersionEdit* edit) {
   if (edit->has_log_number_) {
     assert(edit->log_number_ >= log_number_);
     assert(edit->log_number_ < next_file_number_);
@@ -1008,13 +1028,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     edit->SetLastSequence(last_sequence_);
   }
 
+  builder->Apply(edit);
+}
+Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
+  mu->AssertHeld();
+  // multi-writer control: edit writes are not batched, but they are made thread-safe
+  ManifestWriter w(mu);
+  w.edit = edit;
+  manifest_writers_.push_back(&w);
+  while (!w.done && &w != manifest_writers_.front()) {
+    w.cv.Wait();
+  }
+  assert(manifest_writers_.front() == &w);
+
+  // first manifest writer, batch edit
   Version* v = new Version(this);
   {
     VersionSetBuilder builder(this, current_);
-    builder.Apply(edit);
+    LogAndApplyHelper(&builder, w.edit);
     builder.SaveTo(v);
   }
-  Finalize(v);
+  Finalize(v);  // recalculate the new version's score
 
   const uint64_t switch_interval = options_->manifest_switch_interval * 1000000UL;
   if (descriptor_log_ != NULL &&
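
Note on the hunk above: the ManifestWriter deque serializes concurrent LogAndApply() callers. Each caller enqueues itself, waits on its own condition variable until it reaches the front of the queue, applies its edit, and (in the next hunk) wakes the next waiter on the way out. Below is a minimal standalone sketch of the same idiom using std::mutex/std::condition_variable instead of leveldb's port layer; all names in it are illustrative, not part of the patch:

```cpp
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Writer-queue idiom: callers line up in a deque; only the front writer
// proceeds, and it hands off to the next waiter when it finishes.
class WriterQueue {
 public:
  void Apply(int edit_id) {
    Waiter me;
    std::unique_lock<std::mutex> lock(mu_);
    queue_.push_back(&me);
    while (!me.done && queue_.front() != &me) {
      me.cv.wait(lock);  // parked until we reach the front of the queue
    }
    // Front writer: perform the "manifest write" while serialized.
    // (The real patch releases the DB mutex around the actual disk write.)
    std::cout << "applied edit " << edit_id << std::endl;
    queue_.pop_front();
    if (!queue_.empty()) {
      queue_.front()->cv.notify_one();  // wake the next writer
    }
  }

 private:
  struct Waiter {
    std::condition_variable cv;
    bool done = false;  // mirrors ManifestWriter::done (used once batching lands)
  };
  std::mutex mu_;
  std::deque<Waiter*> queue_;
};

int main() {
  WriterQueue q;
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&q, i] { q.Apply(i); });
  }
  for (auto& t : threads) t.join();
  return 0;
}
```

The per-writer `done` flag is what would make the "batch write manifest" TODO in the next hunk cheap to implement: the front writer could apply queued edits on behalf of later waiters and mark them done before signalling.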
@@ -1112,6 +1146,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     Log(options_->info_log, "[%s][dfs error] set force_switch_manifest",
         dbname_.c_str());
   }
+  // TODO: finish batching of manifest writes
+  manifest_writers_.pop_front();
+  if (!manifest_writers_.empty()) {
+    manifest_writers_.front()->cv.Signal();
+  }
   return s;
 }
 
@@ -1421,12 +1460,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
 
 void VersionSet::Finalize(Version* v) {
   // Precomputed best level for next compaction
-  int best_level = -1;
-  double best_score = -1;
-
   for (int level = 0; level < config::kNumLevels-1; level++) {
-    double score;
-    if (level == 0) {
+    double score = 0;
+    if (level == 0 && level0_compactions_in_progress_.empty()) {
       // We treat level-0 specially by bounding the number of files
       // instead of number of bytes for two reasons:
       //
@@ -1441,23 +1477,37 @@ void VersionSet::Finalize(Version* v) {
       //
       // (3) More level0 files means write hotspot.
       // We give lower score to avoid too much level0 compaction.
-      score = sqrt(v->files_[level].size() /
-                   static_cast<double>(config::kL0_CompactionTrigger));
-    } else {
+      if (v->files_[level].size() <= (size_t)options_->slow_down_level0_score_limit) {
+        score = v->files_[level].size() /
+                static_cast<double>(config::kL0_CompactionTrigger);
+      } else {
+        score = sqrt(v->files_[level].size() /
+                     static_cast<double>(config::kL0_CompactionTrigger));
+      }
+    } else if (level > 0) {
       // Compute the ratio of current size to size limit.
-      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
+      const uint64_t level_bytes = TotalFileSizeNotBeingCompacted(v->files_[level]);
       score = static_cast<double>(level_bytes) /
           MaxBytesForLevel(level, options_->sst_size);
     }
-
-    if (score > best_score) {
-      best_level = level;
-      best_score = score;
+    v->compaction_level_[level] = level;
+    v->compaction_score_[level] = (score < 1.0) ? 0 : score;
+  }
+
+  // Sort all the levels based on their score, highest first.
+  // Use bubble sort because the number of entries is small.
+  for (int i = 0; i < config::kNumLevels - 2; i++) {
+    for (int j = i + 1; j < config::kNumLevels - 1; j++) {
+      if (v->compaction_score_[i] < v->compaction_score_[j]) {
+        int level = v->compaction_level_[i];
+        double score = v->compaction_score_[i];
+        v->compaction_level_[i] = v->compaction_level_[j];
+        v->compaction_score_[i] = v->compaction_score_[j];
+        v->compaction_level_[j] = level;
+        v->compaction_score_[j] = score;
+      }
     }
   }
-
-  v->compaction_level_ = best_level;
-  v->compaction_score_ = best_score;
 }
 
 Status VersionSet::WriteSnapshot(log::Writer* log) {
@@ -1698,62 +1748,156 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   return result;
 }
 
+void VersionSet::PrintFilesInCompaction(const std::vector<FileMetaData*>& inputs) {
+  char buf[30];
+  std::string fstr = "file: ";
+  for (size_t i = 0; i < inputs.size(); i++) {
+    FileMetaData* f = inputs[i];
+    if (f->being_compacted) {
+      snprintf(buf, sizeof(buf), "%lu ", f->number);
+      fstr.append(buf);
+      break;
+    }
+  }
+  Log(options_->info_log, "%s files being compacted: %s", dbname_.c_str(),
+      fstr.c_str());
+  return;
+}
+bool VersionSet::FilesInCompaction(const std::vector<FileMetaData*>& inputs) {
+  for (size_t i = 0; i < inputs.size(); i++) {
+    FileMetaData* f = inputs[i];
+    if (f->being_compacted) {
+      return true;
+    }
+  }
+  return false;
+}
+void VersionSet::PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) {
+  std::vector<FileMetaData*> inputs;
+  assert(level < config::kNumLevels);
+  current_->GetOverlappingInputs(level, smallest, largest, &inputs);
+  PrintFilesInCompaction(inputs);
+  return;
+}
+bool VersionSet::RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level) {
+  std::vector<FileMetaData*> inputs;
+  assert(level < config::kNumLevels);
+  current_->GetOverlappingInputs(level, smallest, largest, &inputs);
+  return FilesInCompaction(inputs);
+}
+bool VersionSet::PickCompactionBySize(int level, std::vector<FileMetaData*>* inputs) {
+  inputs->clear();
+  // Pick the first file that comes after compact_pointer_[level]
+  for (size_t i = 0; i < current_->files_[level].size(); i++) {
+    FileMetaData* f = current_->files_[level][i];
+    if (f->being_compacted) {
+      continue;
+    }
+    if (!compact_pointer_[level].empty() &&
+        icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) <= 0) {
+      continue;
+    }
+    if (RangeInCompaction(&f->smallest, &f->largest, level + 1)) {
+      continue;
+    }
+    inputs->push_back(f);
+    break;
+  }
+  if (inputs->empty()) {
+    // Wrap-around to the beginning of the key space
+    FileMetaData* f = current_->files_[level][0];
+    if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) {
+      inputs->push_back(f);
+    }
+  }
+  return !inputs->empty();
+}
 Compaction* VersionSet::PickCompaction() {
-  Compaction* c;
-  int level;
+  int level = -1;
+  std::vector<FileMetaData*> inputs;
 
   // We prefer compactions triggered by too much data in a level over
   // the compactions triggered by seeks.
-  const bool size_compaction = (current_->compaction_score_ >= 1);
-  const bool seek_compaction = (current_->file_to_compact_ != NULL);
-  if (size_compaction) {
-    level = current_->compaction_level_;
-    assert(level >= 0);
-    assert(level+1 < config::kNumLevels);
-    c = new Compaction(level);
-
-    // Pick the first file that comes after compact_pointer_[level]
-    for (size_t i = 0; i < current_->files_[level].size(); i++) {
-      FileMetaData* f = current_->files_[level][i];
-      if (compact_pointer_[level].empty() ||
-          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
-        c->inputs_[0].push_back(f);
+  assert(level0_compactions_in_progress_.size() <= 1);
+  bool skipped_l0 = false;
+  for (size_t li = 0; li < current_->compaction_score_.size(); li++) {
+    double score = current_->compaction_score_[li];
+    level = current_->compaction_level_[li];
+    assert(li == 0 || score <= current_->compaction_score_[li - 1]);
+    if (score >= 1) {
+      if (skipped_l0 && level <= 1) {
+        // a level-0 compaction is in progress, and level 0 never compacts directly to a level > 1
+        continue;
+      }
+      if (level == 0 && !level0_compactions_in_progress_.empty()) {
+        skipped_l0 = true;
+        continue;
+      }
+      if (PickCompactionBySize(level, &inputs)) {
         break;
       }
     }
-    if (c->inputs_[0].empty()) {
-      // Wrap-around to the beginning of the key space
-      c->inputs_[0].push_back(current_->files_[level][0]);
-    }
-  } else if (seek_compaction) {
+  }
+  // check seek compaction
+  if (inputs.empty() &&
+      (current_->file_to_compact_ != NULL)) {
     level = current_->file_to_compact_level_;
-    c = new Compaction(level);
-    c->inputs_[0].push_back(current_->file_to_compact_);
-  } else {
+    FileMetaData* f = current_->file_to_compact_;
+    if (!f->being_compacted && !RangeInCompaction(&f->smallest, &f->largest, level + 1)) {
+      inputs.push_back(f);
+    }
+  }
+  if (inputs.empty()) {
     return NULL;
   }
-
-  c->input_version_ = current_;
-  c->input_version_->Ref();
-  c->max_output_file_size_ =
-      MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size);
+  assert(inputs.size() == 1);
+  assert(level >= 0);
 
   // Files in level 0 may overlap each other, so pick up all overlapping ones
   if (level == 0) {
+    assert(level0_compactions_in_progress_.size() == 0);
     InternalKey smallest, largest;
-    GetRange(c->inputs_[0], &smallest, &largest);
+    GetRange(inputs, &smallest, &largest);
     // Note that the next call will discard the file we placed in
     // c->inputs_[0] earlier and replace it with an overlapping set
     // which will include the picked file.
-    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
-    assert(!c->inputs_[0].empty());
+    current_->GetOverlappingInputs(level, &smallest, &largest, &inputs);
+    GetRange(inputs, &smallest, &largest);
+    if (RangeInCompaction(&smallest, &largest, level + 1)) {  // make sure level 1 is not in compaction
+      return NULL;
+    }
+    assert(!inputs.empty());
+    assert(!FilesInCompaction(inputs));
   }
 
+  // expand inputs
+  Compaction* c = new Compaction(level);
+  c->input_version_ = current_;
+  c->input_version_->Ref();  // make sure the version being compacted will not be deleted
+  c->max_output_file_size_ =
+      MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size);
+  c->inputs_[0] = inputs;
   SetupOtherInputs(c);
+
+  // mark files as being compacted
+  c->MarkBeingCompacted(true);
+  char buf[30];
+  std::string fstr = "file: ";
+  for (size_t tmpi = 0; tmpi < 2; tmpi++) {
+    for (size_t tmpj = 0; tmpj < c->inputs_[tmpi].size(); tmpj++) {
+      snprintf(buf, sizeof(buf), "L%lu:%lu ", tmpi,
+               c->inputs_[tmpi][tmpj]->number);
+      fstr.append(buf);
+    }
+  }
+  Log(options_->info_log, "%s mark level %d in C%lu %s, being compacted", dbname_.c_str(),
+      c->level(), (unsigned long)c, fstr.c_str());
+  if (level == 0) {
+    level0_compactions_in_progress_.push_back(c);
+  }
+  Finalize(current_);  // recalculate level scores
   return c;
 }
 
-
 void VersionSet::SetupOtherInputs(Compaction* c) {
   const int level = c->level();
   InternalKey smallest, largest;
@@ -1781,7 +1925,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
       std::vector<FileMetaData*> expanded1;
       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                      &expanded1);
-      if (expanded1.size() == c->inputs_[1].size()) {
+      // check whether the expanded files are already being compacted
+      if ((expanded1.size() == c->inputs_[1].size()) &&
+          !RangeInCompaction(&new_start, &new_limit, level) &&
+          !RangeInCompaction(&new_start, &new_limit, level + 1)) {
         Log(options_->info_log,
             "[%s] Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
             dbname_.c_str(),
@@ -1855,11 +2002,18 @@ void VersionSet::SetupCompactionBoundary(Compaction* c) {
 
 Compaction* VersionSet::CompactRange(
     int level,
     const InternalKey* begin,
-    const InternalKey* end) {
+    const InternalKey* end, bool* being_compacted) {
+  *being_compacted = false;
   std::vector<FileMetaData*> inputs;
   current_->GetOverlappingInputs(level, begin, end, &inputs);
   if (inputs.empty()) {
-      return NULL;
+    return NULL;
+  }
+
+  // check whether level 0 is already in compaction
+  if (level == 0 && !level0_compactions_in_progress_.empty()) {
+    *being_compacted = true;
+    return NULL;
   }
 
   // Avoid compacting too much in one shot in case the range is large.
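
Note on the hunks above: all of the FilesInCompaction()/RangeInCompaction() checks enforce one invariant — an SST file may belong to at most one running compaction, and level 0 additionally admits only one compaction at a time. A self-contained sketch of that bookkeeping follows; the `File` struct and picker are illustrative stand-ins, not the patch's classes:

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

// Minimal stand-in for FileMetaData with the flag this patch adds.
struct File {
  uint64_t number;
  bool being_compacted;
};

// Mirrors FilesInCompaction(): true if any input is already claimed.
bool AnyInCompaction(const std::vector<File*>& files) {
  for (File* f : files) {
    if (f->being_compacted) return true;
  }
  return false;
}

// Mirrors PickCompactionBySize()'s skip logic: take the first unclaimed
// file and claim it, like MarkBeingCompacted(true); NULL if all are busy.
File* PickFree(std::vector<File*>& level_files) {
  for (File* f : level_files) {
    if (!f->being_compacted) {
      f->being_compacted = true;
      return f;
    }
  }
  return nullptr;
}

int main() {
  File a{1, false}, b{2, false}, c{3, false};
  std::vector<File*> level = {&a, &b, &c};
  File* first = PickFree(level);   // claims file 1
  File* second = PickFree(level);  // a concurrent picker would get file 2
  assert(first != nullptr && second != nullptr && first != second);
  std::cout << "picked " << first->number << " and " << second->number << "\n";
  // ReleaseCompaction() equivalent: clear the flags when a compaction ends.
  first->being_compacted = false;
  second->being_compacted = false;
  assert(!AnyInCompaction(level));
  return 0;
}
```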
@@ -1880,6 +2034,18 @@ Compaction* VersionSet::CompactRange(
     }
   }
 
+  // check whether the range is already being compacted
+  InternalKey smallest, largest;
+  GetRange(inputs, &smallest, &largest);
+  if (FilesInCompaction(inputs) || RangeInCompaction(&smallest, &largest, level + 1)) {
+    PrintFilesInCompaction(inputs);
+    PrintRangeInCompaction(&smallest, &largest, level + 1);
+    Log(options_->info_log, "[%s] RangeCompaction : %s...%s, level: %d or %d, in compaction",
+        dbname_.c_str(), smallest.DebugString().c_str(), largest.DebugString().c_str(), level, level + 1);
+    *being_compacted = true;
+    return NULL;
+  }
+
   Compaction* c = new Compaction(level);
   c->input_version_ = current_;
   c->input_version_->Ref();
@@ -1887,9 +2053,51 @@ Compaction* VersionSet::CompactRange(
       MaxFileSizeForLevel(level + 1, current_->vset_->options_->sst_size);
   c->inputs_[0] = inputs;
   SetupOtherInputs(c);
+
+  // mark files as being compacted
+  c->MarkBeingCompacted(true);
+  char buf[30];
+  std::string fstr = "file: ";
+  for (size_t tmpi = 0; tmpi < 2; tmpi++) {
+    for (size_t tmpj = 0; tmpj < c->inputs_[tmpi].size(); tmpj++) {
+      snprintf(buf, sizeof(buf), "L%lu:%lu ", tmpi,
+               c->inputs_[tmpi][tmpj]->number);
+      fstr.append(buf);
+    }
+  }
+  Log(options_->info_log, "%s mark level %d in C%lu %s, being compacted", dbname_.c_str(),
+      c->level(), (unsigned long)c, fstr.c_str());
+  if (level == 0) {
+    level0_compactions_in_progress_.push_back(c);
+  }
+  Finalize(current_);  // recalculate level scores
   return c;
 }
 
+void VersionSet::ReleaseCompaction(Compaction* c, Status& s) {
+  c->MarkBeingCompacted(false);
+  assert(level0_compactions_in_progress_.size() <= 1);
+  if (c->level() == 0 && level0_compactions_in_progress_[0] == c) {
+    level0_compactions_in_progress_.resize(0);
+  }
+  if (!s.ok()) {
+    Finalize(c->input_version_);
+  }
+
+  char buf[30];
+  std::string fstr = "file: ";
+  for (size_t tmpi = 0; tmpi < 2; tmpi++) {
+    for (size_t tmpj = 0; tmpj < c->inputs_[tmpi].size(); tmpj++) {
+      snprintf(buf, sizeof(buf), "L%lu:%lu ", tmpi,
+               c->inputs_[tmpi][tmpj]->number);
+      fstr.append(buf);
+    }
+  }
+  Log(options_->info_log, "%s unmark level %d in C%lu %s, being compacted",
+      dbname_.c_str(), c->level(), (unsigned long)c, fstr.c_str());
+  return;
+}
+
 Compaction::Compaction(int level)
     : level_(level),
       max_output_file_size_(0),
@@ -1969,6 +2177,16 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
   }
 }
 
+void Compaction::MarkBeingCompacted(bool flag) {
+  for (size_t i = 0; i < 2; i++) {
+    for (size_t j = 0; j < inputs_[i].size(); j++) {
+      assert(flag ? !inputs_[i][j]->being_compacted
+                  : inputs_[i][j]->being_compacted);
+      inputs_[i][j]->being_compacted = flag;
+    }
+  }
+}
+
 void Compaction::ReleaseInputs() {
   if (input_version_ != NULL) {
     input_version_->Unref();
diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h
index 676cc64ee..2276b0a10 100644
--- a/src/leveldb/db/version_set.h
+++ b/src/leveldb/db/version_set.h
@@ -19,6 +19,7 @@
 #ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_
 #define STORAGE_LEVELDB_DB_VERSION_SET_H_
 
+#include <deque>
 #include <map>
 #include <set>
 #include <vector>
@@ -140,15 +141,19 @@ class Version {
   // Level that should be compacted next and its compaction score.
   // Score < 1 means compaction is not strictly needed.  These fields
   // are initialized by Finalize().
-  double compaction_score_;
-  int compaction_level_;
+  std::vector<double> compaction_score_;
+  std::vector<int> compaction_level_;
 
   explicit Version(VersionSet* vset)
       : vset_(vset), next_(this), prev_(this), refs_(0),
         file_to_compact_(NULL),
-        file_to_compact_level_(-1),
-        compaction_score_(-1),
-        compaction_level_(-1) {
+        file_to_compact_level_(-1) {
+    compaction_score_.resize(config::kNumLevels - 1);
+    compaction_level_.resize(config::kNumLevels - 1);
+    for (size_t i = 0; i < config::kNumLevels - 1; i++) {
+      compaction_score_[i] = -1.0;
+      compaction_level_[i] = -1;
+    }
   }
 
   ~Version();
@@ -171,6 +176,8 @@ class VersionSet {
   // current version.  Will release *mu while actually writing to the file.
   // REQUIRES: *mu is held on entry.
   // REQUIRES: no other thread concurrently calls LogAndApply()
+  void LogAndApplyHelper(VersionSetBuilder* builder,
+                         VersionEdit* edit);
   Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
       EXCLUSIVE_LOCKS_REQUIRED(mu);
 
@@ -233,7 +240,10 @@ class VersionSet {
   Compaction* CompactRange(
       int level,
       const InternalKey* begin,
-      const InternalKey* end);
+      const InternalKey* end, bool* being_compacted);
+
+  // release each file's being_compacted flag, and release level 0's compaction lock
+  void ReleaseCompaction(Compaction* c, Status& s);
 
   // Return the maximum overlapping data (in bytes) at next level for any
   // file at a level >= 1.
@@ -246,17 +256,19 @@ class VersionSet {
   // Returns true iff some level needs a compaction.
   bool NeedsCompaction() const {
     Version* v = current_;
-    return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
+    return (v->compaction_score_[0] >= 1) || (v->file_to_compact_ != NULL);
   }
 
   double CompactionScore() const {
     Version* v = current_;
-    if (v->compaction_score_ >= 1) {
-      return v->compaction_score_;
-    } else if (v->file_to_compact_ != NULL) {
+    double score = v->compaction_score_[0];
+    if (score <= 0) {
+      if (v->file_to_compact_ && !v->file_to_compact_->being_compacted) {
         return 0.1f;
+      }
+      return -1.0;
     }
-    return -1.0;
+    return score;
   }
 
   // Add all files listed in any live version to *live.
@@ -279,6 +291,7 @@ class VersionSet {
   friend class Compaction;
   friend class Version;
   friend class VersionSetBuilder;
+  struct ManifestWriter;
 
   void Finalize(Version* v);
 
@@ -305,6 +318,13 @@ class VersionSet {
 
   bool ModifyFileSize(FileMetaData* f);
 
+  // multi-thread compaction related
+  void PrintFilesInCompaction(const std::vector<FileMetaData*>& inputs);
+  bool FilesInCompaction(const std::vector<FileMetaData*>& inputs);
+  void PrintRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level);
+  bool RangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level);
+  bool PickCompactionBySize(int level, std::vector<FileMetaData*>* inputs);
+
   Env* const env_;
   const std::string dbname_;
   const Options* const options_;
@@ -320,6 +340,8 @@ class VersionSet {
   uint64_t log_number_;
   uint64_t prev_log_number_;  // 0 or backing store for memtable being compacted
 
+  std::deque<ManifestWriter*> manifest_writers_;
+
   // Opened lazily
   WritableFile* descriptor_file_;
   log::Writer* descriptor_log_;
@@ -329,6 +351,7 @@ class VersionSet {
   // Per-level key at which the next compaction at that level should start.
   // Either an empty string, or a valid InternalKey.
   std::string compact_pointer_[config::kNumLevels];
+  std::vector<Compaction*> level0_compactions_in_progress_;
 
   // No copying allowed
   VersionSet(const VersionSet&);
@@ -373,6 +396,8 @@ class Compaction {
   // before processing "internal_key".
   bool ShouldStopBefore(const Slice& internal_key);
 
+  void MarkBeingCompacted(bool flag = true);
+
   // Release the input version for the compaction, once the compaction
   // is successful.
   void ReleaseInputs();
@@ -385,6 +410,7 @@ class Compaction {
  private:
   friend class Version;
   friend class VersionSet;
+  friend class DBImpl;
 
   explicit Compaction(int level);
diff --git a/src/leveldb/include/leveldb/options.h b/src/leveldb/include/leveldb/options.h
index 1d36a69b1..1868e8cba 100644
--- a/src/leveldb/include/leveldb/options.h
+++ b/src/leveldb/include/leveldb/options.h
@@ -290,6 +290,14 @@ struct Options {
   // disable write-ahead-log
   bool disable_wal;
 
+  // Max number of threads allocated for an lg's compactions
+  // Default: 3
+  uint32_t max_background_compactions;
+
+  // If level 0's file count >= this limit, use sqrt to slow down the level-0 score
+  // Default: 30
+  int slow_down_level0_score_limit;
+
   // Create an Options object with default values for all fields.
   Options();
 };
diff --git a/src/leveldb/util/options.cc b/src/leveldb/util/options.cc
index 4d62b5039..27ea735ae 100644
--- a/src/leveldb/util/options.cc
+++ b/src/leveldb/util/options.cc
@@ -49,7 +49,9 @@ Options::Options()
       sst_size(kDefaultSstSize),
       verify_checksums_in_compaction(false),
       ignore_corruption_in_compaction(false),
-      disable_wal(false) {
+      disable_wal(false),
+      max_background_compactions(3),
+      slow_down_level0_score_limit(30) {
 }
 
 }  // namespace leveldb
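
For callers, the two new knobs are plain Options fields. A hypothetical configuration sketch against the patched headers — the helper name and the thread count are illustrative, only the two field names come from the patch:

```cpp
#include "leveldb/options.h"

// Hypothetical helper (not from the patch) showing how a caller might size
// the new knobs; the values are illustrative, not recommendations.
leveldb::Options MakeCompactionOptions(uint32_t compaction_threads) {
  leveldb::Options opt;
  // Allow up to N compactions of one locality group to run concurrently;
  // MaybeScheduleCompaction() stops scheduling once bg_compaction_scheduled_
  // reaches this limit.
  opt.max_background_compactions = compaction_threads;
  // Past 30 level-0 files, Finalize() switches to the sqrt-dampened score
  // to avoid stampeding level-0 compactions.
  opt.slow_down_level0_score_limit = 30;
  return opt;
}
```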