Skip to content

Commit

Permalink
Merge pull request #120 from project-tsurugi/yield
Browse files Browse the repository at this point in the history
set scan_block_size default from 0 to 100 & add sql.scan_yield_interval config. param
  • Loading branch information
YoshiakiNishimura authored Oct 9, 2024
2 parents 11b23d0 + 31d5b18 commit fe498c1
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 3 deletions.
3 changes: 2 additions & 1 deletion examples/service_benchmark/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ DEFINE_int64(worker_suspend_timeout, 1000000, "duration in us before worker wake
DEFINE_bool(md, false, "output result to stdout as markdown table"); //NOLINT
DEFINE_bool(ddl, false, "issue ddl instead of using built-in table. Required for --secondary."); //NOLINT
DEFINE_bool(secondary, false, "use secondary index"); //NOLINT
DEFINE_int64(scan_block_size, 0, "max records processed by scan operator before yielding to other tasks"); //NOLINT
DEFINE_int64(scan_block_size, 100, "max records processed by scan operator before yielding to other tasks"); //NOLINT
DEFINE_int64(scan_yield_interval, 1, "max time (ms) processed by scan operator before yielding to other tasks"); //NOLINT

namespace tateyama::service_benchmark {

Expand Down
12 changes: 11 additions & 1 deletion include/jogasaki/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ class configuration {
scan_block_size_ = arg;
}

[[nodiscard]] std::size_t scan_yield_interval() const noexcept {
return scan_yield_interval_;
}

void scan_yield_interval(std::size_t arg) noexcept {
scan_yield_interval_ = arg;
}

[[nodiscard]] std::int32_t zone_offset() const noexcept {
return zone_offset_;
}
Expand Down Expand Up @@ -518,6 +526,7 @@ class configuration {
print_non_default(lowercase_regular_identifiers);
print_non_default(zone_offset);
print_non_default(scan_block_size);
print_non_default(scan_yield_interval);
print_non_default(rtx_parallel_scan);

if(cfg.req_cancel_config()) {
Expand Down Expand Up @@ -574,7 +583,8 @@ class configuration {
std::shared_ptr<request_cancel_config> request_cancel_config_{};
bool lowercase_regular_identifiers_ = false;
std::int32_t zone_offset_ = 0;
std::size_t scan_block_size_ = 0;
std::size_t scan_block_size_ = 100;
std::size_t scan_yield_interval_ = 1;
bool rtx_parallel_scan_ = false;

};
Expand Down
1 change: 1 addition & 0 deletions src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ void dump_public_configurations(configuration const& cfg) {
LOGCFG << "(dev_lowercase_regular_identifiers) " << cfg.lowercase_regular_identifiers() << " : whether to lowercase regular identifiers";
LOGCFG << "(zone_offset) " << cfg.zone_offset() << " : system time zone offset in minutes";
LOGCFG << "(scan_block_size) " << cfg.scan_block_size() << " : max records processed by scan operator before yielding to other task";
LOGCFG << "(scan_yield_interval) " << cfg.scan_yield_interval() << " : max time (ms) processed by scan operator before yielding to other tasks";
LOGCFG << "(dev_rtx_parallel_scan) " << cfg.rtx_parallel_scan() << " : whether to enable parallel scan for RTX";
}

Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/api/resource/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ bool process_sql_config(std::shared_ptr<jogasaki::configuration>& ret, tateyama:
if (auto v = jogasaki_config->get<std::size_t>("scan_block_size")) {
ret->scan_block_size(v.value());
}
if (auto v = jogasaki_config->get<std::size_t>("scan_yield_interval")) {
ret->scan_yield_interval(v.value());
}
if (auto v = jogasaki_config->get<bool>("dev_rtx_parallel_scan")) {
ret->rtx_parallel_scan(v.value());
}
Expand Down
10 changes: 9 additions & 1 deletion src/jogasaki/executor/process/impl/ops/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "scan.h"

#include <chrono>
#include <cstddef>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -160,6 +161,8 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp
status st{};
std::size_t loop_count = 0;
auto scan_block_size = global::config_pool()->scan_block_size();
auto scan_yield_interval = static_cast<std::int64_t>(global::config_pool()->scan_yield_interval());
auto previous_time = std::chrono::steady_clock::now();
while(true) {
if(utils::request_cancel_enabled(request_cancel_kind::scan) && ctx.req_context()) {
auto res_src = ctx.req_context()->req_info().response_source();
Expand Down Expand Up @@ -205,7 +208,12 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp
}
}
if (scan_block_size != 0 && scan_block_size == loop_count ){
return {operation_status_kind::yield};
auto current_time = std::chrono::steady_clock::now();
auto elapsed_time =
std::chrono::duration_cast<std::chrono::milliseconds>(current_time - previous_time);
if (elapsed_time.count() >= scan_yield_interval ) {
return {operation_status_kind::yield};
}
}
loop_count++;
}
Expand Down

0 comments on commit fe498c1

Please sign in to comment.