Skip to content

Commit

Permalink
improve executor::commit_async for various commit response
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 22, 2024
1 parent 410588c commit 0a0abe9
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 40 deletions.
8 changes: 8 additions & 0 deletions include/jogasaki/commit_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <cstdint>
#include <cstdlib>

#include <takatori/util/enum_set.h>

namespace jogasaki {

enum class commit_response_kind : std::int32_t {
Expand Down Expand Up @@ -58,5 +60,11 @@ inline std::ostream& operator<<(std::ostream& out, commit_response_kind value) {
return out << to_string_view(value);
}

/// @brief a set of commit_response_kind.
using commit_response_kind_set = takatori::util::enum_set<
commit_response_kind,
commit_response_kind::undefined,
commit_response_kind::propagated>;

}

27 changes: 16 additions & 11 deletions src/jogasaki/api/impl/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,22 +539,27 @@ void service::command_commit(
commit_option opt{};
opt.auto_dispose_on_success(cm.auto_dispose())
.commit_response(from(cm.notification_type()));
tx.commit_async(
[res, req_info](
status st,
std::shared_ptr<api::error_info> info //NOLINT(performance-unnecessary-value-param)
) {
if(st == jogasaki::status::ok) {
details::success<sql::response::ResultOnly>(*res, req_info);
} else {
VLOG(log_error) << log_location_prefix << info->message();
details::error<sql::response::ResultOnly>(*res, info.get(), req_info);
}

auto tctx = get_transaction_context(tx);
executor::commit_async(
get_impl(*db_),
tctx,
[res, req_info](commit_response_kind kind) {
(void) kind; // for now, callback does same regardless of kind
details::success<sql::response::ResultOnly>(*res, req_info);
},
commit_response_kind_set{opt.commit_response()},
[res, req_info](commit_response_kind kind, status st, std::shared_ptr<error::error_info> info) { //NOLINT(performance-unnecessary-value-param)
(void) kind; // for now, callback does same regardless of kind
(void) st;
VLOG(log_error) << log_location_prefix << info->message();
details::error<sql::response::ResultOnly>(*res, info.get(), req_info);
},
opt,
req_info
);
}

void service::command_rollback(
sql::request::Request const& proto_req,
std::shared_ptr<tateyama::api::server::response> const& res,
Expand Down
80 changes: 80 additions & 0 deletions src/jogasaki/commit_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <atomic>
#include <memory>
#include <string>
#include <string_view>

#include <jogasaki/error/error_info.h>
#include <jogasaki/commit_response.h>

namespace jogasaki {

/**
* @brief the callback type used for async commit successful response
*/
using commit_response_callback = std::function<void(commit_response_kind)>;

/**
* @brief the callback type used for async commit error response
*/
using commit_error_callback = std::function<void(commit_response_kind, status, std::shared_ptr<error::error_info>)>;

/**
* @brief context object for tx commit processing
*/
class commit_context {
public:
/**
* @brief create default context object
*/
commit_context() = default;

/**
* @brief create new context object
* @param on_response the callback function to be called when the response is ready
* @param response_kinds the kinds of response to be notified
*/
commit_context(
commit_response_callback on_response,
commit_response_kind_set response_kinds,
commit_error_callback on_error
) : on_response_(std::move(on_response)),
response_kinds_(response_kinds),
on_error_(std::move(on_error))
{}

commit_response_callback& on_response() noexcept {
return on_response_;
}

commit_response_kind_set& response_kinds() noexcept {
return response_kinds_;
}

commit_error_callback& on_error() noexcept {
return on_error_;
}
private:
commit_response_callback on_response_{};
commit_response_kind_set response_kinds_{};
commit_error_callback on_error_{};

};

} // namespace jogasaki
5 changes: 3 additions & 2 deletions src/jogasaki/durability_callback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <jogasaki/api/impl/request_context_factory.h>
#include <jogasaki/commit_profile.h>
#include <jogasaki/configuration.h>
#include <jogasaki/durability_common.h>
#include <jogasaki/durability_manager.h>
#include <jogasaki/logging.h>
#include <jogasaki/model/task.h>
Expand Down Expand Up @@ -83,7 +84,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) {
VLOG(log_trace) << "/:jogasaki:durability_callback:operator() check_cancel "
<< "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker();
set_cancel_status(*e);
scheduler::submit_teardown(*e, false, true);
submit_commit_response(e, commit_response_kind::stored, true, true);
}
);
if(mgr->update_current_marker(
Expand All @@ -93,7 +94,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) {
<< "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker();
request_ctx->job()->request()->affected_txs().add(e->transaction()->transaction_id());
e->transaction()->profile()->set_durability_cb_invoked(durability_callback_invoked);
scheduler::submit_teardown(*e, false, true);
submit_commit_response(e, commit_response_kind::stored, false, true);
})) {
scheduler::submit_teardown(*request_ctx);
return model::task_result::complete;
Expand Down
67 changes: 67 additions & 0 deletions src/jogasaki/durability_common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "durability_common.h"

#include <memory>
#include <optional>
#include <ostream>
#include <string_view>
#include <type_traits>
#include <utility>
#include <glog/logging.h>

#include <takatori/util/maybe_shared_ptr.h>

#include <jogasaki/api/impl/database.h>
#include <jogasaki/api/impl/request_context_factory.h>
#include <jogasaki/commit_profile.h>
#include <jogasaki/configuration.h>
#include <jogasaki/durability_manager.h>
#include <jogasaki/logging.h>
#include <jogasaki/model/task.h>
#include <jogasaki/request_context.h>
#include <jogasaki/request_logging.h>
#include <jogasaki/scheduler/flat_task.h>
#include <jogasaki/scheduler/request_detail.h>
#include <jogasaki/scheduler/schedule_option.h>
#include <jogasaki/scheduler/task_factory.h>
#include <jogasaki/scheduler/task_scheduler.h>
#include <jogasaki/transaction_context.h>
#include <jogasaki/utils/set_cancel_status.h>

namespace jogasaki {

void submit_commit_response(
std::shared_ptr<request_context> rctx, //NOLINT(performance-unnecessary-value-param)
commit_response_kind kind,
bool is_error,
bool teardown_try_on_suspended_worker
) {
auto& ts = *rctx->scheduler();
ts.schedule_task(
scheduler::create_custom_task(rctx.get(), [rctx, kind, teardown_try_on_suspended_worker, is_error]() {
if(is_error) {
rctx->commit_ctx()->on_error()(kind, rctx->status_code(), rctx->error_info());
} else {
rctx->commit_ctx()->on_response()(kind);
}
scheduler::submit_teardown(*rctx, false, teardown_try_on_suspended_worker);
return model::task_result::complete;
}, false)
);
}

} // namespace jogasaki
41 changes: 41 additions & 0 deletions src/jogasaki/durability_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <atomic>
#include <memory>

#include <jogasaki/request_context.h>
#include <jogasaki/commit_response.h>

namespace jogasaki {

/**
* @brief submit a task to process commit response
* @details this function submits a task to invoke commit response callback and scheduler following teardown task
* @param rctx the request context
* @param kind the kind of the commit response
* @param is_error whether the commit response is an error
* @param teardown_try_on_suspended_worker whether to submit teardown on the suspended worker
*/
void submit_commit_response(
std::shared_ptr<request_context> rctx,
commit_response_kind kind,
bool is_error,
bool teardown_try_on_suspended_worker
);

} // namespace jogasaki
Loading

0 comments on commit 0a0abe9

Please sign in to comment.