From 6af88bc2dfbc8bba12ea116428397246545cee0d Mon Sep 17 00:00:00 2001 From: Takayuki Tanabe Date: Wed, 26 Jul 2023 17:29:22 +0900 Subject: [PATCH] fix for updating wp at commit phase https://github.com/project-tsurugi/tsurugi-issues/issues/326 --- .../interface/long_tx/helper.cpp | 38 +++++++++++++++++++ .../interface/long_tx/include/long_tx.h | 2 + .../interface/long_tx/termination.cpp | 35 +++++------------ src/concurrency_control/ongoing_tx.cpp | 5 +++ src/concurrency_control/wp_meta.cpp | 6 ++- .../long_tx/wp_basic/wp_register_test.cpp | 35 +++++++++++++++++ 6 files changed, 95 insertions(+), 26 deletions(-) diff --git a/src/concurrency_control/interface/long_tx/helper.cpp b/src/concurrency_control/interface/long_tx/helper.cpp index 430daa671..df40ff0ae 100644 --- a/src/concurrency_control/interface/long_tx/helper.cpp +++ b/src/concurrency_control/interface/long_tx/helper.cpp @@ -66,6 +66,44 @@ void preprocess_read_area(transaction_options::read_area& ra) { } } +void update_wp_at_commit(session* const ti, std::set const& sts) { + /** + * write preserve はTX開始時点に宣言したものよりも実体の方が同じか小さくなる。 + * 小さくできるなら小さくすることで、他Txへの影響を軽減する。 + * */ + for (auto itr = ti->get_wp_set().begin(); itr != ti->get_wp_set().end();) { + bool hit_actual{false}; + for (auto actual_write_storage : sts) { + if (actual_write_storage == itr->first) { + // exactly, write this storage + hit_actual = true; + break; + } + } + if (hit_actual) { + ++itr; + continue; + } // else + /** + * wp したが、実際には書かなかったストレージである。コミット処理前にこれを + * 取り除く + */ + { + itr->second->get_wp_lock().lock(); + auto ret = + itr->second->remove_wp_without_lock(ti->get_long_tx_id()); + if (ret == Status::OK) { + // 縮退成功 + itr = ti->get_wp_set().erase(itr); + continue; + } + LOG(ERROR) << log_location_prefix << "unexpected code path"; + itr->second->get_wp_lock().unlock(); + } + ++itr; + } +} + Status tx_begin(session* const ti, std::vector write_preserve, transaction_options::read_area ra) { // NOLINT // get wp mutex, exclude long tx's coming and epoch update diff --git a/src/concurrency_control/interface/long_tx/include/long_tx.h b/src/concurrency_control/interface/long_tx/include/long_tx.h index a43de0e1b..a1d1316ff 100644 --- a/src/concurrency_control/interface/long_tx/include/long_tx.h +++ b/src/concurrency_control/interface/long_tx/include/long_tx.h @@ -29,6 +29,8 @@ extern Status search_key(session* ti, Storage storage, std::string_view key, extern Status tx_begin(session* ti, std::vector write_preserve, transaction_options::read_area ra); +extern void update_wp_at_commit(session* ti, std::set const& sts); + /** * @brief version function for long tx. * @param[in] rec pointer to record. diff --git a/src/concurrency_control/interface/long_tx/termination.cpp b/src/concurrency_control/interface/long_tx/termination.cpp index 1dec9d017..afb9d32d9 100644 --- a/src/concurrency_control/interface/long_tx/termination.cpp +++ b/src/concurrency_control/interface/long_tx/termination.cpp @@ -327,23 +327,6 @@ static inline void register_wp_result_and_remove_wps( std::map>& write_range) { for (auto&& elem : ti->get_wp_set()) { Storage storage = elem.first; - Storage page_set_meta_storage = wp::get_page_set_meta_storage(); - std::string_view page_set_meta_storage_view = { - reinterpret_cast( // NOLINT - &page_set_meta_storage), - sizeof(page_set_meta_storage)}; - std::string_view storage_view = { - reinterpret_cast(&storage), // NOLINT - sizeof(storage)}; - std::pair out{}; - auto rc{yakushima::get(page_set_meta_storage_view, - storage_view, out)}; - if (rc != yakushima::status::OK) { - LOG(ERROR) << log_location_prefix << "Error: " << rc - << ". It strongly suspect that DML and DDL are " - "mixed."; - return; - } // check write range auto wr_itr = write_range.find(storage); @@ -357,14 +340,11 @@ static inline void register_wp_result_and_remove_wps( // register wp result and remove wp if (Status::OK != - (reinterpret_cast(out.first)) // NOLINT - ->get_wp_meta_ptr() - ->register_wp_result_and_remove_wp(std::make_tuple( - ti->get_valid_epoch(), ti->get_long_tx_id(), - was_committed, - std::make_tuple(write_something, - std::string(write_range_left), - std::string(write_range_right))))) { + (elem.second->register_wp_result_and_remove_wp(std::make_tuple( + ti->get_valid_epoch(), ti->get_long_tx_id(), was_committed, + std::make_tuple(write_something, + std::string(write_range_left), + std::string(write_range_right)))))) { LOG(ERROR) << "Fail to register wp result and remove wp."; } } @@ -518,6 +498,11 @@ Status verify(session* const ti) { ++wp_result_itr; } } + /** + * wp result set から見つからないということは、相手は wp 宣言をしたが + * 実際には書かなくて wp が縮退されたということ。そのままスルーしてよい。 + * イテレートは拡張 for ループの方で実施される。 + */ } } diff --git a/src/concurrency_control/ongoing_tx.cpp b/src/concurrency_control/ongoing_tx.cpp index 56495eb14..0cd4040af 100644 --- a/src/concurrency_control/ongoing_tx.cpp +++ b/src/concurrency_control/ongoing_tx.cpp @@ -1,6 +1,7 @@ #include "concurrency_control/include/ongoing_tx.h" #include "concurrency_control/include/wp.h" +#include "concurrency_control/interface/long_tx/include/long_tx.h" namespace shirakami { @@ -33,6 +34,10 @@ bool ongoing_tx::exist_wait_for(session* ti) { std::set st_set{}; // create and compaction about storage set ti->get_write_set().get_storage_set(st_set); + if (!ti->get_requested_commit()) { + // first request, so update wp + long_tx::update_wp_at_commit(ti, st_set); + } // check wait for (auto&& elem : tx_info_) { diff --git a/src/concurrency_control/wp_meta.cpp b/src/concurrency_control/wp_meta.cpp index b9b2548ea..fafc0dacc 100644 --- a/src/concurrency_control/wp_meta.cpp +++ b/src/concurrency_control/wp_meta.cpp @@ -143,7 +143,10 @@ wp_meta::register_wp_result_and_remove_wp(wp_result_elem_type const& elem) { wp_result_set_.emplace_back(elem); } wp_lock_.lock(); - return remove_wp_without_lock(wp_result_elem_extract_id(elem)); + auto ret = remove_wp_without_lock(wp_result_elem_extract_id(elem)); + if (ret == Status::OK) { return ret; } + wp_lock_.unlock(); + return ret; } [[nodiscard]] Status wp_meta::remove_wp_without_lock(std::size_t const id) { @@ -155,6 +158,7 @@ wp_meta::register_wp_result_and_remove_wp(wp_result_elem_type const& elem) { return Status::OK; } } + LOG(ERROR) << log_location_prefix << "unexpected code path"; return Status::WARN_NOT_FOUND; } diff --git a/test/concurrency_control/long_tx/wp_basic/wp_register_test.cpp b/test/concurrency_control/long_tx/wp_basic/wp_register_test.cpp index c79affa8b..7e7e7894b 100644 --- a/test/concurrency_control/long_tx/wp_basic/wp_register_test.cpp +++ b/test/concurrency_control/long_tx/wp_basic/wp_register_test.cpp @@ -17,6 +17,8 @@ #include "yakushima/include/kvs.h" +#include "test_tool.h" + #include "gtest/gtest.h" #include "glog/logging.h" @@ -71,4 +73,37 @@ TEST_F(wp_register_test, multi_register) { // NOLINT ASSERT_EQ(wu.count(), 2); } +TEST_F(wp_register_test, shrink_at_commit) { // NOLINT + /** + * コミット時の write preserve 圧縮をテストする。 + */ + // prepare + Storage st1{}; + Storage st2{}; + ASSERT_EQ(create_storage("test1", st1), Status::OK); + ASSERT_EQ(create_storage("test2", st2), Status::OK); + + Token s{}; + ASSERT_EQ(enter(s), Status::OK); + + // test + ASSERT_EQ(tx_begin({s, + transaction_options::transaction_type::LONG, + {st1, st2}}), + Status::OK); + wait_epoch_update(); + // 書くのは一か所のみ + ASSERT_EQ(Status::OK, upsert(s, st1, "", "")); + ASSERT_EQ(Status::OK, commit(s)); + + // verify + wp::wp_meta* wp_meta_ptr{}; + wp::find_wp_meta(st1, wp_meta_ptr); + ASSERT_EQ(1, wp_meta_ptr->get_wp_result_set().size()); + wp::find_wp_meta(st2, wp_meta_ptr); + ASSERT_EQ(0, wp_meta_ptr->get_wp_result_set().size()); + + // cleanup + ASSERT_EQ(leave(s), Status::OK); +} } // namespace shirakami::testing