diff --git a/include/jogasaki/configuration.h b/include/jogasaki/configuration.h
index 3f9d9a0b5..019222947 100644
--- a/include/jogasaki/configuration.h
+++ b/include/jogasaki/configuration.h
@@ -498,6 +498,14 @@ class configuration {
         scan_default_parallel_ = arg;
     }
 
+    [[nodiscard]] bool inplace_teardown() const noexcept {
+        return inplace_teardown_;
+    }
+
+    void inplace_teardown(bool arg) noexcept {
+        inplace_teardown_ = arg;
+    }
+
     friend inline std::ostream& operator<<(std::ostream& out, configuration const& cfg) {
         //NOLINTBEGIN
@@ -555,6 +563,7 @@ class configuration {
         print_non_default(thousandths_ratio_check_local_first);
         print_non_default(direct_commit_callback);
         print_non_default(scan_default_parallel);
+        print_non_default(inplace_teardown);
 
         if(cfg.req_cancel_config()) {
             out << "req_cancel_config:" << *cfg.req_cancel_config() << " ";
@@ -616,6 +625,7 @@ class configuration {
     std::size_t thousandths_ratio_check_local_first_ = 100;
     bool direct_commit_callback_ = false;
     std::size_t scan_default_parallel_ = 1;
+    bool inplace_teardown_ = true;
 };
diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp
index 64b44ab66..a72beac3d 100644
--- a/src/jogasaki/api/impl/database.cpp
+++ b/src/jogasaki/api/impl/database.cpp
@@ -181,6 +181,7 @@ void dump_public_configurations(configuration const& cfg) {
     LOGCFG << "(dev_thousandths_ratio_check_local_first) " << cfg.thousandths_ratio_check_local_first() << " : how frequently (represented as count out of 1000 executions) task scheduler checks local task queue first";
     LOGCFG << "(dev_direct_commit_callback) " << cfg.direct_commit_callback() << " : whether to make callback directly from shirakami to client on pre-commit response (only for `available` and `accepted`)";
     LOGCFG << "(scan_default_parallel) " << cfg.scan_default_parallel() << " : max parallel execution count of scan tasks";
+    LOGCFG << "(dev_inplace_teardown) " << cfg.inplace_teardown() << " : whether to process teardown (job completion) directly on the current thread instead of scheduling a task for it";
 }
 
 status database::start() {
diff --git a/src/jogasaki/api/resource/bridge.cpp b/src/jogasaki/api/resource/bridge.cpp
index 62348ec0b..36e6f4c8b 100644
--- a/src/jogasaki/api/resource/bridge.cpp
+++ b/src/jogasaki/api/resource/bridge.cpp
@@ -263,6 +263,9 @@ bool process_sql_config(std::shared_ptr<jogasaki::configuration>& ret, tateyama:
     if (auto v = jogasaki_config->get<std::size_t>("scan_default_parallel")) {
         ret->scan_default_parallel(v.value());
     }
+    if (auto v = jogasaki_config->get<bool>("dev_inplace_teardown")) {
+        ret->inplace_teardown(v.value());
+    }
     return true;
 }
diff --git a/src/jogasaki/model/task.h b/src/jogasaki/model/task.h
index fd27711eb..b27a41843 100644
--- a/src/jogasaki/model/task.h
+++ b/src/jogasaki/model/task.h
@@ -39,6 +39,11 @@ enum class task_result : std::size_t {
      */
     complete,
 
+    /**
+     * @brief complete the task, remove it from the schedule, and tear down the job
+     */
+    complete_and_teardown,
+
     /**
      * @brief sleep and detach thread from this task, the task needs wake-up to resume
      * @attention this is future functionality and not yet supported
diff --git a/src/jogasaki/scheduler/flat_task.cpp b/src/jogasaki/scheduler/flat_task.cpp
index 049379d95..a38474443 100644
--- a/src/jogasaki/scheduler/flat_task.cpp
+++ b/src/jogasaki/scheduler/flat_task.cpp
@@ -73,6 +73,21 @@ void flat_task::dag_schedule() {
     log_exit << *this;
 }
 
+bool check_or_submit_teardown(
+    request_context& req_context,
+    bool calling_from_task,
+    bool force,
+    bool try_on_suspended_worker
+) {
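+    // fast path: when in-place teardown is enabled and the job has no other
+    // work in flight, return true so the caller finishes the job on the
+    // current thread; otherwise fall back to scheduling a teardown task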
+    if(global::config_pool()->inplace_teardown()) {
+        if(ready_to_finish(*req_context.job(), calling_from_task)) {
+            return true;
+        }
+    }
+    submit_teardown(req_context, force, try_on_suspended_worker);
+    return false;
+}
+
 void submit_teardown(request_context& req_context, bool force, bool try_on_suspended_worker) {
     // make sure teardown task is submitted only once
     auto& ts = *req_context.scheduler();
@@ -103,39 +118,46 @@ void flat_task::resubmit(request_context& req_context) {
     ts.schedule_task(flat_task{*this});
 }
 
-bool flat_task::teardown() { //NOLINT(readability-make-member-function-const)
+bool ready_to_finish(job_context& job, bool calling_from_task) {
     // stop log_entry/log_exit since this function is called frequently as part of durability callback processing
     // log_entry << *this;
     trace_scope_name("teardown");  //NOLINT
     bool ret = true;
-    if (auto cnt = job()->task_count().load(); cnt > 0) {
-        DVLOG_LP(log_debug) << *this << " other " << cnt << " tasks remain and teardown is rescheduled.";
+    std::size_t expected_task_count = calling_from_task ? 1 : 0;
+    if (auto cnt = job.task_count().load(); cnt > expected_task_count) {
+        VLOG_LP(log_debug) << job << " other " << cnt << " tasks remain and teardown is (re)scheduled";
         // Another teardown task will be scheduled at the end of this task.
         // It's not done here because newly scheduled task quickly completes and destroy job context.
         ret = false;
-    } else if (job()->completion_readiness() && !job()->completion_readiness()()) {
-        DVLOG_LP(log_debug) << *this << " job completion is not ready and teardown is rescheduled.";
+    } else if (job.completion_readiness() && !job.completion_readiness()()) {
+        VLOG_LP(log_debug) << job << " job completion is not ready and teardown is (re)scheduled";
         ret = false;
     }
     // log_exit << *this << " ret:" << ret;
     return ret;
 }
 
-void flat_task::write() {
+bool flat_task::write() {
     log_entry << *this;
+    bool ret = false;
     if(utils::request_cancel_enabled(request_cancel_kind::write)) {
         auto res_src = req_context_->req_info().response_source();
         if(res_src && res_src->check_cancel()) {
             set_cancel_status(*req_context_);
-            submit_teardown(*req_context_);
+            if(check_or_submit_teardown(*req_context_, true)) {
+                ret = true;
+            }
             log_exit << *this;
-            return;
+            return ret;
         }
     }
     trace_scope_name("write");  //NOLINT
     (*write_)(*req_context_);
-    submit_teardown(*req_context_);
+    if(check_or_submit_teardown(*req_context_, true)) {
+        ret = true;
+    }
     log_exit << *this;
+    return ret;
 }
 
 bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
@@ -148,16 +170,16 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
     VLOG_LP(log_trace_fine) << "task begin " << *this
         << " job_id:" << utils::hex(req_context_->job()->id())
        << " kind:" << kind_
        << " sticky:" << sticky_
        << " worker:" << ctx.index()
        << " stolen:" << ctx.task_is_stolen()
        << " last_steal_from:" << ctx.last_steal_from();
-    bool ret = false;
+    bool to_finish_job = false;
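+    // set by task bodies that may complete the whole job (teardown, wrapped,
+    // write, load); when true, operator() finishes the job on this thread
+    // instead of submitting a separate teardown task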
     switch(kind_) {
         using kind = flat_task_kind;
         case kind::dag_events: dag_schedule(); break;
         case kind::bootstrap: bootstrap(ctx); break;
         case kind::resolve: resolve(ctx); break;
-        case kind::teardown: ret = teardown(); break;
-        case kind::wrapped: execute_wrapped(); break;
-        case kind::write: write(); break;
-        case kind::load: load(); break;
+        case kind::teardown: to_finish_job = ready_to_finish(*job(), false); break;
+        case kind::wrapped: to_finish_job = execute_wrapped(); break;
+        case kind::write: to_finish_job = write(); break;
+        case kind::load: to_finish_job = load(); break;
     }
     std::chrono::time_point end{};
     std::size_t took_ns{};
@@ -179,21 +201,22 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
         << " job_id:" << utils::hex(req_context_->job()->id())
        << " kind:" << kind_
        << " sticky:" << sticky_
        << " worker:" << ctx.index()
        << " stolen:" << ctx.task_is_stolen();
-    return ret;
+    return to_finish_job;
 }
 
-void flat_task::finish_job() {
+void finish_job(request_context& req_context) {
     // job completed, and the latch needs to be released
-    auto& ts = *req_context_->scheduler();
-    auto& j = *job();
+    auto& ts = *req_context.scheduler();
+    auto& j = *req_context.job();
     auto& cb = j.callback();
     auto req_detail = j.request();
     if(cb) {
         cb();
     }
+    VLOG_LP(log_trace_fine) << "job teardown job_id:" << utils::hex(req_context.job()->id());
     if(req_detail) {
         req_detail->status(scheduler::request_detail_status::finishing);
-        log_request(*req_detail, req_context_->status_code() == status::ok);
+        log_request(*req_detail, req_context.status_code() == status::ok);
         VLOG(log_debug_timing_event_fine)
             << "/:jogasaki:metrics:task_time"
             << " job_id:" << utils::hex(req_detail->id())
@@ -252,17 +275,21 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) {
         (void)cnt;
         (void)jobid;
         //VLOG_LP(log_debug) << "decremented job " << jobid << " task count to " << cnt;
-        return;
+        if(! job_completes) {
+            return;
+        }
     }
-    // teardown task
+    // teardown task or job_completes=true
     if(! job_completes) {
+        // the teardown task is not yet ready to finish the job
+
         // Submitting teardown should be done at the end of the task since otherwise new teardown finish fast
         // and start destroying job context, which can be touched by this task.
         submit_teardown(*req_context_, true);
         return;
     }
-    finish_job();
+    finish_job(*req_context_);
 }
 
 flat_task::identity_type flat_task::id() const {
@@ -382,10 +409,11 @@ flat_task::flat_task(
     sctx_(std::move(sctx))
 {}
 
-void flat_task::load() {
+bool flat_task::load() {
     log_entry << *this;
     trace_scope_name("load");  //NOLINT
     auto res = (*loader_)();
+    bool ret = false;
     if(res == executor::file::loader_result::running) {
         auto& ts = *req_context_->scheduler();
         ts.schedule_task(flat_task{
@@ -406,6 +434,7 @@
         submit_teardown(*req_context_);
     }
     log_exit << *this;
+    return ret;
 }
 
 flat_task::flat_task(task_enum_tag_t<flat_task_kind::load>, request_context* rctx,
@@ -416,15 +445,16 @@ flat_task::flat_task(task_enum_tag_t<flat_task_kind::load>, request_context* rct,
     loader_(std::move(ldr))
 {}
 
-void flat_task::execute_wrapped() {
+bool flat_task::execute_wrapped() {
     //DVLOG(log_trace) << *this << " wrapped task executed.";
     trace_scope_name("executor_task");  //NOLINT
     model::task_result res{};
     while((res = (*origin_)()) == model::task_result::proceed) {}
     if(res == model::task_result::yield) {
         resubmit(*req_context_);
-        return;
+        return false;
     }
+    return res == model::task_result::complete_and_teardown;
 }
 
 } // namespace jogasaki::scheduler
diff --git a/src/jogasaki/scheduler/flat_task.h b/src/jogasaki/scheduler/flat_task.h
index fe8a9cb40..969fb0c50 100644
--- a/src/jogasaki/scheduler/flat_task.h
+++ b/src/jogasaki/scheduler/flat_task.h
@@ -135,6 +135,13 @@ struct statement_context {
 
 void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false);
 
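+/**
+ * @brief finish the job in place if possible, submitting a teardown task otherwise
+ * @param req_context the request context of the job
+ * @param calling_from_task true when called from a task still counted in the job's task count
+ * @param force passed through to submit_teardown()
+ * @param try_on_suspended_worker passed through to submit_teardown()
+ * @return true if the job is ready to finish and the caller should call finish_job()
+ * @return false if a teardown task has been submitted instead
+ */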
+bool check_or_submit_teardown(
+    request_context& req_context,
+    bool calling_from_task = false,
+    bool force = false,
+    bool try_on_suspended_worker = false
+);
+
 /**
  * @brief common task object
  * @details The task object used commonly for the jogasaki::scheduler::task_scheduler.
@@ -305,18 +312,12 @@ class cache_align flat_task {
     void bootstrap(tateyama::task_scheduler::context& ctx);
     void dag_schedule();
 
-    /**
-     * @return true if the teardown task completes
-     * @return false if the teardown task is rescheduled
-     */
-    bool teardown();
     void resolve(tateyama::task_scheduler::context& ctx);
-    void write();
-    void load();
-    void execute_wrapped();
+    bool write();
+    bool load();
+    bool execute_wrapped();
     void resubmit(request_context& req_context);
-    void finish_job();
 
     std::ostream& write_to(std::ostream& out) const {
         using namespace std::string_view_literals;
 
 };
 
+/**
+ * @brief check whether the job is ready to finish
+ * @return true if there are no other tasks for the job and its completion is ready
+ * @return false otherwise
+ */
+bool ready_to_finish(job_context& job, bool calling_from_task);
+
+/**
+ * @brief finish the job
+ * @details this function doesn't check any precondition for teardown, so call it only when the job is known to be ready to finish
+ */
+void finish_job(request_context& req_context);
+
 void print_task_diagnostic(flat_task const& t, std::ostream& os);
 
 }
diff --git a/src/jogasaki/scheduler/job_context.h b/src/jogasaki/scheduler/job_context.h
index 3391d008b..1a114a3fa 100644
--- a/src/jogasaki/scheduler/job_context.h
+++ b/src/jogasaki/scheduler/job_context.h
@@ -24,6 +24,7 @@
 #include
 #include
+#include <jogasaki/utils/hex.h>
 #include
 #include
@@ -155,6 +156,15 @@ class cache_align job_context {
     [[nodiscard]] std::atomic<hybrid_execution_mode_kind>& hybrid_execution_mode() noexcept {
         return hybrid_execution_mode_;
     }
+
+    /**
+     * @brief dump the text representation of the value to output stream
+     * @param out the target output stream
+     * @param value the value to be output
+     */
+    friend std::ostream& operator<<(std::ostream& out, job_context const& value) {
+        return value.write_to(out);
+    }
 
 private:
     job_id_type id_{id_src_++};
@@ -169,9 +179,13 @@ class cache_align job_context {
     cache_align std::atomic<hybrid_execution_mode_kind> hybrid_execution_mode_{hybrid_execution_mode_kind::undefined};
 
     static inline std::atomic_size_t id_src_{1UL << 32UL}; //NOLINT
-};
 
-}
+    std::ostream& write_to(std::ostream& out) const {
+        using namespace std::string_view_literals;
+        return out << "job_id:"sv << utils::hex(id());
+    }
+};
 
-}
+} // namespace scheduler
+
+} // namespace jogasaki
diff --git a/test/jogasaki/scheduler/teardown_test.cpp b/test/jogasaki/scheduler/teardown_test.cpp
index b6927cc40..d222e9344 100644
--- a/test/jogasaki/scheduler/teardown_test.cpp
+++ b/test/jogasaki/scheduler/teardown_test.cpp
@@ -103,7 +103,10 @@ TEST_F(teardown_test, basic) {
             }
             auto s = teardown_task_submitted.load();
             if(!s && teardown_task_submitted.compare_exchange_strong(s, true)) {
-                submit_teardown(*rctx);
+                if(check_or_submit_teardown(*rctx, true)) {
+                    ++completed_task_count;
+                    return model::task_result::complete_and_teardown;
+                }
             }
             ++completed_task_count;
             return model::task_result::complete;