From 8d538b7d74397806aee94d926a75d855e7c2e31c Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Tue, 22 Oct 2024 18:31:10 +0900 Subject: [PATCH] Move key_begin and key_end members from scan_context to scan_info --- src/jogasaki/data/aligned_buffer.cpp | 21 +++- src/jogasaki/data/aligned_buffer.h | 11 ++- src/jogasaki/executor/process/flow.cpp | 10 +- .../process/impl/ops/operator_builder.cpp | 21 ++-- .../process/impl/ops/operator_builder.h | 12 +-- .../executor/process/impl/ops/scan.cpp | 66 ++----------- .../process/impl/ops/scan_context.cpp | 12 +-- .../executor/process/impl/ops/scan_context.h | 8 +- .../executor/process/impl/processor.cpp | 8 +- .../executor/process/impl/processor.h | 10 +- .../executor/process/impl/scan_info.cpp | 97 ++++++++++++++++++- .../executor/process/impl/scan_info.h | 31 ++++-- .../executor/process/impl/task_context.cpp | 8 +- .../executor/process/impl/task_context.h | 8 +- .../executor/process/ops/scan_test.cpp | 51 +++++++--- test/jogasaki/operator_test_utils.h | 6 +- 16 files changed, 235 insertions(+), 145 deletions(-) diff --git a/src/jogasaki/data/aligned_buffer.cpp b/src/jogasaki/data/aligned_buffer.cpp index 369a596fb..763c8761d 100644 --- a/src/jogasaki/data/aligned_buffer.cpp +++ b/src/jogasaki/data/aligned_buffer.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) { return assign(aligned_buffer{sv}); } -} // namespace +void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{ + std::string indent_space(indent, ' '); + out << indent_space << "aligned_buffer:" << "\n"; + out << indent_space << " capacity_: " << capacity_ << "\n"; + out << indent_space << " alignment_: " << alignment_ << "\n"; + out << indent_space << " size_: " << size_ << "\n"; + out << indent_space << " data_: " ; + for (std::size_t i = 0; i < size_; ++i) { + out << std::hex << std::setw(2) << std::setfill('0') << static_cast(data_[i]) << " "; + if ((i + 1) % 16 == 0) { + out << std::endl; + } + } + out << std::setfill(' ') << std::dec << std::endl; + +} + +} // namespace jogasaki::data \ No newline at end of file diff --git a/src/jogasaki/data/aligned_buffer.h b/src/jogasaki/data/aligned_buffer.h index d00e86e82..fd5462d57 100644 --- a/src/jogasaki/data/aligned_buffer.h +++ b/src/jogasaki/data/aligned_buffer.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,7 +139,12 @@ class aligned_buffer { * The alignment is not changed. */ void shrink_to_fit(); - + /** + * @brief Support for debugging, callable in GDB + * @param out The output stream to which the buffer's internal state will be written. + * @param indent The indentation level for formatting the output, default is 0. + */ + void dump(std::ostream& out, int indent = 0) const noexcept; /** * @brief return alignment of the buffer */ @@ -172,4 +177,4 @@ class aligned_buffer { void resize_internal(std::size_t sz, bool copydata); }; -} // namespace +} // namespace jogasaki::data \ No newline at end of file diff --git a/src/jogasaki/executor/process/flow.cpp b/src/jogasaki/executor/process/flow.cpp index fa5682db8..753f51ddb 100644 --- a/src/jogasaki/executor/process/flow.cpp +++ b/src/jogasaki/executor/process/flow.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,7 +95,7 @@ sequence_view> flow::create_tasks() { step_->io_info(), step_->relation_io_map(), *step_->io_exchange_map(), - context_->request_resource() + context_ ); } catch (plan::impl::compile_exception const& e) { error::set_error_info(*context_, e.info()); @@ -175,14 +175,14 @@ std::shared_ptr flow::create_task_context( (context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr, sink_index ); - + auto scan = ctx->shared_scan_info(); ctx->work_context( std::make_unique( context_, operators.size(), info_->vars_info_list().size(), std::make_unique(&global::page_pool()), - std::make_unique(&global::page_pool()), + (scan == nullptr)?std::make_unique(&global::page_pool()):scan->varlen_resource(), context_->database(), context_->transaction(), empty_input_from_shuffle_ @@ -191,4 +191,4 @@ std::shared_ptr flow::create_task_context( return ctx; } -} // namespace jogasaki::executor::process +} // namespace jogasaki::executor::process \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index d974e30dc..61076d402 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,16 +76,14 @@ operator_builder::operator_builder( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) : info_(std::move(info)), io_info_(std::move(io_info)), io_exchange_map_(std::addressof(io_exchange_map)), relation_io_map_(std::move(relation_io_map)), - resource_(resource) -{ - (void)resource_; //TODO remove if not necessary -} + request_context_(request_context) +{} operator_container operator_builder::operator()()&& { auto root = dispatch(*this, head()); @@ -132,7 +130,7 @@ std::unique_ptr operator_builder::operator()(const relation::scan // scan info is not passed to scan operator here, but passed back through task_context // in order to support parallel scan in the future scan_info_ = create_scan_info(node, secondary_or_primary_index); - + scan_info_->encode_key(request_context_); return std::make_unique( index_++, *info_, @@ -220,7 +218,7 @@ std::unique_ptr operator_builder::operator()(const relation::writ write_kind_from(node.operator_kind()), index, columns, - resource_ + request_context_->request_resource() ); } @@ -404,16 +402,15 @@ operator_container create_operators( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) { return operator_builder{ std::move(info), std::move(io_info), std::move(relation_io_map), io_exchange_map, - resource + request_context }(); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.h b/src/jogasaki/executor/process/impl/ops/operator_builder.h index 4653849c8..f2f6ce3c9 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.h +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,7 +90,7 @@ class operator_builder { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); [[nodiscard]] operator_container operator()() &&; @@ -140,8 +140,7 @@ class operator_builder { std::shared_ptr relation_io_map_{}; operator_base::operator_index_type index_{}; std::shared_ptr scan_info_{}; - memory::lifo_paged_memory_resource* resource_{}; - + request_context* request_context_{}; kvs::end_point_kind from(relation::scan::endpoint::kind_type type); }; @@ -160,8 +159,7 @@ class operator_builder { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan.cpp b/src/jogasaki/executor/process/impl/ops/scan.cpp index 82180d85e..8c2efe1b0 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan.cpp @@ -251,67 +251,15 @@ void scan::finish(abstract::task_context* context) { status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const) auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_; - auto be = ctx.scan_info_->begin_endpoint(); - auto ee = ctx.scan_info_->end_endpoint(); - if (use_secondary_) { - // at storage layer, secondary index key contains primary key index as postfix - // so boundary condition needs promotion to be compatible - // TODO verify the promotion - if (be == kvs::end_point_kind::inclusive) { - be = kvs::end_point_kind::prefixed_inclusive; - } - if (be == kvs::end_point_kind::exclusive) { - be = kvs::end_point_kind::prefixed_exclusive; - } - if (ee == kvs::end_point_kind::inclusive) { - ee = kvs::end_point_kind::prefixed_inclusive; - } - if (ee == kvs::end_point_kind::exclusive) { - ee = kvs::end_point_kind::prefixed_exclusive; - } - } - executor::process::impl::variable_table vars{}; - std::size_t blen{}; - std::string msg{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->begin_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_begin_, - blen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; - } - std::size_t elen{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->end_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_end_, - elen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; + if (auto status = ctx.scan_info_->status_result(); status != status::ok) { + return status; } if(auto res = stg.content_scan( *ctx.tx_, - {static_cast(ctx.key_begin_.data()), blen}, - be, - {static_cast(ctx.key_end_.data()), elen}, - ee, + ctx.scan_info_->begin_key(), + ctx.scan_info_->begin_kind(use_secondary_), + ctx.scan_info_->end_key(), + ctx.scan_info_->end_kind(use_secondary_), ctx.it_ ); res != status::ok) { handle_kvs_errors(*ctx.req_context(), res); @@ -371,4 +319,4 @@ void scan::dump() const noexcept { std::cerr << head << std::setw(width) << "field_mapper_:" << "not implemented yet" << std::endl; } -} // namespace jogasaki::executor::process::impl::ops +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan_context.cpp b/src/jogasaki/executor/process/impl/ops/scan_context.cpp index 40affd1bc..b2db60250 100644 --- a/src/jogasaki/executor/process/impl/ops/scan_context.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,13 +69,7 @@ void scan_context::dump() const noexcept { << " " << std::setw(20) << "iterator:" << (it_ ? it_.get() : nullptr) << "\n" << " " << std::setw(20) << "scan_info:" - << (scan_info_ ? scan_info_ : nullptr) << "\n" - << " " << std::setw(20) << "key_begin_size:" - << key_begin_.size() << "\n" - << " " << std::setw(20) << "key_end_size:" - << key_end_.size() << std::endl; + << (scan_info_ ? scan_info_ : nullptr) << "\n"; } -} - - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan_context.h b/src/jogasaki/executor/process/impl/ops/scan_context.h index 330866c74..913d67218 100644 --- a/src/jogasaki/executor/process/impl/ops/scan_context.h +++ b/src/jogasaki/executor/process/impl/ops/scan_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,10 +75,6 @@ class scan_context : public context_base { transaction_context* tx_{}; std::unique_ptr it_{}; impl::scan_info const* scan_info_{}; - data::aligned_buffer key_begin_{}; - data::aligned_buffer key_end_{}; }; -} - - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/processor.cpp b/src/jogasaki/executor/process/impl/processor.cpp index 7b348787c..5ac006d76 100644 --- a/src/jogasaki/executor/process/impl/processor.cpp +++ b/src/jogasaki/executor/process/impl/processor.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ processor::processor( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) : info_(std::move(info)), operators_( @@ -52,7 +52,7 @@ processor::processor( std::move(io_info), std::move(relation_io_map), io_exchange_map, - resource + request_context ) ), relation_io_map_(std::move(relation_io_map)) @@ -96,4 +96,4 @@ ops::operator_container const& processor::operators() const noexcept { return operators_; } -} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/processor.h b/src/jogasaki/executor/process/impl/processor.h index a9b070f12..3afbc44fb 100644 --- a/src/jogasaki/executor/process/impl/processor.h +++ b/src/jogasaki/executor/process/impl/processor.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ class processor : public process::abstract::processor { * @param io_info input/output information * @param relation_io_map mapping from relation to input/output indices * @param io_exchange_map map from input/output to exchange operator - * @param resource the memory resource to build the structures needed by this processor + * @param request_context memory resource for initializing and managing processor structures * @throws plan::impl::compile_exception if the processor construction fails */ processor( @@ -56,7 +56,7 @@ class processor : public process::abstract::processor { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ); /** @@ -78,6 +78,4 @@ class processor : public process::abstract::processor { std::shared_ptr relation_io_map_{}; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/scan_info.cpp b/src/jogasaki/executor/process/impl/scan_info.cpp index 6a23236a4..bf08fb9de 100644 --- a/src/jogasaki/executor/process/impl/scan_info.cpp +++ b/src/jogasaki/executor/process/impl/scan_info.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,16 @@ #include +#include +#include +#include #include +#include #include namespace jogasaki::executor::process::impl { + using memory_resource = ops::context_base::memory_resource; scan_info::scan_info( std::vector begin_columns, kvs::end_point_kind begin_endpoint, @@ -32,7 +37,9 @@ scan_info::scan_info( begin_endpoint_(begin_endpoint), end_columns_(std::move(end_columns)), end_endpoint_(end_endpoint) -{} +{ + varlen_resource_ = std::make_unique(&global::page_pool()); +} std::vector const& scan_info::begin_columns() const noexcept { return begin_columns_; @@ -50,4 +57,90 @@ kvs::end_point_kind scan_info::end_endpoint() const noexcept { return end_endpoint_; } +[[nodiscard]] std::unique_ptr scan_info::varlen_resource() noexcept { + return std::move(varlen_resource_); +} +void scan_info::set_varlen_resource(std::unique_ptr resource) { + varlen_resource_ = std::move(resource); +} + +[[nodiscard]] std::string_view scan_info::begin_key() const noexcept{ + return {static_cast(key_begin_.data()), blen_}; +} +[[nodiscard]] std::string_view scan_info::end_key() const noexcept{ + return {static_cast(key_end_.data()), elen_}; +} +[[nodiscard]] kvs::end_point_kind scan_info::get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept { + if (use_secondary) { + if (endpoint == kvs::end_point_kind::inclusive) { + return kvs::end_point_kind::prefixed_inclusive; + } + if (endpoint == kvs::end_point_kind::exclusive) { + return kvs::end_point_kind::prefixed_exclusive; + } + } + return endpoint; +} +[[nodiscard]] kvs::end_point_kind scan_info::begin_kind(bool use_secondary) const noexcept{ + return get_kind(use_secondary, begin_endpoint_); +} +[[nodiscard]] kvs::end_point_kind scan_info::end_kind(bool use_secondary) const noexcept{ + return get_kind(use_secondary, end_endpoint_); +} +[[nodiscard]] status scan_info::status_result() const noexcept{ + return status_result_; +} + +void scan_info::encode_key(request_context* rc) noexcept { + std::string msg{}; + executor::process::impl::variable_table vars{}; + if(status_result_ = impl::ops::details::encode_key( + rc, + begin_columns_, + vars, + *varlen_resource_, + key_begin_, + blen_, + msg + ); + status_result_ != status::ok) { + if(status_result_ == status::err_type_mismatch) { + // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context + set_error(*rc, error_code::unsupported_runtime_feature_exception, msg, status_result_); + } + return; + } + if(status_result_ = impl::ops::details::encode_key( + rc, + end_columns_, + vars, + *varlen_resource_, + key_end_, + elen_, + msg + ); + status_result_ != status::ok) { + if(status_result_ == status::err_type_mismatch) { + // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context + set_error(*rc, error_code::unsupported_runtime_feature_exception, msg, status_result_); + } + } } + +void scan_info::dump(std::ostream& out, int indent) const noexcept{ + std::string indent_space(indent, ' '); + out << indent_space << "begin_columns_: " << &begin_columns_ << "\n"; + out << indent_space << "begin_endpoint_: " << &begin_endpoint_ << "\n"; + out << indent_space << "end_columns_: " << &end_columns_ << "\n"; + out << indent_space << "end_endpoint_: " << &end_endpoint_ << "\n"; + out << indent_space << "key_begin_: " << &key_begin_ << "\n"; + key_begin_.dump(out,indent+2); + out << indent_space << "key_end_: " << &key_end_ << "\n"; + key_end_.dump(out,indent+2); + out << indent_space << "blen_: " << blen_ << "\n"; + out << indent_space << "elen_: " << elen_ << "\n"; + out << indent_space << "status_result_: " << &status_result_ << "\n"; + out << indent_space << "varlen_resource_: " << &varlen_resource_ << "\n"; +} + +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/scan_info.h b/src/jogasaki/executor/process/impl/scan_info.h index 824d84bc2..b1a65e581 100644 --- a/src/jogasaki/executor/process/impl/scan_info.h +++ b/src/jogasaki/executor/process/impl/scan_info.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ #include #include +#include +#include #include namespace jogasaki::executor::process::impl { @@ -29,6 +31,7 @@ namespace jogasaki::executor::process::impl { */ class scan_info : public abstract::scan_info { public: + using memory_resource = ops::context_base::memory_resource; /** * @brief create new object */ @@ -41,8 +44,8 @@ class scan_info : public abstract::scan_info { ~scan_info() override = default; - scan_info(scan_info const& other) = default; - scan_info& operator=(scan_info const& other) = default; + scan_info(scan_info const& other) = delete; + scan_info& operator=(scan_info const& other) = delete; scan_info(scan_info&& other) noexcept = default; scan_info& operator=(scan_info&& other) noexcept = default; @@ -50,14 +53,30 @@ class scan_info : public abstract::scan_info { [[nodiscard]] std::vector const& end_columns() const noexcept; [[nodiscard]] kvs::end_point_kind begin_endpoint() const noexcept; [[nodiscard]] kvs::end_point_kind end_endpoint() const noexcept; + [[nodiscard]] std::unique_ptr varlen_resource() noexcept; + // void varlen_resource(std::unique_ptr resource) noexcept; + //void set_varlen_resource(memory_resource* resource) noexcept; + void set_varlen_resource(std::unique_ptr resource); + [[nodiscard]] std::string_view begin_key() const noexcept; + [[nodiscard]] std::string_view end_key() const noexcept; + [[nodiscard]] kvs::end_point_kind begin_kind(bool use_secondary) const noexcept; + [[nodiscard]] kvs::end_point_kind end_kind(bool use_secondary) const noexcept; + [[nodiscard]] status status_result() const noexcept; + void encode_key(request_context* rc) noexcept; + void dump(std::ostream& out, int indent = 0) const noexcept; private: std::vector begin_columns_{}; kvs::end_point_kind begin_endpoint_{}; std::vector end_columns_{}; kvs::end_point_kind end_endpoint_{}; + data::aligned_buffer key_begin_{}; + data::aligned_buffer key_end_{}; + std::size_t blen_{}; + std::size_t elen_{}; + status status_result_{}; + std::unique_ptr varlen_resource_{}; + [[nodiscard]] kvs::end_point_kind get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/task_context.cpp b/src/jogasaki/executor/process/impl/task_context.cpp index 21fec4c4a..ac1cca01a 100644 --- a/src/jogasaki/executor/process/impl/task_context.cpp +++ b/src/jogasaki/executor/process/impl/task_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,6 +124,10 @@ class abstract::scan_info const* task_context::scan_info() { return scan_info_.get(); } +std::shared_ptr const& task_context::shared_scan_info() noexcept { + return scan_info_; +} + std::size_t task_context::partition() const noexcept { return partition_; } @@ -132,4 +136,4 @@ io::record_channel* task_context::channel() const noexcept { return channel_; } -} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/task_context.h b/src/jogasaki/executor/process/impl/task_context.h index b05eef24b..b33121744 100644 --- a/src/jogasaki/executor/process/impl/task_context.h +++ b/src/jogasaki/executor/process/impl/task_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,8 @@ class task_context : public abstract::task_context { class abstract::scan_info const* scan_info() override; + [[nodiscard]] std::shared_ptr const& shared_scan_info() noexcept; + [[nodiscard]] std::size_t partition() const noexcept; [[nodiscard]] io::record_channel* channel() const noexcept; @@ -94,6 +96,4 @@ class task_context : public abstract::task_context { partition_index sink_index_{}; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/test/jogasaki/executor/process/ops/scan_test.cpp b/test/jogasaki/executor/process/ops/scan_test.cpp index 0a965e6b3..a6ebb25f9 100644 --- a/test/jogasaki/executor/process/ops/scan_test.cpp +++ b/test/jogasaki/executor/process/ops/scan_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,6 +75,8 @@ #include #include #include +#include +#include #include "verifier.h" @@ -169,8 +171,11 @@ TEST_F(scan_test, simple) { auto tx = wrap(db_->create_transaction()); auto sinfo = std::make_shared(); + std::unique_ptr vr = std::make_unique(&global::page_pool()); + sinfo->set_varlen_resource(std::move(vr)); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -234,8 +239,11 @@ TEST_F(scan_test, nullable_fields) { auto tx = wrap(db_->create_transaction()); auto sinfo = std::make_shared(); + std::unique_ptr vr = std::make_unique(&global::page_pool()); + sinfo->set_varlen_resource(std::move(vr)); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -326,10 +334,16 @@ TEST_F(scan_test, scan_info) { &output_variable_info }; + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *primary_idx); + std::unique_ptr vr = std::make_unique(&global::page_pool()); + sinfo->set_varlen_resource(std::move(vr)); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/B"}), create_record(1.0)); @@ -337,8 +351,7 @@ TEST_F(scan_test, scan_info) { put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/D"}), create_record(3.0)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -416,10 +429,15 @@ TEST_F(scan_test, secondary_index) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *secondary_idx); + std::unique_ptr vr = std::make_unique(&global::page_pool()); + sinfo->set_varlen_resource(std::move(vr)); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; put( *db_, primary_idx->simple_name(), create_record(10), create_record(1.0, 100)); @@ -432,8 +450,7 @@ TEST_F(scan_test, secondary_index) { put( *db_, secondary_idx->simple_name(), create_record(300, 30), {}); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -545,11 +562,16 @@ TEST_F(scan_test, host_variables) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *primary_idx); + std::unique_ptr vr = std::make_unique(&global::page_pool()); + sinfo->set_varlen_resource(std::move(vr)); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; put( *db_, primary_idx->simple_name(), create_record(100, 10), create_record(1)); @@ -557,7 +579,7 @@ TEST_F(scan_test, host_variables) { put( *db_, primary_idx->simple_name(), create_record(100, 30), create_record(3)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -566,5 +588,4 @@ TEST_F(scan_test, host_variables) { ASSERT_EQ(status::ok, tx->commit()); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/test/jogasaki/operator_test_utils.h b/test/jogasaki/operator_test_utils.h index 18a08205a..2da436966 100644 --- a/test/jogasaki/operator_test_utils.h +++ b/test/jogasaki/operator_test_utils.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -176,6 +176,7 @@ class operator_test_utils { takatori::plan::process& process_; //NOLINT memory::page_pool pool_{}; //NOLINT + request_context request_context_{}; //NOLINT memory::lifo_paged_memory_resource resource_; //NOLINT memory::lifo_paged_memory_resource varlen_resource_; //NOLINT memory::lifo_paged_memory_resource verifier_varlen_resource_; //NOLINT @@ -309,5 +310,4 @@ class operator_test_utils { }; -} - +} // namespace jogasaki::executor::process::impl::ops