Skip to content

Commit

Permalink
Move key_begin and key_end members from scan_context to scan_info
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Oct 24, 2024
1 parent 835dda8 commit 62e6c1a
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 159 deletions.
21 changes: 19 additions & 2 deletions src/jogasaki/data/aligned_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) {
return assign(aligned_buffer{sv});
}

} // namespace
void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{
std::string indent_space(indent, ' ');
out << indent_space << "aligned_buffer:" << "\n";
out << indent_space << " capacity_: " << capacity_ << "\n";
out << indent_space << " alignment_: " << alignment_ << "\n";
out << indent_space << " size_: " << size_ << "\n";
out << indent_space << " data_: " ;
for (std::size_t i = 0; i < size_; ++i) {
out << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(data_[i]) << " ";
if ((i + 1) % 16 == 0) {
out << std::endl;
}
}
out << std::setfill(' ') << std::dec << std::endl;

}

} // namespace jogasaki::data
11 changes: 8 additions & 3 deletions src/jogasaki/data/aligned_buffer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,7 +139,12 @@ class aligned_buffer {
* The alignment is not changed.
*/
void shrink_to_fit();

/**
* @brief Support for debugging, callable in GDB
* @param out The output stream to which the buffer's internal state will be written.
* @param indent The indentation level for formatting the output, default is 0.
*/
void dump(std::ostream& out, int indent = 0) const noexcept;
/**
* @brief return alignment of the buffer
*/
Expand Down Expand Up @@ -172,4 +177,4 @@ class aligned_buffer {
void resize_internal(std::size_t sz, bool copydata);
};

} // namespace
} // namespace jogasaki::data
10 changes: 5 additions & 5 deletions src/jogasaki/executor/process/flow.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,7 +95,7 @@ sequence_view<std::shared_ptr<model::task>> flow::create_tasks() {
step_->io_info(),
step_->relation_io_map(),
*step_->io_exchange_map(),
context_->request_resource()
context_
);
} catch (plan::impl::compile_exception const& e) {
error::set_error_info(*context_, e.info());
Expand Down Expand Up @@ -175,14 +175,14 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
(context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr,
sink_index
);

auto scan = ctx->shared_scan_info();
ctx->work_context(
std::make_unique<impl::work_context>(
context_,
operators.size(),
info_->vars_info_list().size(),
std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()),
std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()),
(scan == nullptr)?std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()):scan->varlen_resource(),
context_->database(),
context_->transaction(),
empty_input_from_shuffle_
Expand All @@ -191,4 +191,4 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
return ctx;
}

} // namespace jogasaki::executor::process
} // namespace jogasaki::executor::process
36 changes: 19 additions & 17 deletions src/jogasaki/executor/process/impl/ops/operator_builder.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,16 +76,14 @@ operator_builder::operator_builder(
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* request_context
) :
info_(std::move(info)),
io_info_(std::move(io_info)),
io_exchange_map_(std::addressof(io_exchange_map)),
relation_io_map_(std::move(relation_io_map)),
resource_(resource)
{
(void)resource_; //TODO remove if not necessary
}
request_context_(request_context)
{}

operator_container operator_builder::operator()()&& {
auto root = dispatch(*this, head());
Expand Down Expand Up @@ -131,8 +129,7 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::scan

// scan info is not passed to scan operator here, but passed back through task_context
// in order to support parallel scan in the future
scan_info_ = create_scan_info(node, secondary_or_primary_index);

scan_info_ = create_scan_info(node, secondary_or_primary_index,std::make_unique<ops::context_base::memory_resource>(&global::page_pool()),request_context_);
return std::make_unique<scan>(
index_++,
*info_,
Expand Down Expand Up @@ -220,7 +217,7 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::writ
write_kind_from(node.operator_kind()),
index,
columns,
resource_
request_context_->request_resource()
);
}

Expand Down Expand Up @@ -360,7 +357,9 @@ std::shared_ptr<impl::scan_info>
operator_builder::create_scan_info(
operator_builder::endpoint const& lower,
operator_builder::endpoint const& upper,
yugawara::storage::index const& index
yugawara::storage::index const& index,
std::unique_ptr<ops::context_base::memory_resource> varlen_resource,
request_context* request_context
) {
return std::make_shared<impl::scan_info>(
details::create_search_key_fields(
Expand All @@ -374,15 +373,19 @@ operator_builder::create_scan_info(
upper.keys(),
*info_
),
from(upper.kind())
from(upper.kind()),
std::move(varlen_resource),
request_context
);
}

std::shared_ptr<impl::scan_info> operator_builder::create_scan_info(
relation::scan const& node,
yugawara::storage::index const& index
yugawara::storage::index const& index,
std::unique_ptr<ops::context_base::memory_resource> varlen_resource,
request_context* request_context
) {
return create_scan_info(node.lower(), node.upper(), index);
return create_scan_info(node.lower(), node.upper(), index, std::move(varlen_resource), request_context);
}

kvs::end_point_kind operator_builder::from(relation::scan::endpoint::kind_type type) {
Expand All @@ -404,16 +407,15 @@ operator_container create_operators(
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* request_context
) {
return operator_builder{
std::move(info),
std::move(io_info),
std::move(relation_io_map),
io_exchange_map,
resource
request_context
}();
}

}

} // namespace jogasaki::executor::process::impl::ops
20 changes: 11 additions & 9 deletions src/jogasaki/executor/process/impl/ops/operator_builder.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,7 +90,7 @@ class operator_builder {
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource = nullptr
request_context* request_context = nullptr
);

[[nodiscard]] operator_container operator()() &&;
Expand Down Expand Up @@ -124,12 +124,16 @@ class operator_builder {
std::shared_ptr<impl::scan_info> create_scan_info(
endpoint const& lower,
endpoint const& upper,
yugawara::storage::index const& index
yugawara::storage::index const& index,
std::unique_ptr<ops::context_base::memory_resource> varlen_resource,
request_context* request_context
);

std::shared_ptr<impl::scan_info> create_scan_info(
relation::scan const& node,
yugawara::storage::index const& index
yugawara::storage::index const& index,
std::unique_ptr<ops::context_base::memory_resource> varlen_resource,
request_context* request_context
);


Expand All @@ -140,8 +144,7 @@ class operator_builder {
std::shared_ptr<relation_io_map> relation_io_map_{};
operator_base::operator_index_type index_{};
std::shared_ptr<impl::scan_info> scan_info_{};
memory::lifo_paged_memory_resource* resource_{};

request_context* request_context_{};
kvs::end_point_kind from(relation::scan::endpoint::kind_type type);

};
Expand All @@ -160,8 +163,7 @@ class operator_builder {
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource = nullptr
request_context* request_context = nullptr
);

}

} // namespace jogasaki::executor::process::impl::ops
66 changes: 7 additions & 59 deletions src/jogasaki/executor/process/impl/ops/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,67 +251,15 @@ void scan::finish(abstract::task_context* context) {

status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const)
auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_;
auto be = ctx.scan_info_->begin_endpoint();
auto ee = ctx.scan_info_->end_endpoint();
if (use_secondary_) {
// at storage layer, secondary index key contains primary key index as postfix
// so boundary condition needs promotion to be compatible
// TODO verify the promotion
if (be == kvs::end_point_kind::inclusive) {
be = kvs::end_point_kind::prefixed_inclusive;
}
if (be == kvs::end_point_kind::exclusive) {
be = kvs::end_point_kind::prefixed_exclusive;
}
if (ee == kvs::end_point_kind::inclusive) {
ee = kvs::end_point_kind::prefixed_inclusive;
}
if (ee == kvs::end_point_kind::exclusive) {
ee = kvs::end_point_kind::prefixed_exclusive;
}
}
executor::process::impl::variable_table vars{};
std::size_t blen{};
std::string msg{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->begin_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_begin_,
blen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
}
std::size_t elen{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->end_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_end_,
elen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
if (auto status = ctx.scan_info_->status_result(); status != status::ok) {
return status;
}
if(auto res = stg.content_scan(
*ctx.tx_,
{static_cast<char*>(ctx.key_begin_.data()), blen},
be,
{static_cast<char*>(ctx.key_end_.data()), elen},
ee,
ctx.scan_info_->begin_key(),
ctx.scan_info_->begin_kind(use_secondary_),
ctx.scan_info_->end_key(),
ctx.scan_info_->end_kind(use_secondary_),
ctx.it_
); res != status::ok) {
handle_kvs_errors(*ctx.req_context(), res);
Expand Down Expand Up @@ -371,4 +319,4 @@ void scan::dump() const noexcept {
std::cerr << head << std::setw(width) << "field_mapper_:"
<< "not implemented yet" << std::endl;
}
} // namespace jogasaki::executor::process::impl::ops
} // namespace jogasaki::executor::process::impl::ops
12 changes: 3 additions & 9 deletions src/jogasaki/executor/process/impl/ops/scan_context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,13 +69,7 @@ void scan_context::dump() const noexcept {
<< " " << std::setw(20) << "iterator:"
<< (it_ ? it_.get() : nullptr) << "\n"
<< " " << std::setw(20) << "scan_info:"
<< (scan_info_ ? scan_info_ : nullptr) << "\n"
<< " " << std::setw(20) << "key_begin_size:"
<< key_begin_.size() << "\n"
<< " " << std::setw(20) << "key_end_size:"
<< key_end_.size() << std::endl;
<< (scan_info_ ? scan_info_ : nullptr) << "\n";
}

}


} // namespace jogasaki::executor::process::impl::ops
8 changes: 2 additions & 6 deletions src/jogasaki/executor/process/impl/ops/scan_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,10 +75,6 @@ class scan_context : public context_base {
transaction_context* tx_{};
std::unique_ptr<kvs::iterator> it_{};
impl::scan_info const* scan_info_{};
data::aligned_buffer key_begin_{};
data::aligned_buffer key_end_{};
};

}


} // namespace jogasaki::executor::process::impl::ops
Loading

0 comments on commit 62e6c1a

Please sign in to comment.