From a159ef1cec57e7e15411782477ae092702dc3403 Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Mon, 21 Oct 2024 13:31:43 +0900 Subject: [PATCH] Move key_begin and key_end members from scan_context to scan_info --- src/jogasaki/data/aligned_buffer.cpp | 21 +++- src/jogasaki/data/aligned_buffer.h | 5 +- src/jogasaki/executor/process/flow.cpp | 8 +- .../process/impl/ops/operator_builder.cpp | 21 ++-- .../process/impl/ops/operator_builder.h | 12 +-- .../executor/process/impl/ops/scan.cpp | 64 ++---------- .../executor/process/impl/processor.cpp | 6 +- .../executor/process/impl/processor.h | 8 +- .../executor/process/impl/scan_info.cpp | 97 ++++++++++++++++++- .../executor/process/impl/scan_info.h | 22 ++++- .../executor/process/impl/task_context.cpp | 7 +- .../executor/process/impl/task_context.h | 8 +- .../executor/process/ops/scan_test.cpp | 36 ++++--- test/jogasaki/operator_test_utils.h | 1 + 14 files changed, 199 insertions(+), 117 deletions(-) diff --git a/src/jogasaki/data/aligned_buffer.cpp b/src/jogasaki/data/aligned_buffer.cpp index 369a596fb..b7e54c066 100644 --- a/src/jogasaki/data/aligned_buffer.cpp +++ b/src/jogasaki/data/aligned_buffer.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) { return assign(aligned_buffer{sv}); } -} // namespace +void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{ + std::string indent_space(indent, ' '); + out << indent_space << "aligned_buffer:" << "\n"; + out << indent_space << " capacity_: " << capacity_ << "\n"; + out << indent_space << " alignment_: " << alignment_ << "\n"; + out << indent_space << " size_: " << size_ << "\n"; + out << indent_space << " data_: " ; + for (std::size_t i = 0; i < size_; ++i) { + out << std::hex << std::setw(2) << std::setfill('0') << static_cast(data_[i]) << " "; + if ((i + 1) % 16 == 0) { + out << std::endl; + } + } + out << std::setfill(' ') << std::dec << std::endl; + +} + +} // namespace jogasaki::data diff --git a/src/jogasaki/data/aligned_buffer.h b/src/jogasaki/data/aligned_buffer.h index d00e86e82..eb8ecda3d 100644 --- a/src/jogasaki/data/aligned_buffer.h +++ b/src/jogasaki/data/aligned_buffer.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,6 +162,7 @@ class aligned_buffer { * @return the output */ friend std::ostream& operator<<(std::ostream& out, aligned_buffer const& value); + void dump(std::ostream& out, int indent = 0) const noexcept; private: std::size_t capacity_{}; @@ -172,4 +173,4 @@ class aligned_buffer { void resize_internal(std::size_t sz, bool copydata); }; -} // namespace +} // namespace jogasaki::data diff --git a/src/jogasaki/executor/process/flow.cpp b/src/jogasaki/executor/process/flow.cpp index fa5682db8..da7fc8636 100644 --- a/src/jogasaki/executor/process/flow.cpp +++ b/src/jogasaki/executor/process/flow.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,7 +95,7 @@ sequence_view> flow::create_tasks() { step_->io_info(), step_->relation_io_map(), *step_->io_exchange_map(), - context_->request_resource() + context_ ); } catch (plan::impl::compile_exception const& e) { error::set_error_info(*context_, e.info()); @@ -175,14 +175,14 @@ std::shared_ptr flow::create_task_context( (context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr, sink_index ); - + auto scan = ctx->non_const_impl_scan_info(); ctx->work_context( std::make_unique( context_, operators.size(), info_->vars_info_list().size(), std::make_unique(&global::page_pool()), - std::make_unique(&global::page_pool()), + (scan == nullptr)?std::make_unique(&global::page_pool()):scan->varlen_resource(), context_->database(), context_->transaction(), empty_input_from_shuffle_ diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index d974e30dc..61076d402 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,16 +76,14 @@ operator_builder::operator_builder( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) : info_(std::move(info)), io_info_(std::move(io_info)), io_exchange_map_(std::addressof(io_exchange_map)), relation_io_map_(std::move(relation_io_map)), - resource_(resource) -{ - (void)resource_; //TODO remove if not necessary -} + request_context_(request_context) +{} operator_container operator_builder::operator()()&& { auto root = dispatch(*this, head()); @@ -132,7 +130,7 @@ std::unique_ptr operator_builder::operator()(const relation::scan // scan info is not passed to scan operator here, but passed back through task_context // in order to support parallel scan in the future scan_info_ = create_scan_info(node, secondary_or_primary_index); - + scan_info_->encode_key(request_context_); return std::make_unique( index_++, *info_, @@ -220,7 +218,7 @@ std::unique_ptr operator_builder::operator()(const relation::writ write_kind_from(node.operator_kind()), index, columns, - resource_ + request_context_->request_resource() ); } @@ -404,16 +402,15 @@ operator_container create_operators( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) { return operator_builder{ std::move(info), std::move(io_info), std::move(relation_io_map), io_exchange_map, - resource + request_context }(); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.h b/src/jogasaki/executor/process/impl/ops/operator_builder.h index 4653849c8..f2f6ce3c9 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.h +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,7 +90,7 @@ class operator_builder { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); [[nodiscard]] operator_container operator()() &&; @@ -140,8 +140,7 @@ class operator_builder { std::shared_ptr relation_io_map_{}; operator_base::operator_index_type index_{}; std::shared_ptr scan_info_{}; - memory::lifo_paged_memory_resource* resource_{}; - + request_context* request_context_{}; kvs::end_point_kind from(relation::scan::endpoint::kind_type type); }; @@ -160,8 +159,7 @@ class operator_builder { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan.cpp b/src/jogasaki/executor/process/impl/ops/scan.cpp index 8d6b862a6..55721fc69 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan.cpp @@ -250,67 +250,15 @@ void scan::finish(abstract::task_context* context) { status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const) auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_; - auto be = ctx.scan_info_->begin_endpoint(); - auto ee = ctx.scan_info_->end_endpoint(); - if (use_secondary_) { - // at storage layer, secondary index key contains primary key index as postfix - // so boundary condition needs promotion to be compatible - // TODO verify the promotion - if (be == kvs::end_point_kind::inclusive) { - be = kvs::end_point_kind::prefixed_inclusive; - } - if (be == kvs::end_point_kind::exclusive) { - be = kvs::end_point_kind::prefixed_exclusive; - } - if (ee == kvs::end_point_kind::inclusive) { - ee = kvs::end_point_kind::prefixed_inclusive; - } - if (ee == kvs::end_point_kind::exclusive) { - ee = kvs::end_point_kind::prefixed_exclusive; - } - } - executor::process::impl::variable_table vars{}; - std::size_t blen{}; - std::string msg{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->begin_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_begin_, - blen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; - } - std::size_t elen{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->end_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_end_, - elen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; + if (auto status = ctx.scan_info_->status_result(); status != status::ok) { + return status; } if(auto res = stg.content_scan( *ctx.tx_, - {static_cast(ctx.key_begin_.data()), blen}, - be, - {static_cast(ctx.key_end_.data()), elen}, - ee, + ctx.scan_info_->begin_key(), + ctx.scan_info_->begin_kind(use_secondary_), + ctx.scan_info_->end_key(), + ctx.scan_info_->end_kind(use_secondary_), ctx.it_ ); res != status::ok) { handle_kvs_errors(*ctx.req_context(), res); diff --git a/src/jogasaki/executor/process/impl/processor.cpp b/src/jogasaki/executor/process/impl/processor.cpp index 7b348787c..3088daba8 100644 --- a/src/jogasaki/executor/process/impl/processor.cpp +++ b/src/jogasaki/executor/process/impl/processor.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ processor::processor( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* resource ) : info_(std::move(info)), operators_( @@ -96,4 +96,4 @@ ops::operator_container const& processor::operators() const noexcept { return operators_; } -} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/processor.h b/src/jogasaki/executor/process/impl/processor.h index a9b070f12..70068d333 100644 --- a/src/jogasaki/executor/process/impl/processor.h +++ b/src/jogasaki/executor/process/impl/processor.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ class processor : public process::abstract::processor { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* resource ); /** @@ -78,6 +78,4 @@ class processor : public process::abstract::processor { std::shared_ptr relation_io_map_{}; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/scan_info.cpp b/src/jogasaki/executor/process/impl/scan_info.cpp index 6a23236a4..ea7c1b2c8 100644 --- a/src/jogasaki/executor/process/impl/scan_info.cpp +++ b/src/jogasaki/executor/process/impl/scan_info.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,16 @@ #include +#include +#include +#include #include +#include #include namespace jogasaki::executor::process::impl { + using memory_resource = ops::context_base::memory_resource; scan_info::scan_info( std::vector begin_columns, kvs::end_point_kind begin_endpoint, @@ -32,7 +37,9 @@ scan_info::scan_info( begin_endpoint_(begin_endpoint), end_columns_(std::move(end_columns)), end_endpoint_(end_endpoint) -{} +{ + varlen_resource_ = std::make_unique(&global::page_pool()); +} std::vector const& scan_info::begin_columns() const noexcept { return begin_columns_; @@ -50,4 +57,90 @@ kvs::end_point_kind scan_info::end_endpoint() const noexcept { return end_endpoint_; } +[[nodiscard]] std::unique_ptr scan_info::varlen_resource() noexcept { + return std::move(varlen_resource_); +} + +[[nodiscard]] std::string_view scan_info::begin_key() const noexcept{ + return {static_cast(key_begin_.data()), blen_}; +} +[[nodiscard]] std::string_view scan_info::end_key() const noexcept{ + return {static_cast(key_end_.data()), elen_}; +} +[[nodiscard]] kvs::end_point_kind scan_info::get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept { + if (use_secondary) { + if (endpoint == kvs::end_point_kind::inclusive) { + return kvs::end_point_kind::prefixed_inclusive; + } + if (endpoint == kvs::end_point_kind::exclusive) { + return kvs::end_point_kind::prefixed_exclusive; + } + } + return endpoint; +} +[[nodiscard]] kvs::end_point_kind scan_info::begin_kind(bool use_secondary) const noexcept{ + return get_kind(use_secondary, begin_endpoint_); +} +[[nodiscard]] kvs::end_point_kind scan_info::end_kind(bool use_secondary) const noexcept{ + return get_kind(use_secondary, end_endpoint_); } +[[nodiscard]] status scan_info::status_result() const noexcept{ + return status_result_; +} + +void scan_info::encode_key(request_context* rc) noexcept { + std::string msg{}; + executor::process::impl::variable_table vars{}; + dump(std::cerr,0); + if(status_result_ = impl::ops::details::encode_key( + rc, + begin_columns_, + vars, + *varlen_resource_, + key_begin_, + blen_, + msg + ); + status_result_ != status::ok) { + if(status_result_ == status::err_type_mismatch) { + // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context + set_error(*rc, error_code::unsupported_runtime_feature_exception, msg, status_result_); + } + return; + } + if(status_result_ = impl::ops::details::encode_key( + rc, + end_columns_, + vars, + *varlen_resource_, + key_end_, + elen_, + msg + ); + status_result_ != status::ok) { + if(status_result_ == status::err_type_mismatch) { + // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context + set_error(*rc, error_code::unsupported_runtime_feature_exception, msg, status_result_); + } + } + std::cerr << "key_begin_" << std::endl; + key_begin_.dump(std::cerr,0); + std::cerr << "key_end_" << std::endl; + key_end_.dump(std::cerr,0); +} + +void scan_info::dump(std::ostream& out, int indent) const noexcept{ + std::string indent_space(indent, ' '); + out << indent_space << "begin_columns_: " << &begin_columns_ << "\n"; + out << indent_space << "begin_endpoint_: " << &begin_endpoint_ << "\n"; + out << indent_space << "end_columns_: " << &end_columns_ << "\n"; + out << indent_space << "end_endpoint_: " << &end_endpoint_ << "\n"; + out << indent_space << "key_begin_: " << &key_begin_ << "\n"; + out << indent_space << "key_end_: " << &key_end_ << "\n"; + out << indent_space << "blen_: " << blen_ << "\n"; + out << indent_space << "elen_: " << elen_ << "\n"; + out << indent_space << "status_result_: " << &status_result_ << "\n"; + out << indent_space << "varlen_resource_: " << &varlen_resource_ << "\n"; +} + +} // namespace jogasaki::executor::process::impl diff --git a/src/jogasaki/executor/process/impl/scan_info.h b/src/jogasaki/executor/process/impl/scan_info.h index 824d84bc2..bed24489b 100644 --- a/src/jogasaki/executor/process/impl/scan_info.h +++ b/src/jogasaki/executor/process/impl/scan_info.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ #include #include +#include +#include #include namespace jogasaki::executor::process::impl { @@ -29,6 +31,7 @@ namespace jogasaki::executor::process::impl { */ class scan_info : public abstract::scan_info { public: + using memory_resource = ops::context_base::memory_resource; /** * @brief create new object */ @@ -50,14 +53,29 @@ class scan_info : public abstract::scan_info { [[nodiscard]] std::vector const& end_columns() const noexcept; [[nodiscard]] kvs::end_point_kind begin_endpoint() const noexcept; [[nodiscard]] kvs::end_point_kind end_endpoint() const noexcept; + [[nodiscard]] std::unique_ptr varlen_resource() noexcept; + [[nodiscard]] std::string_view begin_key() const noexcept; + [[nodiscard]] std::string_view end_key() const noexcept; + [[nodiscard]] kvs::end_point_kind begin_kind(bool use_secondary) const noexcept; + [[nodiscard]] kvs::end_point_kind end_kind(bool use_secondary) const noexcept; + [[nodiscard]] status status_result() const noexcept; + void encode_key(request_context* rc) noexcept; + void dump(std::ostream& out, int indent = 0) const noexcept; private: std::vector begin_columns_{}; kvs::end_point_kind begin_endpoint_{}; std::vector end_columns_{}; kvs::end_point_kind end_endpoint_{}; + data::aligned_buffer key_begin_{}; + data::aligned_buffer key_end_{}; + std::size_t blen_{}; + std::size_t elen_{}; + status status_result_{}; + std::unique_ptr varlen_resource_{}; + [[nodiscard]] kvs::end_point_kind get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept; }; -} +} // namespace jogasaki::executor::process::impl diff --git a/src/jogasaki/executor/process/impl/task_context.cpp b/src/jogasaki/executor/process/impl/task_context.cpp index 21fec4c4a..26717434c 100644 --- a/src/jogasaki/executor/process/impl/task_context.cpp +++ b/src/jogasaki/executor/process/impl/task_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -123,6 +123,9 @@ io::record_writer* task_context::external_writer() { class abstract::scan_info const* task_context::scan_info() { return scan_info_.get(); } +std::shared_ptr task_context::non_const_impl_scan_info() { + return scan_info_; +} std::size_t task_context::partition() const noexcept { return partition_; @@ -132,4 +135,4 @@ io::record_channel* task_context::channel() const noexcept { return channel_; } -} +} // namespace jogasaki::executor::process::impl diff --git a/src/jogasaki/executor/process/impl/task_context.h b/src/jogasaki/executor/process/impl/task_context.h index b05eef24b..d55d164cb 100644 --- a/src/jogasaki/executor/process/impl/task_context.h +++ b/src/jogasaki/executor/process/impl/task_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,8 @@ class task_context : public abstract::task_context { class abstract::scan_info const* scan_info() override; + std::shared_ptr non_const_impl_scan_info(); + [[nodiscard]] std::size_t partition() const noexcept; [[nodiscard]] io::record_channel* channel() const noexcept; @@ -94,6 +96,4 @@ class task_context : public abstract::task_context { partition_index sink_index_{}; }; -} - - +} // namespace jogasaki::executor::process::impl diff --git a/test/jogasaki/executor/process/ops/scan_test.cpp b/test/jogasaki/executor/process/ops/scan_test.cpp index 0a965e6b3..2aa1486cf 100644 --- a/test/jogasaki/executor/process/ops/scan_test.cpp +++ b/test/jogasaki/executor/process/ops/scan_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,6 +75,8 @@ #include #include #include +#include +#include #include "verifier.h" @@ -170,7 +172,7 @@ TEST_F(scan_test, simple) { auto tx = wrap(db_->create_transaction()); auto sinfo = std::make_shared(); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -235,7 +237,7 @@ TEST_F(scan_test, nullable_fields) { auto tx = wrap(db_->create_transaction()); auto sinfo = std::make_shared(); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -326,10 +328,14 @@ TEST_F(scan_test, scan_info) { &output_variable_info }; + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *primary_idx); + sinfo->encode_key(&request_context_); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/B"}), create_record(1.0)); @@ -337,8 +343,7 @@ TEST_F(scan_test, scan_info) { put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/D"}), create_record(3.0)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -416,9 +421,11 @@ TEST_F(scan_test, secondary_index) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *secondary_idx); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; @@ -432,7 +439,7 @@ TEST_F(scan_test, secondary_index) { put( *db_, secondary_idx->simple_name(), create_record(300, 30), {}); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -545,10 +552,12 @@ TEST_F(scan_test, host_variables) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; auto sinfo = builder.create_scan_info(target, *primary_idx); mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; @@ -557,7 +566,7 @@ TEST_F(scan_test, host_variables) { put( *db_, primary_idx->simple_name(), create_record(100, 30), create_record(3)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -566,5 +575,4 @@ TEST_F(scan_test, host_variables) { ASSERT_EQ(status::ok, tx->commit()); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/test/jogasaki/operator_test_utils.h b/test/jogasaki/operator_test_utils.h index 18a08205a..5acb8cb04 100644 --- a/test/jogasaki/operator_test_utils.h +++ b/test/jogasaki/operator_test_utils.h @@ -176,6 +176,7 @@ class operator_test_utils { takatori::plan::process& process_; //NOLINT memory::page_pool pool_{}; //NOLINT + request_context request_context_{}; //NOLINT memory::lifo_paged_memory_resource resource_; //NOLINT memory::lifo_paged_memory_resource varlen_resource_; //NOLINT memory::lifo_paged_memory_resource verifier_varlen_resource_; //NOLINT