Skip to content

Commit

Permalink
Move key_begin and key_end members from scan_context to scan_info
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Oct 17, 2024
1 parent d19fb40 commit b6a77cd
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 88 deletions.
5 changes: 4 additions & 1 deletion src/jogasaki/executor/process/flow.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -188,6 +188,9 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
empty_input_from_shuffle_
)
);
if (ctx->scan_info() != nullptr){
ctx->encode_key();
}
return ctx;
}

Expand Down
64 changes: 6 additions & 58 deletions src/jogasaki/executor/process/impl/ops/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,67 +250,15 @@ void scan::finish(abstract::task_context* context) {

status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const)
auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_;
auto be = ctx.scan_info_->begin_endpoint();
auto ee = ctx.scan_info_->end_endpoint();
if (use_secondary_) {
// at storage layer, secondary index key contains primary key index as postfix
// so boundary condition needs promotion to be compatible
// TODO verify the promotion
if (be == kvs::end_point_kind::inclusive) {
be = kvs::end_point_kind::prefixed_inclusive;
}
if (be == kvs::end_point_kind::exclusive) {
be = kvs::end_point_kind::prefixed_exclusive;
}
if (ee == kvs::end_point_kind::inclusive) {
ee = kvs::end_point_kind::prefixed_inclusive;
}
if (ee == kvs::end_point_kind::exclusive) {
ee = kvs::end_point_kind::prefixed_exclusive;
}
}
executor::process::impl::variable_table vars{};
std::size_t blen{};
std::string msg{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->begin_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_begin_,
blen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
}
std::size_t elen{};
if(auto res = details::encode_key(
ctx.req_context(),
ctx.scan_info_->end_columns(),
vars,
*ctx.varlen_resource(),
ctx.key_end_,
elen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res);
}
return res;
if (auto status = ctx.scan_info_->status_result(); status != status::ok) {
return status;
}
if(auto res = stg.content_scan(
*ctx.tx_,
{static_cast<char*>(ctx.key_begin_.data()), blen},
be,
{static_cast<char*>(ctx.key_end_.data()), elen},
ee,
ctx.scan_info_->begin_key(),
ctx.scan_info_->begin_kind(use_secondary_),
ctx.scan_info_->end_key(),
ctx.scan_info_->end_kind(use_secondary_),
ctx.it_
); res != status::ok) {
handle_kvs_errors(*ctx.req_context(), res);
Expand Down
14 changes: 3 additions & 11 deletions src/jogasaki/executor/process/impl/ops/scan_context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,15 +67,7 @@ void scan_context::dump() const noexcept {
<< " " << std::setw(20) << "transaction_context:"
<< (tx_ ? tx_ : nullptr) << "\n"
<< " " << std::setw(20) << "iterator:"
<< (it_ ? it_.get() : nullptr) << "\n"
<< " " << std::setw(20) << "scan_info:"
<< (scan_info_ ? scan_info_ : nullptr) << "\n"
<< " " << std::setw(20) << "key_begin_size:"
<< key_begin_.size() << "\n"
<< " " << std::setw(20) << "key_end_size:"
<< key_end_.size() << std::endl;
<< (it_ ? it_.get() : nullptr) << "\n";
}

}


} // namespace jogasaki::executor::process::impl::ops
9 changes: 2 additions & 7 deletions src/jogasaki/executor/process/impl/ops/scan_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,6 @@ class scan_context : public context_base {
void release() override;

[[nodiscard]] transaction_context* transaction() const noexcept;

/**
* @brief Support for debugging, callable in GDB: ctx->dump()
*/
Expand All @@ -75,10 +74,6 @@ class scan_context : public context_base {
transaction_context* tx_{};
std::unique_ptr<kvs::iterator> it_{};
impl::scan_info const* scan_info_{};
data::aligned_buffer key_begin_{};
data::aligned_buffer key_end_{};
};

}


} // namespace jogasaki::executor::process::impl::ops
46 changes: 45 additions & 1 deletion src/jogasaki/executor/process/impl/scan_info.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,4 +50,48 @@ kvs::end_point_kind scan_info::end_endpoint() const noexcept {
return end_endpoint_;
}

[[nodiscard]] data::aligned_buffer & scan_info::key_begin() noexcept {
return key_begin_;
}

[[nodiscard]] data::aligned_buffer & scan_info::key_end() noexcept {
return key_end_;
}
void scan_info::blen(std::size_t s) noexcept {
blen_ = s;
}
void scan_info::elen(std::size_t s) noexcept {
elen_ = s;
}
[[nodiscard]] std::string_view scan_info::begin_key() const noexcept{
return {static_cast<char*>(key_begin_.data()), blen_};
}
[[nodiscard]] std::string_view scan_info::end_key() const noexcept{
return {static_cast<char*>(key_end_.data()), elen_};
}
[[nodiscard]] kvs::end_point_kind scan_info::get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept {
if (use_secondary) {
if (endpoint == kvs::end_point_kind::inclusive) {
return kvs::end_point_kind::prefixed_inclusive;
}
if (endpoint == kvs::end_point_kind::exclusive) {
return kvs::end_point_kind::prefixed_exclusive;
}
}
return endpoint;
}
[[nodiscard]] kvs::end_point_kind scan_info::begin_kind(bool use_secondary) const noexcept{
return get_kind(use_secondary, begin_endpoint_);
}
[[nodiscard]] kvs::end_point_kind scan_info::end_kind(bool use_secondary) const noexcept{
return get_kind(use_secondary, end_endpoint_);
}

[[nodiscard]] status scan_info::status_result() const noexcept{
return status_result_;
}
void scan_info::status_result(status s) noexcept{
status_result_ = s;
}

} // namespace jogasaki::executor::process::impl
23 changes: 19 additions & 4 deletions src/jogasaki/executor/process/impl/scan_info.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@

#include <vector>

#include <jogasaki/data/aligned_buffer.h>
#include <jogasaki/executor/process/abstract/scan_info.h>
#include <jogasaki/executor/process/impl/ops/details/search_key_field_info.h>
#include <jogasaki/kvs/storage.h>
Expand Down Expand Up @@ -50,14 +51,28 @@ class scan_info : public abstract::scan_info {
[[nodiscard]] std::vector<ops::details::search_key_field_info> const& end_columns() const noexcept;
[[nodiscard]] kvs::end_point_kind begin_endpoint() const noexcept;
[[nodiscard]] kvs::end_point_kind end_endpoint() const noexcept;
[[nodiscard]] data::aligned_buffer & key_begin() noexcept;
[[nodiscard]] data::aligned_buffer & key_end() noexcept;
void blen(std::size_t s) noexcept;
void elen(std::size_t s) noexcept;
[[nodiscard]] std::string_view begin_key() const noexcept;
[[nodiscard]] std::string_view end_key() const noexcept;
[[nodiscard]] kvs::end_point_kind begin_kind(bool use_secondary) const noexcept;
[[nodiscard]] kvs::end_point_kind end_kind(bool use_secondary) const noexcept;
[[nodiscard]] status status_result() const noexcept;
void status_result(status s) noexcept;

private:
std::vector<ops::details::search_key_field_info> begin_columns_{};
kvs::end_point_kind begin_endpoint_{};
std::vector<ops::details::search_key_field_info> end_columns_{};
kvs::end_point_kind end_endpoint_{};
data::aligned_buffer key_begin_{};
data::aligned_buffer key_end_{};
std::size_t blen_{};
std::size_t elen_{};
status status_result_{};
[[nodiscard]] kvs::end_point_kind get_kind(bool use_secondary, kvs::end_point_kind endpoint) const noexcept;
};

}


} // namespace jogasaki::executor::process::impl
53 changes: 52 additions & 1 deletion src/jogasaki/executor/process/impl/task_context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@

#include <jogasaki/logging.h>
#include <jogasaki/logging_helper.h>
#include <jogasaki/error/error_info_factory.h>
#include <jogasaki/executor/process/impl/ops/details/encode_key.h>
#include <jogasaki/executor/exchange/aggregate/flow.h>
#include <jogasaki/executor/exchange/forward/flow.h>
#include <jogasaki/executor/exchange/group/flow.h>
Expand Down Expand Up @@ -132,4 +134,53 @@ io::record_channel* task_context::channel() const noexcept {
return channel_;
}

impl::work_context& task_context::getImplWorkContext() const {
return *dynamic_cast<impl::work_context*>(work_context());
}

void task_context::encode_key() noexcept {
std::size_t blen{};
std::string msg{};
executor::process::impl::variable_table vars{};
if(auto res = impl::ops::details::encode_key(
request_context_,
scan_info_->begin_columns(),
vars,
*getImplWorkContext().varlen_resource(),
scan_info_->key_begin(),
blen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*request_context_, error_code::unsupported_runtime_feature_exception, msg, res);
}
scan_info_->status_result(res);
return;
}
scan_info_->blen(blen);
std::size_t elen{};
if(auto res = impl::ops::details::encode_key(
request_context_,
scan_info_->end_columns(),
vars,
*getImplWorkContext().varlen_resource(),
scan_info_->key_end(),
elen,
msg
);
res != status::ok) {
if(res == status::err_type_mismatch) {
// only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context
set_error(*request_context_, error_code::unsupported_runtime_feature_exception, msg, res);
}
scan_info_->status_result(res);
return;
}
scan_info_->elen(elen);
scan_info_->status_result(status::ok);
return;

Check warning on line 183 in src/jogasaki/executor/process/impl/task_context.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

readability-redundant-control-flow

redundant return statement at the end of a function with a void return type

Check warning on line 183 in src/jogasaki/executor/process/impl/task_context.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy-pr

readability-redundant-control-flow

redundant return statement at the end of a function with a void return type
}

} // namespace jogasaki::executor::process::impl
12 changes: 8 additions & 4 deletions src/jogasaki/executor/process/impl/task_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
#include <jogasaki/executor/process/abstract/work_context.h>
#include <jogasaki/executor/process/impl/ops/emit.h>
#include <jogasaki/executor/process/impl/scan_info.h>
#include <jogasaki/executor/process/impl/work_context.h>
#include <jogasaki/executor/process/io_exchange_map.h>
#include <jogasaki/request_context.h>

Expand Down Expand Up @@ -84,6 +85,10 @@ class task_context : public abstract::task_context {

void deactivate_writer(writer_index idx) override;

impl::work_context& getImplWorkContext() const;

Check warning on line 88 in src/jogasaki/executor/process/impl/task_context.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

modernize-use-nodiscard

function 'getImplWorkContext' should be marked [[nodiscard]]

Check warning on line 88 in src/jogasaki/executor/process/impl/task_context.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy-pr

modernize-use-nodiscard

function 'getImplWorkContext' should be marked [[nodiscard]]

void encode_key() noexcept;

private:
request_context* request_context_{};
std::size_t partition_{};
Expand All @@ -92,8 +97,7 @@ class task_context : public abstract::task_context {
io::record_channel* channel_{};
std::shared_ptr<io::record_writer> external_writer_{};
partition_index sink_index_{};
status status_result_{};

Check failure on line 100 in src/jogasaki/executor/process/impl/task_context.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-unused-private-field

private field 'status_result_' is not used

Check failure on line 100 in src/jogasaki/executor/process/impl/task_context.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy-pr

clang-diagnostic-unused-private-field

private field 'status_result_' is not used
};

}


} // namespace jogasaki::executor::process::impl
2 changes: 1 addition & 1 deletion src/jogasaki/serializer/value_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class value_writer {

/// @brief Size the size type represents the number of bytes to write.
using size_type = Size;

/// @brief the result type of `T::write(char const*, size_type)`.
using result_type = decltype(
std::declval<writer_type>().write(
Expand Down

0 comments on commit b6a77cd

Please sign in to comment.