Skip to content

Commit

Permalink
WIP944
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Sep 6, 2024
1 parent 5e00592 commit dca2a6a
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 32 deletions.
29 changes: 28 additions & 1 deletion src/concurrency_control/include/lpwal.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,21 @@ class handler {
return worker_number_;
}

[[nodiscard]] bool get_begun_session() const {
return begun_session_;
}

[[nodiscard]] epoch::epoch_t get_durable_epoch() const {
return durable_epoch_;
}

/**
* @pre take mtx of logs
*/
void push_log(log_record const& log) {
if (logs_.empty()) {
set_min_log_epoch(log.get_wv().get_major_write_version());
begin_session();
}
logs_.emplace_back(log);
}
Expand All @@ -196,6 +205,21 @@ class handler {

void set_worker_number(std::size_t wn) { worker_number_ = wn; }

// must call with mtx_log_
void begin_session() {
// assert begun_session_ = false
shirakami::begin_session(log_channel_ptr_);

Check failure on line 211 in src/concurrency_control/include/lpwal.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

no type named 'begin_session' in namespace 'shirakami'

Check warning on line 211 in src/concurrency_control/include/lpwal.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-init-variables

variable 'log_channel_ptr_' is not initialized
durable_epoch_ = epoch::get_global_epoch();
begun_session_ = true;
}

// must call with mtx_log_
void end_session() {
// assert begun_session_ = true
shirakami::end_session(log_channel_ptr_);

Check failure on line 219 in src/concurrency_control/include/lpwal.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

no type named 'end_session' in namespace 'shirakami'

Check warning on line 219 in src/concurrency_control/include/lpwal.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-init-variables

variable 'log_channel_ptr_' is not initialized
begun_session_ = false;
}

private:
/**
* @brief worker thread number used for logging callback.
Expand All @@ -208,7 +232,7 @@ class handler {
std::atomic<epoch::epoch_t> min_log_epoch_{0};

/**
* @brief mutex for logs_
* @brief mutex for logs_ and begun_session and durable_epoch
*/
std::mutex mtx_logs_;

Expand All @@ -217,6 +241,9 @@ class handler {
*/
logs_type logs_{};

bool begun_session_{false};
epoch::epoch_t durable_epoch_{};

/**
* @brief log channel
*/
Expand Down
35 changes: 21 additions & 14 deletions src/concurrency_control/interface/long_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,27 +838,20 @@ extern Status commit(session* const ti) {
// This must be after cc commit and before log process
ti->commit_sequence(ctid);

// log debug timing event
VLOG(log_debug_timing_event) << log_location_prefix_timing_event
<< "start_process_logging : " << str_tx_id;

#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

// todo enhancement
/**
* Sort by wp and then globalize the local write set.
* Eliminate wp from those that have been globalized in wp units.
*/

auto this_dm = epoch::get_global_epoch();
#if defined(PWAL)
{
auto& handle = ti->get_lpwal_handle();
std::unique_lock lk{handle.get_mtx_logs()};
if (handle.get_begun_session()) { this_dm = handle.get_durable_epoch(); }
}
#endif

// about transaction state
process_tx_state(ti, this_dm);
Expand All @@ -872,6 +865,20 @@ extern Status commit(session* const ti) {
// call commit callback
ti->call_commit_callback(rc, {}, this_dm);

// log debug timing event
VLOG(log_debug_timing_event) << log_location_prefix_timing_event
<< "start_process_logging : " << str_tx_id;

#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

} else {
LOG_FIRST_N(ERROR, 1) << "library programming error.";
}
Expand Down
23 changes: 12 additions & 11 deletions src/concurrency_control/interface/short_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,17 +764,6 @@ extern Status commit(session* const ti) {
// This must be after cc commit and before log process
ti->commit_sequence(ti->get_mrc_tid());

// flush log if need
#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

auto this_dm = epoch::get_global_epoch();

// about tx state
Expand All @@ -787,6 +776,18 @@ extern Status commit(session* const ti) {
ti->set_result(reason_code::UNKNOWN);

ti->call_commit_callback(rc, {}, this_dm);

// flush log if need
#if defined(PWAL)
auto oldest_log_epoch{ti->get_lpwal_handle().get_min_log_epoch()};
// think the wal buffer is empty due to background thread's work
if (oldest_log_epoch != 0 && // mean the wal buffer is not empty.
oldest_log_epoch != epoch::get_global_epoch()) {
// should flush
shirakami::lpwal::flush_log(static_cast<void*>(ti));
}
#endif

return Status::OK;
}

Expand Down
32 changes: 26 additions & 6 deletions src/datastore/limestone/lpwal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,18 @@ void flush_log(Token token) {
if (handle.get_mtx_logs().try_lock()) {
// flush log if exist
if (!handle.get_logs().empty()) {
begin_session(handle.get_log_channel_ptr());
if (!handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should be begun here";
handle.begin_session();
}
add_entry_from_logs(handle);
end_session(handle.get_log_channel_ptr());
handle.end_session();
}
if (handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should not be begun here";
handle.end_session();
}

handle.get_mtx_logs().unlock();
Expand All @@ -127,10 +136,21 @@ void flush_log(Token token) {

void flush_remaining_log() {
for (auto&& es : session_table::get_session_table()) {
if (!es.get_lpwal_handle().get_logs().empty()) {
begin_session(es.get_lpwal_handle().get_log_channel_ptr());
add_entry_from_logs(es.get_lpwal_handle());
end_session(es.get_lpwal_handle().get_log_channel_ptr());
auto& handle = es.get_lpwal_handle();
std::unique_lock lk{handle.get_mtx_logs()};
if (!handle.get_logs().empty()) {
if (!handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should be begun here";
handle.begin_session();
}
add_entry_from_logs(handle);
handle.end_session();
}
if (handle.get_begun_session()) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "lpwal worker#" << handle.get_worker_number()
<< ": session should not be begun here";
handle.end_session();
}
}
}
Expand Down

0 comments on commit dca2a6a

Please sign in to comment.