From 1b76e6ff7e20632af0ad4ad923b72ee89310b9da Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Wed, 9 Oct 2024 11:23:39 +0900 Subject: [PATCH 1/2] set scan_block_size default from 0 to 100 --- examples/service_benchmark/main.cpp | 2 +- include/jogasaki/configuration.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/service_benchmark/main.cpp b/examples/service_benchmark/main.cpp index da10c6620..a1ee19f03 100644 --- a/examples/service_benchmark/main.cpp +++ b/examples/service_benchmark/main.cpp @@ -110,7 +110,7 @@ DEFINE_int64(worker_suspend_timeout, 1000000, "duration in us before worker wake DEFINE_bool(md, false, "output result to stdout as markdown table"); //NOLINT DEFINE_bool(ddl, false, "issue ddl instead of using built-in table. Required for --secondary."); //NOLINT DEFINE_bool(secondary, false, "use secondary index"); //NOLINT -DEFINE_int64(scan_block_size, 0, "max records processed by scan operator before yielding to other tasks"); //NOLINT +DEFINE_int64(scan_block_size, 100, "max records processed by scan operator before yielding to other tasks"); //NOLINT namespace tateyama::service_benchmark { diff --git a/include/jogasaki/configuration.h b/include/jogasaki/configuration.h index a1a9ebd2a..17d0e04d5 100644 --- a/include/jogasaki/configuration.h +++ b/include/jogasaki/configuration.h @@ -574,7 +574,7 @@ class configuration { std::shared_ptr request_cancel_config_{}; bool lowercase_regular_identifiers_ = false; std::int32_t zone_offset_ = 0; - std::size_t scan_block_size_ = 0; + std::size_t scan_block_size_ = 100; bool rtx_parallel_scan_ = false; }; From 31d5b189b6302e6973c1994bd3ce405dba7eae5b Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Wed, 9 Oct 2024 11:39:53 +0900 Subject: [PATCH 2/2] add sql.scan_yield_interval config. param. --- examples/service_benchmark/main.cpp | 1 + include/jogasaki/configuration.h | 10 ++++++++++ src/jogasaki/api/impl/database.cpp | 1 + src/jogasaki/api/resource/bridge.cpp | 3 +++ src/jogasaki/executor/process/impl/ops/scan.cpp | 10 +++++++++- 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/examples/service_benchmark/main.cpp b/examples/service_benchmark/main.cpp index a1ee19f03..afc9af3da 100644 --- a/examples/service_benchmark/main.cpp +++ b/examples/service_benchmark/main.cpp @@ -111,6 +111,7 @@ DEFINE_bool(md, false, "output result to stdout as markdown table"); //NOLINT DEFINE_bool(ddl, false, "issue ddl instead of using built-in table. Required for --secondary."); //NOLINT DEFINE_bool(secondary, false, "use secondary index"); //NOLINT DEFINE_int64(scan_block_size, 100, "max records processed by scan operator before yielding to other tasks"); //NOLINT +DEFINE_int64(scan_yield_interval, 1, "max time (ms) processed by scan operator before yielding to other tasks"); //NOLINT namespace tateyama::service_benchmark { diff --git a/include/jogasaki/configuration.h b/include/jogasaki/configuration.h index 17d0e04d5..3826f85d7 100644 --- a/include/jogasaki/configuration.h +++ b/include/jogasaki/configuration.h @@ -450,6 +450,14 @@ class configuration { scan_block_size_ = arg; } + [[nodiscard]] std::size_t scan_yield_interval() const noexcept { + return scan_yield_interval_; + } + + void scan_yield_interval(std::size_t arg) noexcept { + scan_yield_interval_ = arg; + } + [[nodiscard]] std::int32_t zone_offset() const noexcept { return zone_offset_; } @@ -518,6 +526,7 @@ class configuration { print_non_default(lowercase_regular_identifiers); print_non_default(zone_offset); print_non_default(scan_block_size); + print_non_default(scan_yield_interval); print_non_default(rtx_parallel_scan); if(cfg.req_cancel_config()) { @@ -575,6 +584,7 @@ class configuration { bool lowercase_regular_identifiers_ = false; std::int32_t zone_offset_ = 0; std::size_t scan_block_size_ = 100; + std::size_t scan_yield_interval_ = 1; bool rtx_parallel_scan_ = false; }; diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp index 81c13d108..bfd7dd70b 100644 --- a/src/jogasaki/api/impl/database.cpp +++ b/src/jogasaki/api/impl/database.cpp @@ -176,6 +176,7 @@ void dump_public_configurations(configuration const& cfg) { LOGCFG << "(dev_lowercase_regular_identifiers) " << cfg.lowercase_regular_identifiers() << " : whether to lowercase regular identifiers"; LOGCFG << "(zone_offset) " << cfg.zone_offset() << " : system time zone offset in minutes"; LOGCFG << "(scan_block_size) " << cfg.scan_block_size() << " : max records processed by scan operator before yielding to other task"; + LOGCFG << "(scan_yield_interval) " << cfg.scan_yield_interval() << " : max time (ms) processed by scan operator before yielding to other tasks"; LOGCFG << "(dev_rtx_parallel_scan) " << cfg.rtx_parallel_scan() << " : whether to enable parallel scan for RTX"; } diff --git a/src/jogasaki/api/resource/bridge.cpp b/src/jogasaki/api/resource/bridge.cpp index 0eda143a6..7f25d7068 100644 --- a/src/jogasaki/api/resource/bridge.cpp +++ b/src/jogasaki/api/resource/bridge.cpp @@ -248,6 +248,9 @@ bool process_sql_config(std::shared_ptr& ret, tateyama: if (auto v = jogasaki_config->get("scan_block_size")) { ret->scan_block_size(v.value()); } + if (auto v = jogasaki_config->get("scan_yield_interval")) { + ret->scan_yield_interval(v.value()); + } if (auto v = jogasaki_config->get("dev_rtx_parallel_scan")) { ret->rtx_parallel_scan(v.value()); } diff --git a/src/jogasaki/executor/process/impl/ops/scan.cpp b/src/jogasaki/executor/process/impl/ops/scan.cpp index ca8ea16f1..e1648be7c 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan.cpp @@ -15,6 +15,7 @@ */ #include "scan.h" +#include #include #include #include @@ -160,6 +161,8 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp status st{}; std::size_t loop_count = 0; auto scan_block_size = global::config_pool()->scan_block_size(); + auto scan_yield_interval = static_cast(global::config_pool()->scan_yield_interval()); + auto previous_time = std::chrono::steady_clock::now(); while(true) { if(utils::request_cancel_enabled(request_cancel_kind::scan) && ctx.req_context()) { auto res_src = ctx.req_context()->req_info().response_source(); @@ -205,7 +208,12 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp } } if (scan_block_size != 0 && scan_block_size == loop_count ){ - return {operation_status_kind::yield}; + auto current_time = std::chrono::steady_clock::now(); + auto elapsed_time = + std::chrono::duration_cast(current_time - previous_time); + if (elapsed_time.count() >= scan_yield_interval ) { + return {operation_status_kind::yield}; + } } loop_count++; }