From dca2a6a0a043c27c56acca267b3ecf39980e11db Mon Sep 17 00:00:00 2001 From: Nobuhiro Ban Date: Fri, 6 Sep 2024 21:50:37 +0900 Subject: [PATCH] WIP944 --- src/concurrency_control/include/lpwal.h | 29 ++++++++++++++- .../interface/long_tx/termination.cpp | 35 +++++++++++-------- .../interface/short_tx/termination.cpp | 23 ++++++------ src/datastore/limestone/lpwal.cpp | 32 +++++++++++++---- 4 files changed, 87 insertions(+), 32 deletions(-) diff --git a/src/concurrency_control/include/lpwal.h b/src/concurrency_control/include/lpwal.h index c9425c1d4..2c8427c3f 100644 --- a/src/concurrency_control/include/lpwal.h +++ b/src/concurrency_control/include/lpwal.h @@ -168,12 +168,21 @@ class handler { return worker_number_; } + [[nodiscard]] bool get_begun_session() const { + return begun_session_; + } + + [[nodiscard]] epoch::epoch_t get_durable_epoch() const { + return durable_epoch_; + } + /** * @pre take mtx of logs */ void push_log(log_record const& log) { if (logs_.empty()) { set_min_log_epoch(log.get_wv().get_major_write_version()); + begin_session(); } logs_.emplace_back(log); } @@ -196,6 +205,21 @@ class handler { void set_worker_number(std::size_t wn) { worker_number_ = wn; } + // must call with mtx_log_ + void begin_session() { + // assert begun_session_ = false + shirakami::begin_session(log_channel_ptr_); + durable_epoch_ = epoch::get_global_epoch(); + begun_session_ = true; + } + + // must call with mtx_log_ + void end_session() { + // assert begun_session_ = true + shirakami::end_session(log_channel_ptr_); + begun_session_ = false; + } + private: /** * @brief worker thread number used for logging callback. @@ -208,7 +232,7 @@ class handler { std::atomic min_log_epoch_{0}; /** - * @brief mutex for logs_ + * @brief mutex for logs_ and begun_session and durable_epoch */ std::mutex mtx_logs_; @@ -217,6 +241,9 @@ class handler { */ logs_type logs_{}; + bool begun_session_{false}; + epoch::epoch_t durable_epoch_{}; + /** * @brief log channel */ diff --git a/src/concurrency_control/interface/long_tx/termination.cpp b/src/concurrency_control/interface/long_tx/termination.cpp index 147759405..846ae33ed 100644 --- a/src/concurrency_control/interface/long_tx/termination.cpp +++ b/src/concurrency_control/interface/long_tx/termination.cpp @@ -838,20 +838,6 @@ extern Status commit(session* const ti) { // This must be after cc commit and before log process ti->commit_sequence(ctid); - // log debug timing event - VLOG(log_debug_timing_event) << log_location_prefix_timing_event - << "start_process_logging : " << str_tx_id; - -#if defined(PWAL) - auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; - // think the wal buffer is empty due to background thread's work - if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. - oldest_log_epoch != epoch::get_global_epoch()) { - // should flush - shirakami::lpwal::flush_log(static_cast(ti)); - } -#endif - // todo enhancement /** * Sort by wp and then globalize the local write set. @@ -859,6 +845,13 @@ extern Status commit(session* const ti) { */ auto this_dm = epoch::get_global_epoch(); +#if defined(PWAL) + { + auto& handle = ti->get_lpwal_handle(); + std::unique_lock lk{handle.get_mtx_logs()}; + if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); } + } +#endif // about transaction state process_tx_state(ti, this_dm); @@ -872,6 +865,20 @@ extern Status commit(session* const ti) { // call commit callback ti->call_commit_callback(rc, {}, this_dm); + // log debug timing event + VLOG(log_debug_timing_event) << log_location_prefix_timing_event + << "start_process_logging : " << str_tx_id; + +#if defined(PWAL) + auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; + // think the wal buffer is empty due to background thread's work + if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. + oldest_log_epoch != epoch::get_global_epoch()) { + // should flush + shirakami::lpwal::flush_log(static_cast(ti)); + } +#endif + } else { LOG_FIRST_N(ERROR, 1) << "library programming error."; } diff --git a/src/concurrency_control/interface/short_tx/termination.cpp b/src/concurrency_control/interface/short_tx/termination.cpp index 6b035fc7c..4e43bb49c 100644 --- a/src/concurrency_control/interface/short_tx/termination.cpp +++ b/src/concurrency_control/interface/short_tx/termination.cpp @@ -764,17 +764,6 @@ extern Status commit(session* const ti) { // This must be after cc commit and before log process ti->commit_sequence(ti->get_mrc_tid()); - // flush log if need -#if defined(PWAL) - auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; - // think the wal buffer is empty due to background thread's work - if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. - oldest_log_epoch != epoch::get_global_epoch()) { - // should flush - shirakami::lpwal::flush_log(static_cast(ti)); - } -#endif - auto this_dm = epoch::get_global_epoch(); // about tx state @@ -787,6 +776,18 @@ extern Status commit(session* const ti) { ti->set_result(reason_code::UNKNOWN); ti->call_commit_callback(rc, {}, this_dm); + + // flush log if need +#if defined(PWAL) + auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()}; + // think the wal buffer is empty due to background thread's work + if (oldest_log_epoch != 0 && // mean the wal buffer is not empty. + oldest_log_epoch != epoch::get_global_epoch()) { + // should flush + shirakami::lpwal::flush_log(static_cast(ti)); + } +#endif + return Status::OK; } diff --git a/src/datastore/limestone/lpwal.cpp b/src/datastore/limestone/lpwal.cpp index 58e16f2ea..34024965b 100644 --- a/src/datastore/limestone/lpwal.cpp +++ b/src/datastore/limestone/lpwal.cpp @@ -116,9 +116,18 @@ void flush_log(Token token) { if (handle.get_mtx_logs().try_lock()) { // flush log if exist if (!handle.get_logs().empty()) { - begin_session(handle.get_log_channel_ptr()); + if (!handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should be begun here"; + handle.begin_session(); + } add_entry_from_logs(handle); - end_session(handle.get_log_channel_ptr()); + handle.end_session(); + } + if (handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should not be begun here"; + handle.end_session(); } handle.get_mtx_logs().unlock(); @@ -127,10 +136,21 @@ void flush_log(Token token) { void flush_remaining_log() { for (auto&& es : session_table::get_session_table()) { - if (!es.get_lpwal_handle().get_logs().empty()) { - begin_session(es.get_lpwal_handle().get_log_channel_ptr()); - add_entry_from_logs(es.get_lpwal_handle()); - end_session(es.get_lpwal_handle().get_log_channel_ptr()); + auto& handle = es.get_lpwal_handle(); + std::unique_lock lk{handle.get_mtx_logs()}; + if (!handle.get_logs().empty()) { + if (!handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should be begun here"; + handle.begin_session(); + } + add_entry_from_logs(handle); + handle.end_session(); + } + if (handle.get_begun_session()) { + LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number() + << ": session should not be begun here"; + handle.end_session(); } } }