Skip to content

Commit

Permalink
improve trace log for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 24, 2024
1 parent 511575d commit 74f31d1
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
8 changes: 5 additions & 3 deletions src/jogasaki/executor/process/impl/ops/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,11 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp
loop_count = 0;
auto current_time = std::chrono::steady_clock::now();
auto elapsed_time =
std::chrono::duration_cast<std::chrono::milliseconds>(current_time - previous_time);
if (elapsed_time.count() >= scan_yield_interval ) {
return {operation_status_kind::yield};
std::chrono::duration_cast<std::chrono::milliseconds>(current_time - previous_time);
if (elapsed_time.count() >= scan_yield_interval) {
++ctx.yield_count_;
VLOG_LP(log_trace) << "scan operator yields count:" << ctx.yield_count_;
return {operation_status_kind::yield};
}
}
loop_count++;
Expand Down
1 change: 1 addition & 0 deletions src/jogasaki/executor/process/impl/ops/scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class scan_context : public context_base {
impl::scan_info const* scan_info_{};
data::aligned_buffer key_begin_{};
data::aligned_buffer key_end_{};
std::size_t yield_count_{};
};

}
Expand Down
21 changes: 15 additions & 6 deletions src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
if (VLOG_IS_ON(log_debug_timing_event_fine)) {
begin = clock::now();
}
VLOG_LP(log_trace) << "task begin " << *this << " job_id:" << utils::hex(req_context_->job()->id())
<< " kind:" << kind_ << " sticky:" << sticky_;
bool ret = false;
switch(kind_) {
using kind = flat_task_kind;
Expand All @@ -157,11 +159,13 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
case kind::load: load(); break;
}
std::chrono::time_point<clock> end{};
std::size_t took_ns{};
if (VLOG_IS_ON(log_debug_timing_event_fine)) {
end = clock::now();
took_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end-begin).count();
}
if(auto req_detail = job()->request()) {
req_detail->task_duration_ns() += std::chrono::duration_cast<std::chrono::nanoseconds>(end-begin).count();
req_detail->task_duration_ns() += took_ns;
++req_detail->task_count();
if(sticky_) {
++req_detail->sticky_task_count();
Expand All @@ -170,6 +174,8 @@ bool flat_task::execute(tateyama::task_scheduler::context& ctx) {
++req_detail->task_steling_count();
}
}
VLOG_LP(log_trace) << "task end " << *this << " took(ns):" << took_ns
<< " job_id:" << utils::hex(req_context_->job()->id()) << " kind:" << kind_;
return ret;
}

Expand Down Expand Up @@ -260,7 +266,7 @@ flat_task::identity_type flat_task::id() const {
if (origin_) {
return origin_->id();
}
return undefined_id;
return id_;
}

flat_task::flat_task(
Expand All @@ -278,6 +284,7 @@ flat_task::flat_task(
task_enum_tag_t<flat_task_kind::dag_events>,
request_context* rctx
) noexcept:
id_(id_src_++),
kind_(flat_task_kind::dag_events),
req_context_(rctx)
{}
Expand All @@ -287,6 +294,7 @@ flat_task::flat_task(
request_context* rctx,
model::graph& g
) noexcept:
id_(id_src_++),
kind_(flat_task_kind::bootstrap),
req_context_(rctx),
graph_(std::addressof(g))
Expand Down Expand Up @@ -335,6 +343,7 @@ flat_task::flat_task(
task_enum_tag_t<flat_task_kind::teardown>,
request_context* rctx
) noexcept:
id_(id_src_++),
kind_(flat_task_kind::teardown),
req_context_(rctx)
{}
Expand All @@ -344,6 +353,7 @@ flat_task::flat_task(
request_context* rctx,
executor::common::write_statement* write
) noexcept:
id_(id_src_++),
kind_(flat_task_kind::write),
req_context_(rctx),
write_(write),
Expand All @@ -363,6 +373,7 @@ flat_task::flat_task(
std::shared_ptr<request_context> rctx,
std::shared_ptr<statement_context> sctx
) noexcept:
id_(id_src_++),
kind_(flat_task_kind::resolve),
req_context_(std::move(rctx)),
sctx_(std::move(sctx))
Expand Down Expand Up @@ -396,6 +407,7 @@ void flat_task::load() {

flat_task::flat_task(task_enum_tag_t<flat_task_kind::load>, request_context* rctx,
std::shared_ptr<executor::file::loader> ldr) noexcept:
id_(id_src_++),
kind_(flat_task_kind::load),
req_context_(rctx),
loader_(std::move(ldr))
Expand All @@ -412,7 +424,4 @@ void flat_task::execute_wrapped() {
}
}

}



} // namespace jogasaki::scheduler
4 changes: 4 additions & 0 deletions src/jogasaki/scheduler/flat_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class cache_align flat_task {
[[nodiscard]] request_context* req_context() const noexcept;

private:
std::size_t id_{undefined_id};
flat_task_kind kind_{};
maybe_shared_ptr<request_context> req_context_{};
std::shared_ptr<model::task> origin_{};
Expand All @@ -293,6 +294,8 @@ class cache_align flat_task {
std::shared_ptr<statement_context> sctx_{};
std::shared_ptr<executor::file::loader> loader_{};

static inline std::atomic_size_t id_src_{}; //NOLINT

/**
* @return true if job completes together with the task
* @return false if only task completes
Expand All @@ -314,6 +317,7 @@ class cache_align flat_task {
void execute_wrapped();
void resubmit(request_context& req_context);
void finish_job();
std::size_t assign_id();

std::ostream& write_to(std::ostream& out) const {
using namespace std::string_view_literals;
Expand Down

0 comments on commit 74f31d1

Please sign in to comment.