Skip to content

Commit

Permalink
implement in-place teardown for insert operation
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 31, 2024
1 parent 173de03 commit 1336e5e
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 38 deletions.
10 changes: 10 additions & 0 deletions include/jogasaki/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,14 @@ class configuration {
scan_default_parallel_ = arg;
}

/**
 * @brief accessor for the in-place teardown flag
 * @return true if teardown (job completion) is processed directly on the
 * current thread instead of scheduling a separate task for it
 */
[[nodiscard]] bool inplace_teardown() const noexcept {
    return inplace_teardown_;
}

/**
 * @brief setter for the in-place teardown flag
 * @param arg whether to process teardown directly on the current thread
 */
void inplace_teardown(bool arg) noexcept {
    inplace_teardown_ = arg;
}

friend inline std::ostream& operator<<(std::ostream& out, configuration const& cfg) {

//NOLINTBEGIN
Expand Down Expand Up @@ -555,6 +563,7 @@ class configuration {
print_non_default(thousandths_ratio_check_local_first);
print_non_default(direct_commit_callback);
print_non_default(scan_default_parallel);
print_non_default(inplace_teardown);

if(cfg.req_cancel_config()) {
out << "req_cancel_config:" << *cfg.req_cancel_config() << " "; \
Expand Down Expand Up @@ -616,6 +625,7 @@ class configuration {
std::size_t thousandths_ratio_check_local_first_ = 100;
bool direct_commit_callback_ = false;
std::size_t scan_default_parallel_ = 1;
bool inplace_teardown_ = true;

};

Expand Down
1 change: 1 addition & 0 deletions src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ void dump_public_configurations(configuration const& cfg) {
LOGCFG << "(dev_thousandths_ratio_check_local_first) " << cfg.thousandths_ratio_check_local_first() << " : how frequently (represented as count out of 1000 executions) task scheduler checks local task queue first";
LOGCFG << "(dev_direct_commit_callback) " << cfg.direct_commit_callback() << " : whether to make callback directly from shirakami to client on pre-commit response (only for `available` and `accepted`)";
LOGCFG << "(scan_default_parallel) " << cfg.scan_default_parallel() << " : max parallel execution count of scan tasks";
LOGCFG << "(dev_inplace_teardown) " << cfg.inplace_teardown() << " : whether to process teardown (job completion) directly on the current thread instead of scheduling a task for it";
}

status database::start() {
Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/api/resource/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ bool process_sql_config(std::shared_ptr<jogasaki::configuration>& ret, tateyama:
if (auto v = jogasaki_config->get<std::size_t>("scan_default_parallel")) {
ret->scan_default_parallel(v.value());
}
if (auto v = jogasaki_config->get<bool>("dev_inplace_teardown")) {
ret->inplace_teardown(v.value());
}
return true;
}

Expand Down
5 changes: 5 additions & 0 deletions src/jogasaki/model/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ enum class task_result : std::size_t {
*/
complete,

/**
* @brief complete the task, remove it from the schedule and tear down the job
*/
complete_and_teardown,

/**
* @brief sleep and detach thread from this task, the task needs wake-up to resume
* @attention this is future functionality and not yet supported
Expand Down
80 changes: 55 additions & 25 deletions src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ void flat_task::dag_schedule() {
log_exit << *this;
}

/**
 * @brief try to finish the job in place, otherwise schedule a teardown task
 * @details when the in-place teardown configuration is enabled and the job is
 * ready to finish, no task is scheduled and the caller is expected to finish
 * the job on the current thread. In every other case a teardown task is
 * submitted via submit_teardown().
 * @param req_context the request context whose job is to be torn down
 * @param calling_from_task whether the caller itself runs as a task counted in
 * the job's task count (affects the readiness check)
 * @param force forwarded to submit_teardown()
 * @param try_on_suspended_worker forwarded to submit_teardown()
 * @return true if the caller may finish the job in place
 * @return false if a teardown task has been submitted instead
 */
bool check_or_submit_teardown(
    request_context& req_context,
    bool calling_from_task,
    bool force,
    bool try_on_suspended_worker
) {
    bool const inplace_enabled = global::config_pool()->inplace_teardown();
    // readiness is only consulted when in-place teardown is configured
    if(inplace_enabled && ready_to_finish(*req_context.job(), calling_from_task)) {
        return true;
    }
    submit_teardown(req_context, force, try_on_suspended_worker);
    return false;
}

void submit_teardown(request_context& req_context, bool force, bool try_on_suspended_worker) {
// make sure teardown task is submitted only once
auto& ts = *req_context.scheduler();
Expand Down Expand Up @@ -103,39 +118,46 @@ void flat_task::resubmit(request_context& req_context) {
ts.schedule_task(flat_task{*this});
}

bool flat_task::teardown() { //NOLINT(readability-make-member-function-const)
bool ready_to_finish(job_context& job, bool calling_from_task) { //NOLINT(readability-make-member-function-const)
// stop log_entry/log_exit since this function is called frequently as part of durability callback processing
// log_entry << *this;
trace_scope_name("teardown"); //NOLINT
bool ret = true;
if (auto cnt = job()->task_count().load(); cnt > 0) {
DVLOG_LP(log_debug) << *this << " other " << cnt << " tasks remain and teardown is rescheduled.";
std::size_t expected_task_count = calling_from_task ? 1 : 0;
if (auto cnt = job.task_count().load(); cnt > expected_task_count) {
VLOG_LP(log_debug) << job << " other " << cnt << " tasks remain and teardown is (re)scheduled";
// Another teardown task will be scheduled at the end of this task.
// It's not done here because newly scheduled task quickly completes and destroy job context.
ret = false;
} else if (job()->completion_readiness() && !job()->completion_readiness()()) {
DVLOG_LP(log_debug) << *this << " job completion is not ready and teardown is rescheduled.";
} else if (job.completion_readiness() && !job.completion_readiness()()) {
VLOG_LP(log_debug) << job << " job completion is not ready and teardown is (re)scheduled";
ret = false;
}
// log_exit << *this << " ret:" << ret;
return ret;
}

void flat_task::write() {
bool flat_task::write() {
log_entry << *this;
bool ret = false;
if(utils::request_cancel_enabled(request_cancel_kind::write)) {
auto res_src = req_context_->req_info().response_source();
if(res_src && res_src->check_cancel()) {
set_cancel_status(*req_context_);
submit_teardown(*req_context_);
if(check_or_submit_teardown(*req_context_, true)) {
ret = true;
};
log_exit << *this;
return;
return ret;
}
}
trace_scope_name("write"); //NOLINT
(*write_)(*req_context_);
submit_teardown(*req_context_);
if(check_or_submit_teardown(*req_context_, true)) {
ret = true;
};
log_exit << *this;
return ret;
}

bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
Expand All @@ -148,16 +170,16 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
VLOG_LP(log_trace_fine) << "task begin " << *this << " job_id:" << utils::hex(req_context_->job()->id())
<< " kind:" << kind_ << " sticky:" << sticky_ << " worker:" << ctx.index()
<< " stolen:" << ctx.task_is_stolen() << " last_steal_from:" << ctx.last_steal_from();
bool ret = false;
bool to_finish_job = false;
switch(kind_) {
using kind = flat_task_kind;
case kind::dag_events: dag_schedule(); break;
case kind::bootstrap: bootstrap(ctx); break;
case kind::resolve: resolve(ctx); break;
case kind::teardown: ret = teardown(); break;
case kind::wrapped: execute_wrapped(); break;
case kind::write: write(); break;
case kind::load: load(); break;
case kind::teardown: to_finish_job = ready_to_finish(*job(), false); break;
case kind::wrapped: to_finish_job = execute_wrapped(); break;
case kind::write: to_finish_job = write(); break;
case kind::load: to_finish_job = load(); break;
}
std::chrono::time_point<clock> end{};
std::size_t took_ns{};
Expand All @@ -179,21 +201,22 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
<< " job_id:" << utils::hex(req_context_->job()->id()) << " kind:" << kind_
<< " sticky:" << sticky_ << " worker:" << ctx.index() << " stolen:" << ctx.task_is_stolen();

return ret;
return to_finish_job;
}

void flat_task::finish_job() {
void finish_job(request_context& req_context) {
// job completed, and the latch needs to be released
auto& ts = *req_context_->scheduler();
auto& j = *job();
auto& ts = *req_context.scheduler();
auto& j = *req_context.job();
auto& cb = j.callback();
auto req_detail = j.request();
if(cb) {
cb();
}
VLOG_LP(log_trace_fine) << "job teardown job_id:" << utils::hex(req_context.job()->id());
if(req_detail) {
req_detail->status(scheduler::request_detail_status::finishing);
log_request(*req_detail, req_context_->status_code() == status::ok);
log_request(*req_detail, req_context.status_code() == status::ok);

VLOG(log_debug_timing_event_fine) << "/:jogasaki:metrics:task_time"
<< " job_id:" << utils::hex(req_detail->id())
Expand Down Expand Up @@ -252,17 +275,21 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) {
(void)cnt;
(void)jobid;
//VLOG_LP(log_debug) << "decremented job " << jobid << " task count to " << cnt;
return;
if(! job_completes) {
return;
}
}

// teardown task
// teardown task or job_completes=true
if(! job_completes) {
// teardown task is not ready to finish_job

// Submitting teardown should be done at the end of the task since otherwise new teardown finish fast
// and start destroying job context, which can be touched by this task.
submit_teardown(*req_context_, true);
return;
}
finish_job();
finish_job(*req_context_);
}

flat_task::identity_type flat_task::id() const {
Expand Down Expand Up @@ -382,10 +409,11 @@ flat_task::flat_task(
sctx_(std::move(sctx))
{}

void flat_task::load() {
bool flat_task::load() {
log_entry << *this;
trace_scope_name("load"); //NOLINT
auto res = (*loader_)();
bool ret = false;
if(res == executor::file::loader_result::running) {
auto& ts = *req_context_->scheduler();
ts.schedule_task(flat_task{
Expand All @@ -406,6 +434,7 @@ void flat_task::load() {
submit_teardown(*req_context_);
}
log_exit << *this;
return ret;
}

flat_task::flat_task(task_enum_tag_t<flat_task_kind::load>, request_context* rctx,
Expand All @@ -416,15 +445,16 @@ flat_task::flat_task(task_enum_tag_t<flat_task_kind::load>, request_context* rct
loader_(std::move(ldr))
{}

void flat_task::execute_wrapped() {
bool flat_task::execute_wrapped() {
//DVLOG(log_trace) << *this << " wrapped task executed.";
trace_scope_name("executor_task"); //NOLINT
model::task_result res{};
while((res = (*origin_)()) == model::task_result::proceed) {}
if(res == model::task_result::yield) {
resubmit(*req_context_);
return;
return false;
}
return res == model::task_result::complete_and_teardown;
}

} // namespace jogasaki::scheduler
32 changes: 23 additions & 9 deletions src/jogasaki/scheduler/flat_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ struct statement_context {

void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false);

bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task = false,
bool force = false,
bool try_on_suspended_worker = false
);

/**
* @brief common task object
* @details The task object used commonly for the jogasaki::scheduler::task_scheduler.
Expand Down Expand Up @@ -305,18 +312,12 @@ class cache_align flat_task {
void bootstrap(tateyama::task_scheduler::context& ctx);
void dag_schedule();

/**
* @return true if the teardown task completes
* @return false if the teardown task is rescheduled
*/
bool teardown();
void resolve(tateyama::task_scheduler::context& ctx);

void write();
void load();
void execute_wrapped();
bool write();
bool load();
bool execute_wrapped();
void resubmit(request_context& req_context);
void finish_job();

std::ostream& write_to(std::ostream& out) const {
using namespace std::string_view_literals;
Expand All @@ -325,6 +326,19 @@ class cache_align flat_task {

};

/**
* @brief function to check job is ready to finish
* @return true if there are no other tasks for the job and completion is ready
* @return false otherwise
*/
bool ready_to_finish(job_context& job, bool calling_from_task);

/**
* @brief finish the job
* @details this function does not verify any teardown precondition, so call it only when the job is known to be ready to finish
*/
void finish_job(request_context& req_context);

void print_task_diagnostic(flat_task const& t, std::ostream& os);

}
20 changes: 17 additions & 3 deletions src/jogasaki/scheduler/job_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <jogasaki/scheduler/hybrid_execution_mode.h>
#include <jogasaki/scheduler/request_detail.h>
#include <jogasaki/utils/hex.h>
#include <jogasaki/utils/interference_size.h>
#include <jogasaki/utils/latch.h>

Expand Down Expand Up @@ -155,6 +156,15 @@ class cache_align job_context {
[[nodiscard]] std::atomic<hybrid_execution_mode_kind>& hybrid_execution_mode() noexcept {
return hybrid_execution_mode_;
}

/**
 * @brief dump the text representation of the value to output stream
 * @details delegates to write_to(), which prints the job id
 * @param out the target output stream
 * @param value the value to be output
 * @return the output stream after writing
 */
friend std::ostream& operator<<(std::ostream& out, job_context const& value) {
    return value.write_to(out);
}
private:

job_id_type id_{id_src_++};
Expand All @@ -169,9 +179,13 @@ class cache_align job_context {
cache_align std::atomic<hybrid_execution_mode_kind> hybrid_execution_mode_{hybrid_execution_mode_kind::undefined};

static inline std::atomic_size_t id_src_{1UL << 32UL}; //NOLINT
};

}
/**
 * @brief write the text representation of this job context to the stream
 * @param out the target output stream
 * @return the stream passed as `out`
 */
std::ostream& write_to(std::ostream& out) const {
    // output format: "job_id:" followed by the hex representation of id()
    out << "job_id:" << utils::hex(id());
    return out;
}
};

}
} // namespace scheduler

} // namespace jogasaki
5 changes: 4 additions & 1 deletion test/jogasaki/scheduler/teardown_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ TEST_F(teardown_test, basic) {
}
auto s = teardown_task_submitted.load();
if(!s && teardown_task_submitted.compare_exchange_strong(s, true)) {
submit_teardown(*rctx);
if(check_or_submit_teardown(*rctx, true)) {
++completed_task_count;
return model::task_result::complete_and_teardown;
}
}
++completed_task_count;
return model::task_result::complete;
Expand Down

0 comments on commit 1336e5e

Please sign in to comment.