Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
thawk105 committed Jul 21, 2023
1 parent b5f4f33 commit fcf146c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 26 deletions.
38 changes: 38 additions & 0 deletions src/concurrency_control/interface/long_tx/helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,44 @@ void preprocess_read_area(transaction_options::read_area& ra) {
}
}

void update_wp_at_commit(session* const ti, std::set<Storage> const& sts) {
/**
* write preserve はTX開始時点に宣言したものよりも実体の方が同じか小さくなる。
* 小さくできるなら小さくすることで、他Txへの影響を軽減する。
* */
for (auto itr = ti->get_wp_set().begin(); itr != ti->get_wp_set().end();) {
bool hit_actual{false};
for (auto actual_write_storage : sts) {
if (actual_write_storage == itr->first) {
// exactly, write this storage
hit_actual = true;
break;
}
}
if (hit_actual) {
++itr;
continue;
} // else
/**
* wp したが、実際には書かなかったストレージである。コミット処理前にこれを
* 取り除く
*/
{
itr->second->get_wp_lock().lock();
auto ret =
itr->second->remove_wp_without_lock(ti->get_long_tx_id());
if (ret == Status::OK) {
// 縮退成功
itr = ti->get_wp_set().erase(itr);
continue;
}
LOG(ERROR) << log_location_prefix << "unexpected code path";
itr->second->get_wp_lock().unlock();
}
++itr;
}
}

Status tx_begin(session* const ti, std::vector<Storage> write_preserve,
transaction_options::read_area ra) { // NOLINT
// get wp mutex, exclude long tx's coming and epoch update
Expand Down
2 changes: 2 additions & 0 deletions src/concurrency_control/interface/long_tx/include/long_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extern Status search_key(session* ti, Storage storage, std::string_view key,
extern Status tx_begin(session* ti, std::vector<Storage> write_preserve,
transaction_options::read_area ra);

extern void update_wp_at_commit(session* ti, std::set<Storage> const& sts);

/**
* @brief version function for long tx.
* @param[in] rec pointer to record.
Expand Down
34 changes: 9 additions & 25 deletions src/concurrency_control/interface/long_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,23 +327,6 @@ static inline void register_wp_result_and_remove_wps(
std::map<Storage, std::tuple<std::string, std::string>>& write_range) {
for (auto&& elem : ti->get_wp_set()) {
Storage storage = elem.first;
Storage page_set_meta_storage = wp::get_page_set_meta_storage();
std::string_view page_set_meta_storage_view = {
reinterpret_cast<char*>( // NOLINT
&page_set_meta_storage),
sizeof(page_set_meta_storage)};
std::string_view storage_view = {
reinterpret_cast<char*>(&storage), // NOLINT
sizeof(storage)};
std::pair<wp::page_set_meta**, std::size_t> out{};
auto rc{yakushima::get<wp::page_set_meta*>(page_set_meta_storage_view,
storage_view, out)};
if (rc != yakushima::status::OK) {
LOG(ERROR) << log_location_prefix << "Error: " << rc
<< ". It strongly suspect that DML and DDL are "
"mixed.";
return;
}

// check write range
auto wr_itr = write_range.find(storage);
Expand All @@ -357,14 +340,11 @@ static inline void register_wp_result_and_remove_wps(

// register wp result and remove wp
if (Status::OK !=
(reinterpret_cast<wp::page_set_meta*>(out.first)) // NOLINT
->get_wp_meta_ptr()
->register_wp_result_and_remove_wp(std::make_tuple(
ti->get_valid_epoch(), ti->get_long_tx_id(),
was_committed,
std::make_tuple(write_something,
std::string(write_range_left),
std::string(write_range_right))))) {
(elem.second->register_wp_result_and_remove_wp(std::make_tuple(
ti->get_valid_epoch(), ti->get_long_tx_id(), was_committed,
std::make_tuple(write_something,
std::string(write_range_left),
std::string(write_range_right)))))) {
LOG(ERROR) << "Fail to register wp result and remove wp.";
}
}
Expand Down Expand Up @@ -518,6 +498,10 @@ Status verify(session* const ti) {
++wp_result_itr;
}
}
/**
* wp result set から見つからないということは、相手は wp 宣言をしたが
* 実際には書かなくて wp が縮退されたということ。そのままスルーしてよい。
*/
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/concurrency_control/ongoing_tx.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include "concurrency_control/include/ongoing_tx.h"
#include "concurrency_control/include/wp.h"
#include "concurrency_control/interface/long_tx/include/long_tx.h"

namespace shirakami {

Expand Down Expand Up @@ -33,6 +34,10 @@ bool ongoing_tx::exist_wait_for(session* ti) {
std::set<Storage> st_set{};
// create and compaction about storage set
ti->get_write_set().get_storage_set(st_set);
if (!ti->get_requested_commit()) {
// first request, so update wp
long_tx::update_wp_at_commit(ti, st_set);
}

// check wait
for (auto&& elem : tx_info_) {
Expand Down
6 changes: 5 additions & 1 deletion src/concurrency_control/wp_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ wp_meta::register_wp_result_and_remove_wp(wp_result_elem_type const& elem) {
wp_result_set_.emplace_back(elem);
}
wp_lock_.lock();
return remove_wp_without_lock(wp_result_elem_extract_id(elem));
auto ret = remove_wp_without_lock(wp_result_elem_extract_id(elem));
if (ret == Status::OK) { return ret; }
wp_lock_.unlock();
return ret;
}

[[nodiscard]] Status wp_meta::remove_wp_without_lock(std::size_t const id) {
Expand All @@ -155,6 +158,7 @@ wp_meta::register_wp_result_and_remove_wp(wp_result_elem_type const& elem) {
return Status::OK;
}
}
LOG(ERROR) << log_location_prefix << "unexpected code path";
return Status::WARN_NOT_FOUND;
}

Expand Down

0 comments on commit fcf146c

Please sign in to comment.