diff --git a/src/concurrency_control/include/lpwal.h b/src/concurrency_control/include/lpwal.h index c9425c1d4..b9d2663af 100644 --- a/src/concurrency_control/include/lpwal.h +++ b/src/concurrency_control/include/lpwal.h @@ -168,12 +168,21 @@ class handler { return worker_number_; } + [[nodiscard]] bool get_begun_session() const { + return begun_session_; + } + + [[nodiscard]] epoch::epoch_t get_durable_epoch() const { + return durable_epoch_; + } + /** * @pre take mtx of logs */ void push_log(log_record const& log) { if (logs_.empty()) { set_min_log_epoch(log.get_wv().get_major_write_version()); + begin_session(); } logs_.emplace_back(log); } @@ -182,6 +191,7 @@ class handler { worker_number_ = 0; min_log_epoch_ = 0; logs_.clear(); + begun_session_ = false; // this can't due to concurrent programming // log_channel_ptr_ = nullptr; } @@ -196,6 +206,16 @@ class handler { void set_worker_number(std::size_t wn) { worker_number_ = wn; } + /** + * @pre take mtx of logs + */ + void begin_session(); + + /** + * @pre take mtx of logs + */ + void end_session(); + private: /** * @brief worker thread number used for logging callback. @@ -208,7 +228,7 @@ class handler { std::atomic min_log_epoch_{0}; /** - * @brief mutex for logs_ + * @brief mutex for logs_ and begun_session_ and durable_epoch_ */ std::mutex mtx_logs_; @@ -217,6 +237,10 @@ class handler { */ logs_type logs_{}; + // invariant: logs_.empty() == !begun_session_ + bool begun_session_{false}; + epoch::epoch_t durable_epoch_{}; + /** * @brief log channel */ diff --git a/src/concurrency_control/interface/long_tx/termination.cpp b/src/concurrency_control/interface/long_tx/termination.cpp index 147759405..846ae33ed 100644 --- a/src/concurrency_control/interface/long_tx/termination.cpp +++ b/src/concurrency_control/interface/long_tx/termination.cpp @@ -838,20 +838,6 @@ extern Status commit(session* const ti) { // This must be after cc commit and before log process ti->commit_sequence(ctid); - // log debug timing event - VLOG(log_debug_timing_event) << log_location_prefix_timing_event - << "start_process_logging : " << str_tx_id; - -#if defined(PWAL) - auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; - // think the wal buffer is empty due to background thread's work - if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. - oldest_log_epoch != epoch::get_global_epoch()) { - // should flush - shirakami::lpwal::flush_log(static_cast(ti)); - } -#endif - // todo enhancement /** * Sort by wp and then globalize the local write set. @@ -859,6 +845,13 @@ extern Status commit(session* const ti) { */ auto this_dm = epoch::get_global_epoch(); +#if defined(PWAL) + { + auto& handle = ti->get_lpwal_handle(); + std::unique_lock lk{handle.get_mtx_logs()}; + if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); } + } +#endif // about transaction state process_tx_state(ti, this_dm); @@ -872,6 +865,20 @@ extern Status commit(session* const ti) { // call commit callback ti->call_commit_callback(rc, {}, this_dm); + // log debug timing event + VLOG(log_debug_timing_event) << log_location_prefix_timing_event + << "start_process_logging : " << str_tx_id; + +#if defined(PWAL) + auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; + // think the wal buffer is empty due to background thread's work + if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. + oldest_log_epoch != epoch::get_global_epoch()) { + // should flush + shirakami::lpwal::flush_log(static_cast(ti)); + } +#endif + } else { LOG_FIRST_N(ERROR, 1) << "library programming error."; } diff --git a/src/concurrency_control/interface/short_tx/termination.cpp b/src/concurrency_control/interface/short_tx/termination.cpp index 6b035fc7c..4a74afe97 100644 --- a/src/concurrency_control/interface/short_tx/termination.cpp +++ b/src/concurrency_control/interface/short_tx/termination.cpp @@ -764,19 +764,15 @@ extern Status commit(session* const ti) { // This must be after cc commit and before log process ti->commit_sequence(ti->get_mrc_tid()); - // flush log if need + auto this_dm = epoch::get_global_epoch(); #if defined(PWAL) - auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; - // think the wal buffer is empty due to background thread's work - if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. - oldest_log_epoch != epoch::get_global_epoch()) { - // should flush - shirakami::lpwal::flush_log(static_cast(ti)); + { + auto& handle = ti->get_lpwal_handle(); + std::unique_lock lk{handle.get_mtx_logs()}; + if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); } } #endif - auto this_dm = epoch::get_global_epoch(); - // about tx state process_tx_state(ti, this_dm); @@ -787,6 +783,18 @@ extern Status commit(session* const ti) { ti->set_result(reason_code::UNKNOWN); ti->call_commit_callback(rc, {}, this_dm); + + // flush log if need +#if defined(PWAL) + auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; + // think the wal buffer is empty due to background thread's work + if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. + oldest_log_epoch != epoch::get_global_epoch()) { + // should flush + shirakami::lpwal::flush_log(static_cast(ti)); + } +#endif + return Status::OK; } diff --git a/src/datastore/limestone/lpwal.cpp b/src/datastore/limestone/lpwal.cpp index 58e16f2ea..d2025e2b0 100644 --- a/src/datastore/limestone/lpwal.cpp +++ b/src/datastore/limestone/lpwal.cpp @@ -109,6 +109,19 @@ void fin() { set_stopping(false); } +void handler::begin_session() { + // assert begun_session_ = false + shirakami::begin_session(log_channel_ptr_); + durable_epoch_ = epoch::get_global_epoch(); + begun_session_ = true; +} + +void handler::end_session() { + // assert begun_session_ = true + shirakami::end_session(log_channel_ptr_); + begun_session_ = false; +} + void flush_log(Token token) { auto* ti = static_cast(token); auto& handle = ti->get_lpwal_handle(); @@ -116,9 +129,18 @@ void flush_log(Token token) { if (handle.get_mtx_logs().try_lock()) { // flush log if exist if (!handle.get_logs().empty()) { - begin_session(handle.get_log_channel_ptr()); + if (!handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should be begun here"; + handle.begin_session(); + } add_entry_from_logs(handle); - end_session(handle.get_log_channel_ptr()); + handle.end_session(); + } + if (handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should not be begun here"; + handle.end_session(); } handle.get_mtx_logs().unlock(); @@ -127,10 +149,21 @@ void flush_log(Token token) { void flush_remaining_log() { for (auto&& es : session_table::get_session_table()) { - if (!es.get_lpwal_handle().get_logs().empty()) { - begin_session(es.get_lpwal_handle().get_log_channel_ptr()); - add_entry_from_logs(es.get_lpwal_handle()); - end_session(es.get_lpwal_handle().get_log_channel_ptr()); + auto& handle = es.get_lpwal_handle(); + std::unique_lock lk{handle.get_mtx_logs()}; + if (!handle.get_logs().empty()) { + if (!handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should be begun here"; + handle.begin_session(); + } + add_entry_from_logs(handle); + handle.end_session(); + } + if (handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should not be begun here"; + handle.end_session(); } } }