diff --git a/src/jogasaki/executor/process/impl/ops/scan.cpp b/src/jogasaki/executor/process/impl/ops/scan.cpp index 82180d85..ed2ec476 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan.cpp @@ -211,9 +211,11 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp loop_count = 0; auto current_time = std::chrono::steady_clock::now(); auto elapsed_time = - std::chrono::duration_cast(current_time - previous_time); - if (elapsed_time.count() >= scan_yield_interval ) { - return {operation_status_kind::yield}; + std::chrono::duration_cast(current_time - previous_time); + if (elapsed_time.count() >= scan_yield_interval) { + ++ctx.yield_count_; + VLOG_LP(log_trace) << "scan operator yields count:" << ctx.yield_count_; + return {operation_status_kind::yield}; } } loop_count++; diff --git a/src/jogasaki/executor/process/impl/ops/scan_context.h b/src/jogasaki/executor/process/impl/ops/scan_context.h index 330866c7..712f40d2 100644 --- a/src/jogasaki/executor/process/impl/ops/scan_context.h +++ b/src/jogasaki/executor/process/impl/ops/scan_context.h @@ -77,6 +77,7 @@ class scan_context : public context_base { impl::scan_info const* scan_info_{}; data::aligned_buffer key_begin_{}; data::aligned_buffer key_end_{}; + std::size_t yield_count_{}; }; } diff --git a/src/jogasaki/scheduler/flat_task.cpp b/src/jogasaki/scheduler/flat_task.cpp index e1ea3524..071c91e3 100644 --- a/src/jogasaki/scheduler/flat_task.cpp +++ b/src/jogasaki/scheduler/flat_task.cpp @@ -145,6 +145,8 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) { if (VLOG_IS_ON(log_debug_timing_event_fine)) { begin = clock::now(); } + VLOG_LP(log_trace) << "task begin " << *this << " job_id:" << utils::hex(req_context_->job()->id()) + << " kind:" << kind_ << " sticky:" << sticky_; bool ret = false; switch(kind_) { using kind = flat_task_kind; @@ -157,11 +159,13 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) { case kind::load: load(); break; } std::chrono::time_point end{}; + std::size_t took_ns{}; if (VLOG_IS_ON(log_debug_timing_event_fine)) { end = clock::now(); + took_ns = std::chrono::duration_cast(end-begin).count(); } if(auto req_detail = job()->request()) { - req_detail->task_duration_ns() += std::chrono::duration_cast(end-begin).count(); + req_detail->task_duration_ns() += took_ns; ++req_detail->task_count(); if(sticky_) { ++req_detail->sticky_task_count(); @@ -170,6 +174,8 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) { ++req_detail->task_steling_count(); } } + VLOG_LP(log_trace) << "task end " << *this << " took(ns):" << took_ns + << " job_id:" << utils::hex(req_context_->job()->id()) << " kind:" << kind_; return ret; } @@ -260,7 +266,7 @@ flat_task::identity_type flat_task::id() const { if (origin_) { return origin_->id(); } - return undefined_id; + return id_; } flat_task::flat_task( @@ -278,6 +284,7 @@ flat_task::flat_task( task_enum_tag_t, request_context* rctx ) noexcept: + id_(id_src_++), kind_(flat_task_kind::dag_events), req_context_(rctx) {} @@ -287,6 +294,7 @@ flat_task::flat_task( request_context* rctx, model::graph& g ) noexcept: + id_(id_src_++), kind_(flat_task_kind::bootstrap), req_context_(rctx), graph_(std::addressof(g)) @@ -335,6 +343,7 @@ flat_task::flat_task( task_enum_tag_t, request_context* rctx ) noexcept: + id_(id_src_++), kind_(flat_task_kind::teardown), req_context_(rctx) {} @@ -344,6 +353,7 @@ flat_task::flat_task( request_context* rctx, executor::common::write_statement* write ) noexcept: + id_(id_src_++), kind_(flat_task_kind::write), req_context_(rctx), write_(write), @@ -363,6 +373,7 @@ flat_task::flat_task( std::shared_ptr rctx, std::shared_ptr sctx ) noexcept: + id_(id_src_++), kind_(flat_task_kind::resolve), req_context_(std::move(rctx)), sctx_(std::move(sctx)) @@ -396,6 +407,7 @@ void flat_task::load() { flat_task::flat_task(task_enum_tag_t, request_context* rctx, std::shared_ptr ldr) noexcept: + id_(id_src_++), kind_(flat_task_kind::load), req_context_(rctx), loader_(std::move(ldr)) @@ -412,7 +424,4 @@ void flat_task::execute_wrapped() { } } -} - - - +} // namespace jogasaki::scheduler diff --git a/src/jogasaki/scheduler/flat_task.h b/src/jogasaki/scheduler/flat_task.h index 4268189d..d880d926 100644 --- a/src/jogasaki/scheduler/flat_task.h +++ b/src/jogasaki/scheduler/flat_task.h @@ -284,6 +284,7 @@ class cache_align flat_task { [[nodiscard]] request_context* req_context() const noexcept; private: + std::size_t id_{undefined_id}; flat_task_kind kind_{}; maybe_shared_ptr req_context_{}; std::shared_ptr origin_{}; @@ -293,6 +294,8 @@ class cache_align flat_task { std::shared_ptr sctx_{}; std::shared_ptr loader_{}; + static inline std::atomic_size_t id_src_{}; //NOLINT + /** * @return true if job completes together with the task * @return false if only task completes @@ -314,6 +317,7 @@ class cache_align flat_task { void execute_wrapped(); void resubmit(request_context& req_context); void finish_job(); + std::size_t assign_id(); std::ostream& write_to(std::ostream& out) const { using namespace std::string_view_literals;