Skip to content

Commit

Permalink
WIP944
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Sep 12, 2024
1 parent 5e00592 commit a7e4e3b
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 30 deletions.
26 changes: 25 additions & 1 deletion src/concurrency_control/include/lpwal.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,21 @@ class handler {
return worker_number_;
}

[[nodiscard]] bool get_begun_session() const {
return begun_session_;
}

[[nodiscard]] epoch::epoch_t get_durable_epoch() const {
return durable_epoch_;
}

/**
* @pre take mtx of logs
*/
void push_log(log_record const& log) {
if (logs_.empty()) {
set_min_log_epoch(log.get_wv().get_major_write_version());
begin_session();
}
logs_.emplace_back(log);
}
Expand All @@ -182,6 +191,7 @@ class handler {
worker_number_ = 0;
min_log_epoch_ = 0;
logs_.clear();
begun_session_ = false;
// this can't due to concurrent programming
// log_channel_ptr_ = nullptr;
}
Expand All @@ -196,6 +206,16 @@ class handler {

void set_worker_number(std::size_t wn) { worker_number_ = wn; }

/**
* @pre take mtx of logs
*/
void begin_session();

/**
* @pre take mtx of logs
*/
void end_session();

private:
/**
* @brief worker thread number used for logging callback.
Expand All @@ -208,7 +228,7 @@ class handler {
std::atomic<epoch::epoch_t> min_log_epoch_{0};

/**
* @brief mutex for logs_
* @brief mutex for logs_ and begun_session_ and durable_epoch_
*/
std::mutex mtx_logs_;

Expand All @@ -217,6 +237,10 @@ class handler {
*/
logs_type logs_{};

// invariant: logs_.empty() == !begun_session_
bool begun_session_{false};
epoch::epoch_t durable_epoch_{};

/**
* @brief log channel
*/
Expand Down
35 changes: 21 additions & 14 deletions src/concurrency_control/interface/long_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,27 +838,20 @@ extern Status commit(session* const ti) {
// This must be after cc commit and before log process
ti->commit_sequence(ctid);

// log debug timing event
VLOG(log_debug_timing_event) << log_location_prefix_timing_event
<< "start_process_logging : " << str_tx_id;

#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

// todo enhancement
/**
* Sort by wp and then globalize the local write set.
* Eliminate wp from those that have been globalized in wp units.
*/

auto this_dm = epoch::get_global_epoch();
#if defined(PWAL)
{
auto& handle = ti->get_lpwal_handle();
std::unique_lock lk{handle.get_mtx_logs()};
if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); }
}
#endif

// about transaction state
process_tx_state(ti, this_dm);
Expand All @@ -872,6 +865,20 @@ extern Status commit(session* const ti) {
// call commit callback
ti->call_commit_callback(rc, {}, this_dm);

// log debug timing event
VLOG(log_debug_timing_event) << log_location_prefix_timing_event
<< "start_process_logging : " << str_tx_id;

#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

} else {
LOG_FIRST_N(ERROR, 1) << "library programming error.";
}
Expand Down
26 changes: 17 additions & 9 deletions src/concurrency_control/interface/short_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,19 +764,15 @@ extern Status commit(session* const ti) {
// This must be after cc commit and before log process
ti->commit_sequence(ti->get_mrc_tid());

// flush log if need
auto this_dm = epoch::get_global_epoch();
#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
{
auto& handle = ti->get_lpwal_handle();
std::unique_lock lk{handle.get_mtx_logs()};
if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); }
}
#endif

auto this_dm = epoch::get_global_epoch();

// about tx state
process_tx_state(ti, this_dm);

Expand All @@ -787,6 +783,18 @@ extern Status commit(session* const ti) {
ti->set_result(reason_code::UNKNOWN);

ti->call_commit_callback(rc, {}, this_dm);

// flush log if need
#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

return Status::OK;
}

Expand Down
45 changes: 39 additions & 6 deletions src/datastore/limestone/lpwal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,38 @@ void fin() {
set_stopping(false);
}

void handler::begin_session() {
// assert begun_session_ = false
shirakami::begin_session(log_channel_ptr_);
durable_epoch_ = epoch::get_global_epoch();
begun_session_ = true;
}

void handler::end_session() {
// assert begun_session_ = true
shirakami::end_session(log_channel_ptr_);
begun_session_ = false;
}

void flush_log(Token token) {
auto* ti = static_cast<session*>(token);
auto& handle = ti->get_lpwal_handle();
// this is called worker or daemon, so use try_lock
if (handle.get_mtx_logs().try_lock()) {
// flush log if exist
if (!handle.get_logs().empty()) {
begin_session(handle.get_log_channel_ptr());
if (!handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should be begun here";
handle.begin_session();
}
add_entry_from_logs(handle);
end_session(handle.get_log_channel_ptr());
handle.end_session();
}
if (handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should not be begun here";
handle.end_session();
}

handle.get_mtx_logs().unlock();
Expand All @@ -127,10 +149,21 @@ void flush_log(Token token) {

void flush_remaining_log() {
for (auto&& es : session_table::get_session_table()) {
if (!es.get_lpwal_handle().get_logs().empty()) {
begin_session(es.get_lpwal_handle().get_log_channel_ptr());
add_entry_from_logs(es.get_lpwal_handle());
end_session(es.get_lpwal_handle().get_log_channel_ptr());
auto& handle = es.get_lpwal_handle();
std::unique_lock lk{handle.get_mtx_logs()};
if (!handle.get_logs().empty()) {
if (!handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should be begun here";
handle.begin_session();
}
add_entry_from_logs(handle);
handle.end_session();
}
if (handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should not be begun here";
handle.end_session();
}
}
}
Expand Down

0 comments on commit a7e4e3b

Please sign in to comment.