Skip to content

Commit

Permalink
Replaced scan_info definition with the range class
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Oct 31, 2024
1 parent 173de03 commit d304948
Show file tree
Hide file tree
Showing 35 changed files with 608 additions and 592 deletions.
4 changes: 2 additions & 2 deletions examples/process_cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
#include <jogasaki/executor/io/record_writer.h>
#include <jogasaki/executor/process/abstract/process_executor.h>
#include <jogasaki/executor/process/abstract/processor.h>
#include <jogasaki/executor/process/abstract/scan_info.h>
#include <jogasaki/executor/process/abstract/range.h>
#include <jogasaki/executor/process/abstract/task_context.h>
#include <jogasaki/executor/process/abstract/work_context.h>
#include <jogasaki/executor/process/impl/ops/default_value_kind.h>
Expand Down Expand Up @@ -383,7 +383,7 @@ class cli {
std::vector<io::reader_container>{r},
std::vector<std::shared_ptr<executor::io::record_writer>>{writer},
std::shared_ptr<executor::io::record_writer>{},
std::shared_ptr<abstract::scan_info>{}
std::shared_ptr<abstract::range>{}
);

ctx->work_context(std::make_unique<process::impl::work_context>(
Expand Down
6 changes: 2 additions & 4 deletions mock/jogasaki/executor/process/mock/task_context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,4 @@
#include "task_context.h"

namespace jogasaki::executor::process::mock {

}

} // jogasaki::executor::process::mock
16 changes: 7 additions & 9 deletions mock/jogasaki/executor/process/mock/task_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
#include <jogasaki/executor/common/task.h>
#include <jogasaki/executor/io/reader_container.h>
#include <jogasaki/executor/io/record_writer.h>
#include <jogasaki/executor/process/abstract/scan_info.h>
#include <jogasaki/executor/process/abstract/range.h>
#include <jogasaki/executor/process/abstract/task_context.h>
#include <jogasaki/model/step.h>
#include <jogasaki/model/task.h>
Expand All @@ -43,12 +43,12 @@ class task_context : public abstract::task_context {
std::vector<io::reader_container> readers = {},
std::vector<std::shared_ptr<io::record_writer>> downstream_writers = {},
std::shared_ptr<io::record_writer> external_writer = {},
std::shared_ptr<abstract::scan_info> info = {}
std::shared_ptr<abstract::range> range = {}
) :
readers_(std::move(readers)),
downstream_writers_(std::move(downstream_writers)),
external_writer_(std::move(external_writer)),
scan_info_(std::move(info))
range_(std::move(range))
{}

io::reader_container reader(reader_index idx) override {
Expand Down Expand Up @@ -76,10 +76,9 @@ class task_context : public abstract::task_context {
external_writer_->release();
external_writer_.reset();
}
scan_info_.reset();
}

class abstract::scan_info const* scan_info() override {
class abstract::range const* range() override {
return nullptr;
}

Expand All @@ -95,7 +94,6 @@ class task_context : public abstract::task_context {
std::vector<io::reader_container> readers_{};
std::vector<std::shared_ptr<io::record_writer>> downstream_writers_{};
std::shared_ptr<io::record_writer> external_writer_{};
std::shared_ptr<abstract::scan_info> scan_info_{};
std::shared_ptr<abstract::range> range_{};
};

}
} // namespace jogasaki::executor::process::mock
21 changes: 19 additions & 2 deletions src/jogasaki/data/aligned_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) {
return assign(aligned_buffer{sv});
}

} // namespace
void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{
std::string indent_space(indent, ' ');
out << indent_space << "aligned_buffer:" << "\n";
out << indent_space << " capacity_: " << capacity_ << "\n";
out << indent_space << " alignment_: " << alignment_ << "\n";
out << indent_space << " size_: " << size_ << "\n";
out << indent_space << " data_: " ;
for (std::size_t i = 0; i < size_; ++i) {
out << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(data_[i]) << " ";
if ((i + 1) % 16 == 0) {
out << std::endl;
}
}
out << std::setfill(' ') << std::dec << std::endl;

}

} // namespace jogasaki::data
11 changes: 9 additions & 2 deletions src/jogasaki/data/aligned_buffer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,6 +145,13 @@ class aligned_buffer {
*/
[[nodiscard]] std::size_t alignment() const noexcept;

/**
* @brief Support for debugging, callable in GDB
* @param out The output stream to which the buffer's internal state will be written.
* @param indent The indentation level for formatting the output, default is 0.
*/
void dump(std::ostream& out, int indent = 0) const noexcept;

/**
* @brief compare two objects
* @param a first arg to compare
Expand Down Expand Up @@ -172,4 +179,4 @@ class aligned_buffer {
void resize_internal(std::size_t sz, bool copydata);
};

} // namespace
} // namespace jogasaki::data
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,27 +18,25 @@
namespace jogasaki::executor::process::abstract {

/**
* @brief scan info
* @details this instance provides specification of scan (e.g. definition of the range of scanned records)
* @brief range
* @details definition of the range of scanned records
*/
class scan_info {
public:
class range {
public:
/**
* @brief create empty object
*/
scan_info() = default;
range() = default;

/**
* @brief destroy the object
*/
virtual ~scan_info() = default;
virtual ~range() = default;

scan_info(scan_info const& other) = default;
scan_info& operator=(scan_info const& other) = default;
scan_info(scan_info&& other) noexcept = default;
scan_info& operator=(scan_info&& other) noexcept = default;
range(range const& other) = default;
range& operator=(range const& other) = default;
range(range&& other) noexcept = default;
range& operator=(range&& other) noexcept = default;
};

}


} // namespace jogasaki::executor::process::abstract
20 changes: 9 additions & 11 deletions src/jogasaki/executor/process/abstract/task_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@
#include <jogasaki/executor/io/reader_container.h>
#include <jogasaki/executor/io/record_writer.h>

#include "scan_info.h"
#include "range.h"
#include "work_context.h"

namespace jogasaki::executor::process::abstract {
Expand All @@ -33,7 +33,7 @@ namespace jogasaki::executor::process::abstract {
* input data reader, and transient work area
*
* Depending on whether the processor logic is driven by main/sub input or scan, readers() or
* scan_info() functions are called to locate/retrieve the input data for the task.
* range() functions are called to locate/retrieve the input data for the task.
*
* The knowledge about the number of I/O objects and its index (i.e. what port or exchange the i-th reader/writer
* corresponds to) are shared with processor.
Expand Down Expand Up @@ -104,13 +104,13 @@ class task_context {
*/
[[nodiscard]] virtual io::record_writer* external_writer() = 0;

/**
* @brief accessor to scan information that defines scan specification for the task
* @details processor impl. knows the details scan_info and drives scan operation using it.
* The details of scan_info is transparent to processor context.
/**
* @brief accessor to range information
* @details processor impl. knows the details range and drives scan operation using it.
* The details of range is transparent to processor context.
* @return scan info
*/
[[nodiscard]] virtual class scan_info const* scan_info() = 0;
[[nodiscard]] virtual class range const* range() = 0;

/**
* @brief setter of work context
Expand Down Expand Up @@ -150,6 +150,4 @@ inline bool operator!=(task_context const& a, task_context const& b) noexcept {
return !(a == b);
}

}


} // namespace jogasaki::executor::process::abstract
8 changes: 4 additions & 4 deletions src/jogasaki/executor/process/flow.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,7 +95,7 @@ sequence_view<std::shared_ptr<model::task>> flow::create_tasks() {
step_->io_info(),
step_->relation_io_map(),
*step_->io_exchange_map(),
context_->request_resource()
context_
);
} catch (plan::impl::compile_exception const& e) {
error::set_error_info(*context_, e.info());
Expand Down Expand Up @@ -170,7 +170,7 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
*context_,
partition,
operators.io_exchange_map(),
operators.scan_info(), // simply pass back the scan info. In the future, scan can be parallel and different scan info are created and filled into the task context.
operators.range(),
(context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr,
sink_index
);
Expand All @@ -190,4 +190,4 @@ std::shared_ptr<impl::task_context> flow::create_task_context(
return ctx;
}

} // namespace jogasaki::executor::process
} // namespace jogasaki::executor::process
33 changes: 33 additions & 0 deletions src/jogasaki/executor/process/impl/bound.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "bound.h"

namespace jogasaki::executor::process::impl {

[[nodiscard]] std::string_view bound::key() const noexcept {
return {static_cast<char*>(key_->data()), len_};
}
[[nodiscard]] kvs::end_point_kind bound::endpointkind() const noexcept { return endpointkind_; }
void bound::dump(std::ostream& out, int indent) const noexcept {
std::string indent_space(indent, ' ');
out << indent_space << "bound:"
<< "\n";
out << indent_space << " endpointkind_: " << endpointkind_ << "\n";
out << indent_space << " len_: " << len_ << "\n";
out << indent_space << " key_: " << *key_ << "\n";
key_->dump(out, indent + 2);
}
} // namespace jogasaki::executor::process::impl
52 changes: 52 additions & 0 deletions src/jogasaki/executor/process/impl/bound.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <jogasaki/data/aligned_buffer.h>
#include <jogasaki/kvs/storage.h>

namespace jogasaki::executor::process::impl {

class bound {

public:
bound() = default;
bound(bound const& other) = delete;
bound& operator=(bound const& other) = delete;
bound(bound&& other) noexcept = delete;
bound& operator=(bound&& other) noexcept = delete;
~bound() = default;
bound(kvs::end_point_kind endpointkind, std::size_t len,
std::unique_ptr<data::aligned_buffer> key)
: endpointkind_(endpointkind), len_(len), key_(std::move(key)) {}
kvs::end_point_kind endpointkind() { return endpointkind_; };
std::unique_ptr<data::aligned_buffer> key() { return std::move(key_); };
[[nodiscard]] std::string_view key() const noexcept;
[[nodiscard]] kvs::end_point_kind endpointkind() const noexcept;
/**
* @brief Support for debugging, callable in GDB
* @param out The output stream to which the buffer's internal state will be written.
* @param indent The indentation level for formatting the output, default is 0.
*/
void dump(std::ostream& out, int indent = 0) const noexcept;

private:
kvs::end_point_kind endpointkind_{};
std::size_t len_{};
std::unique_ptr<data::aligned_buffer> key_{};
};

} // namespace jogasaki::executor::process::impl
Loading

0 comments on commit d304948

Please sign in to comment.