Skip to content

Commit

Permalink
Move key_begin and key_end members from scan_context to scan_info
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Oct 21, 2024
1 parent 410588c commit a159ef1
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 117 deletions.
21 changes: 19 additions & 2 deletions src/jogasaki/data/aligned_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) {
return assign(aligned_buffer{sv});
}

} // namespace
void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{
std::string indent_space(indent, ' ');
out << indent_space << "aligned_buffer:" << "\n";
out << indent_space << " capacity_: " << capacity_ << "\n";
out << indent_space << " alignment_: " << alignment_ << "\n";
out << indent_space << " size_: " << size_ << "\n";
out << indent_space << " data_: " ;
for (std::size_t i = 0; i < size_; ++i) {
out << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(data_[i]) << " ";
if ((i + 1) % 16 == 0) {
out << std::endl;
}
}
out << std::setfill(' ') << std::dec << std::endl;

}

} // namespace jogasaki::data
5 changes: 3 additions & 2 deletions src/jogasaki/data/aligned_buffer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -162,6 +162,7 @@ class aligned_buffer {
* @return the output
*/
friend std::ostream& operator<<(std::ostream& out, aligned_buffer const& value);
void dump(std::ostream& out, int indent = 0) const noexcept;

private:
std::size_t capacity_{};
Expand All @@ -172,4 +173,4 @@ class aligned_buffer {
void resize_internal(std::size_t sz, bool copydata);
};

} // namespace
} // namespace jogasaki::data
8 changes: 4 additions & 4 deletions src/jogasaki/executor/process/flow.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,7 +95,7 @@ sequence_view<std::shared_ptr<model::task>> flow::create_tasks() {
step_->io_info(),
step_->relation_io_map(),
*step_->io_exchange_map(),
context_->request_resource()
context_
);
} catch (plan::impl::compile_exception const& e) {
error::set_error_info(*context_, e.info());
Expand Down Expand Up @@ -175,14 +175,14 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
(context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr,
sink_index
);

auto scan = ctx->non_const_impl_scan_info();
ctx->work_context(
std::make_unique<impl::work_context>(
context_,
operators.size(),
info_->vars_info_list().size(),
std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()),
std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()),
(scan == nullptr)?std::make_unique<memory::lifo_paged_memory_resource>(&global::page_pool()):scan->varlen_resource(),
context_->database(),
context_->transaction(),
empty_input_from_shuffle_
Expand Down
21 changes: 9 additions & 12 deletions src/jogasaki/executor/process/impl/ops/operator_builder.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,16 +76,14 @@ operator_builder::operator_builder(
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* request_context
) :
info_(std::move(info)),
io_info_(std::move(io_info)),
io_exchange_map_(std::addressof(io_exchange_map)),
relation_io_map_(std::move(relation_io_map)),
resource_(resource)
{
(void)resource_; //TODO remove if not necessary
}
request_context_(request_context)
{}

operator_container operator_builder::operator()()&& {
auto root = dispatch(*this, head());
Expand Down Expand Up @@ -132,7 +130,7 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::scan
// scan info is not passed to scan operator here, but passed back through task_context
// in order to support parallel scan in the future
scan_info_ = create_scan_info(node, secondary_or_primary_index);

scan_info_->encode_key(request_context_);
return std::make_unique<scan>(
index_++,
*info_,
Expand Down Expand Up @@ -220,7 +218,7 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::writ
write_kind_from(node.operator_kind()),
index,
columns,
resource_
request_context_->request_resource()
);
}

Expand Down Expand Up @@ -404,16 +402,15 @@ operator_container create_operators(
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* request_context
) {
return operator_builder{
std::move(info),
std::move(io_info),
std::move(relation_io_map),
io_exchange_map,
resource
request_context
}();
}

}

} // namespace jogasaki::executor::process::impl::ops
12 changes: 5 additions & 7 deletions src/jogasaki/executor/process/impl/ops/operator_builder.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,7 +90,7 @@ class operator_builder {
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource = nullptr
request_context* request_context = nullptr
);

[[nodiscard]] operator_container operator()() &&;
Expand Down Expand Up @@ -140,8 +140,7 @@ class operator_builder {
std::shared_ptr<relation_io_map> relation_io_map_{};
operator_base::operator_index_type index_{};
std::shared_ptr<impl::scan_info> scan_info_{};
memory::lifo_paged_memory_resource* resource_{};

request_context* request_context_{};
kvs::end_point_kind from(relation::scan::endpoint::kind_type type);

};
Expand All @@ -160,8 +159,7 @@ class operator_builder {
std::shared_ptr<io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource = nullptr
request_context* request_context = nullptr
);

}

} // namespace jogasaki::executor::process::impl::ops
64 changes: 6 additions & 58 deletions src/jogasaki/executor/process/impl/ops/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,67 +250,15 @@ void scan::finish(abstract::task_context* context) {

status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const)
auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_;
auto be = ctx.scan_info_->begin_endpoint();
auto ee = ctx.scan_info_->end_endpoint();
if (use_secondary_) {
// at storage layer, secondary index key contains primary key index as postfix
// so boundary condition needs promotion to be compatible
// TODO verify the promotion
if (be == kvs::end_point_kind::inclusive) {
be = kvs::end_point_kind::prefixed_inclusive;
}
if (be == kvs::end_point_kind::exclusive) {
be = kvs::end_point_kind::prefixed_exclusive;
}
if (ee == kvs::end_point_kind::inclusive) {
ee = kvs::end_point_kind::prefixed_inclusive;
}
if (ee == kvs::end_point_kind::exclusive) {
ee = kvs::end_point_kind::prefixed_exclusive;
}
}
executor::process::impl::variable_table vars{};
std::size_t blen{};
std::string msg{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->begin_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_begin_,
blen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
}
std::size_t elen{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->end_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_end_,
elen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
if (auto status = ctx.scan_info_->status_result(); status != status::ok) {
return status;
}
if(auto res = stg.content_scan(
*ctx.tx_,
{static_cast<char*>(ctx.key_begin_.data()), blen},
be,
{static_cast<char*>(ctx.key_end_.data()), elen},
ee,
ctx.scan_info_->begin_key(),
ctx.scan_info_->begin_kind(use_secondary_),
ctx.scan_info_->end_key(),
ctx.scan_info_->end_kind(use_secondary_),
ctx.it_
); res != status::ok) {
handle_kvs_errors(*ctx.req_context(), res);
Expand Down
6 changes: 3 additions & 3 deletions src/jogasaki/executor/process/impl/processor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,7 @@ processor::processor(
std::shared_ptr<ops::io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* resource
) :
info_(std::move(info)),
operators_(
Expand Down Expand Up @@ -96,4 +96,4 @@ ops::operator_container const& processor::operators() const noexcept {
return operators_;
}

}
} // namespace jogasaki::executor::process::impl
8 changes: 3 additions & 5 deletions src/jogasaki/executor/process/impl/processor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,7 @@ class processor : public process::abstract::processor {
std::shared_ptr<ops::io_info> io_info,
std::shared_ptr<relation_io_map> relation_io_map,
io_exchange_map& io_exchange_map,
memory::lifo_paged_memory_resource* resource
request_context* resource
);

/**
Expand All @@ -78,6 +78,4 @@ class processor : public process::abstract::processor {
std::shared_ptr<relation_io_map> relation_io_map_{};
};

}


} // namespace jogasaki::executor::process::impl
Loading

0 comments on commit a159ef1

Please sign in to comment.