diff --git a/include/jogasaki/commit_response.h b/include/jogasaki/commit_response.h index 7c3734c5..a5529341 100644 --- a/include/jogasaki/commit_response.h +++ b/include/jogasaki/commit_response.h @@ -20,6 +20,8 @@ #include #include +#include + namespace jogasaki { enum class commit_response_kind : std::int32_t { @@ -58,5 +60,11 @@ inline std::ostream& operator<<(std::ostream& out, commit_response_kind value) { return out << to_string_view(value); } +/// @brief a set of commit_response_kind. +using commit_response_kind_set = takatori::util::enum_set< + commit_response_kind, + commit_response_kind::undefined, + commit_response_kind::propagated>; + } diff --git a/src/jogasaki/api/impl/service.cpp b/src/jogasaki/api/impl/service.cpp index 41107d9f..17c266b7 100644 --- a/src/jogasaki/api/impl/service.cpp +++ b/src/jogasaki/api/impl/service.cpp @@ -539,22 +539,27 @@ void service::command_commit( commit_option opt{}; opt.auto_dispose_on_success(cm.auto_dispose()) .commit_response(from(cm.notification_type())); - tx.commit_async( - [res, req_info]( - status st, - std::shared_ptr info //NOLINT(performance-unnecessary-value-param) - ) { - if(st == jogasaki::status::ok) { - details::success(*res, req_info); - } else { - VLOG(log_error) << log_location_prefix << info->message(); - details::error(*res, info.get(), req_info); - } + + auto tctx = get_transaction_context(tx); + executor::commit_async( + get_impl(*db_), + tctx, + [res, req_info](commit_response_kind kind) { + (void) kind; // for now, callback does same regardless of kind + details::success(*res, req_info); + }, + commit_response_kind_set{opt.commit_response()}, + [res, req_info](commit_response_kind kind, status st, std::shared_ptr info) { //NOLINT(performance-unnecessary-value-param) + (void) kind; // for now, callback does same regardless of kind + (void) st; + VLOG(log_error) << log_location_prefix << info->message(); + details::error(*res, info.get(), req_info); }, opt, req_info ); } + void service::command_rollback( sql::request::Request const& proto_req, std::shared_ptr const& res, diff --git a/src/jogasaki/commit_context.h b/src/jogasaki/commit_context.h new file mode 100644 index 00000000..1127d1e4 --- /dev/null +++ b/src/jogasaki/commit_context.h @@ -0,0 +1,80 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace jogasaki { + +/** + * @brief the callback type used for async commit successful response + */ +using commit_response_callback = std::function; + +/** + * @brief the callback type used for async commit error response + */ +using commit_error_callback = std::function)>; + +/** + * @brief context object for tx commit processing + */ +class commit_context { +public: + /** + * @brief create default context object + */ + commit_context() = default; + + /** + * @brief create new context object + * @param on_response the callback function to be called when the response is ready + * @param response_kinds the kinds of response to be notified + */ + commit_context( + commit_response_callback on_response, + commit_response_kind_set response_kinds, + commit_error_callback on_error + ) : on_response_(std::move(on_response)), + response_kinds_(response_kinds), + on_error_(std::move(on_error)) + {} + + commit_response_callback& on_response() noexcept { + return on_response_; + } + + commit_response_kind_set& response_kinds() noexcept { + return response_kinds_; + } + + commit_error_callback& on_error() noexcept { + return on_error_; + } +private: + commit_response_callback on_response_{}; + commit_response_kind_set response_kinds_{}; + commit_error_callback on_error_{}; + +}; + +} // namespace jogasaki diff --git a/src/jogasaki/durability_callback.cpp b/src/jogasaki/durability_callback.cpp index b7233dc4..1344ec0b 100644 --- a/src/jogasaki/durability_callback.cpp +++ b/src/jogasaki/durability_callback.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -83,7 +84,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) { VLOG(log_trace) << "/:jogasaki:durability_callback:operator() check_cancel " << "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker(); set_cancel_status(*e); - scheduler::submit_teardown(*e, false, true); + submit_commit_response(e, commit_response_kind::stored, true, true); } ); if(mgr->update_current_marker( @@ -93,7 +94,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) { << "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker(); request_ctx->job()->request()->affected_txs().add(e->transaction()->transaction_id()); e->transaction()->profile()->set_durability_cb_invoked(durability_callback_invoked); - scheduler::submit_teardown(*e, false, true); + submit_commit_response(e, commit_response_kind::stored, false, true); })) { scheduler::submit_teardown(*request_ctx); return model::task_result::complete; diff --git a/src/jogasaki/durability_common.cpp b/src/jogasaki/durability_common.cpp new file mode 100644 index 00000000..740ad037 --- /dev/null +++ b/src/jogasaki/durability_common.cpp @@ -0,0 +1,67 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "durability_common.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace jogasaki { + +void submit_commit_response( + std::shared_ptr rctx, //NOLINT(performance-unnecessary-value-param) + commit_response_kind kind, + bool is_error, + bool teardown_try_on_suspended_worker +) { + auto& ts = *rctx->scheduler(); + ts.schedule_task( + scheduler::create_custom_task(rctx.get(), [rctx, kind, teardown_try_on_suspended_worker, is_error]() { + if(is_error) { + rctx->commit_ctx()->on_error()(kind, rctx->status_code(), rctx->error_info()); + } else { + rctx->commit_ctx()->on_response()(kind); + } + scheduler::submit_teardown(*rctx, false, teardown_try_on_suspended_worker); + return model::task_result::complete; + }, false) + ); +} + +} // namespace jogasaki diff --git a/src/jogasaki/durability_common.h b/src/jogasaki/durability_common.h new file mode 100644 index 00000000..f54755f3 --- /dev/null +++ b/src/jogasaki/durability_common.h @@ -0,0 +1,41 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include +#include + +namespace jogasaki { + +/** + * @brief submit a task to process commit response + * @details this function submits a task to invoke commit response callback and scheduler following teardown task + * @param rctx the request context + * @param kind the kind of the commit response + * @param is_error whether the commit response is an error + * @param teardown_try_on_suspended_worker whether to submit teardown on the suspended worker + */ +void submit_commit_response( + std::shared_ptr rctx, + commit_response_kind kind, + bool is_error, + bool teardown_try_on_suspended_worker +); + +} // namespace jogasaki diff --git a/src/jogasaki/executor/executor.cpp b/src/jogasaki/executor/executor.cpp index be54e6ac..f180df04 100644 --- a/src/jogasaki/executor/executor.cpp +++ b/src/jogasaki/executor/executor.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -680,6 +681,14 @@ bool execute_load( return true; } +bool is_last(commit_response_kind_set const& response_kinds, commit_response_kind kind) { + auto f = std::find(response_kinds.begin(), response_kinds.end(), kind); + if(f == response_kinds.end()) { + return false; + } + return ++f == response_kinds.end(); +} + void process_commit_callback( ::sharksfin::StatusCode st, ::sharksfin::ErrorCode ec, @@ -699,23 +708,12 @@ void process_commit_callback( << utils::hex(jobid); auto res = kvs::resolve(st); if(res != status::ok) { - auto& ts = *rctx->scheduler(); - ts.schedule_task( - scheduler::create_custom_task(rctx.get(), [rctx, res]() { - auto msg = utils::create_abort_message(*rctx); - auto code = res == status::err_inactive_transaction ? - error_code::inactive_transaction_exception : - error_code::cc_exception; - set_error( - *rctx, - code, - msg, - res - ); - scheduler::submit_teardown(*rctx); - return model::task_result::complete; - }, false) - ); + auto msg = utils::create_abort_message(*rctx); + auto code = res == status::err_inactive_transaction ? + error_code::inactive_transaction_exception : + error_code::cc_exception; + set_error(*rctx, code, msg, res); + submit_commit_response(rctx, commit_response_kind::accepted, true, false); return; } rctx->transaction()->durability_marker(marker); @@ -727,15 +725,28 @@ void process_commit_callback( VLOG(log_error) << log_location_prefix << "unexpected error destroying transaction: " << rc; } } - auto cr = rctx->transaction()->commit_response(); - if(cr == commit_response_kind::accepted || cr == commit_response_kind::available) { - scheduler::submit_teardown(*rctx); + auto response_kinds = rctx->commit_ctx()->response_kinds(); + bool last_less_equals_accepted = is_last(response_kinds, commit_response_kind::accepted); + if(response_kinds.contains(commit_response_kind::accepted)) { + auto& ts = *rctx->scheduler(); + ts.schedule_task( + scheduler::create_custom_task(rctx.get(), [rctx, last_less_equals_accepted]() { + rctx->commit_ctx()->on_response()(commit_response_kind::accepted); + if(last_less_equals_accepted) { + // if the last response is accepted or requested, we can finish job and clean up resource here + scheduler::submit_teardown(*rctx); + } + return model::task_result::complete; + }, false) + ); + } + if(last_less_equals_accepted) { return; } // commit_response = stored, propagated, or undefined // current marker should have been set at least once on callback registration if(marker <= database.durable_manager()->current_marker()) { - scheduler::submit_teardown(*rctx); + submit_commit_response(rctx, commit_response_kind::stored, false, false); return; } database.durable_manager()->add_to_waitlist(rctx); @@ -744,7 +755,9 @@ void process_commit_callback( scheduler::job_context::job_id_type commit_async( api::impl::database& database, std::shared_ptr tx, //NOLINT(performance-unnecessary-value-param) - error_info_callback on_completion, + commit_response_callback on_response, + commit_response_kind_set response_kinds, + commit_error_callback on_error, api::commit_option option, request_info const& req_info ) { @@ -761,6 +774,8 @@ scheduler::job_context::job_id_type commit_async( req_info, req ); + rctx->commit_ctx(std::make_shared(std::move(on_response), response_kinds, std::move(on_error))); + auto jobid = rctx->job()->id(); std::string txid{tx->transaction_id()}; @@ -769,7 +784,8 @@ scheduler::job_context::job_id_type commit_async( database.config()->default_commit_response(); tx->commit_response(cr); - auto t = scheduler::create_custom_task(rctx.get(), [&database, rctx, jobid, txid, option]() { + auto t = scheduler::create_custom_task(rctx.get(), + [&database, rctx, jobid, txid, option]() { VLOG(log_debug_timing_event) << "/:jogasaki:timing:committing " << txid << " job_id:" @@ -786,7 +802,7 @@ scheduler::job_context::job_id_type commit_async( }); return model::task_result::complete; }, true); - rctx->job()->callback([on_completion=std::move(on_completion), rctx, jobid, txid, req_info](){ // callback is copy-based + rctx->job()->callback([rctx, jobid, txid, req_info](){ // callback is copy-based VLOG(log_debug_timing_event) << "/:jogasaki:timing:committed " << txid << " job_id:" @@ -808,7 +824,6 @@ scheduler::job_context::job_id_type commit_async( rctx->transaction()->duration().count(), rctx->transaction()->label() ); - on_completion(rctx->status_code(), rctx->error_info()); }); std::weak_ptr wrctx{rctx}; rctx->job()->completion_readiness([wrctx=std::move(wrctx)]() { @@ -826,6 +841,39 @@ scheduler::job_context::job_id_type commit_async( return jobid; } +scheduler::job_context::job_id_type commit_async( + api::impl::database& database, + std::shared_ptr tx, //NOLINT(performance-unnecessary-value-param) + error_info_callback on_completion, //NOLINT(performance-unnecessary-value-param) + api::commit_option option, + request_info const& req_info +) { + auto cr = option.commit_response() != commit_response_kind::undefined ? + option.commit_response() : + database.config()->default_commit_response(); + commit_response_kind_set responses{}; + if(cr == commit_response_kind::accepted || cr == commit_response_kind::available) { + // currently accepted and available are treated the same + responses.insert(commit_response_kind::accepted); + } + if(cr == commit_response_kind::stored || cr == commit_response_kind::propagated) { + responses.insert(commit_response_kind::stored); + } + return commit_async( + database, + std::move(tx), + [on_completion](commit_response_kind) { + on_completion(status::ok, std::make_shared()); + }, + responses, + [on_completion](commit_response_kind, status st, std::shared_ptr error) { + on_completion(st, std::move(error)); + }, + option, + req_info + ); +} + status create_transaction( api::impl::database& db, std::shared_ptr& out, diff --git a/src/jogasaki/executor/executor.h b/src/jogasaki/executor/executor.h index abe0bb13..27e814ee 100644 --- a/src/jogasaki/executor/executor.h +++ b/src/jogasaki/executor/executor.h @@ -90,7 +90,7 @@ status commit( * @param tx the transaction used to execute the request * @param on_completion callback on completion of commit * @param option commit options - * @param info exchange the original request/response info (mainly for logging purpose) + * @param req_info exchange the original request/response info (mainly for logging purpose) * @return id of the job to execute commit * @note normal error such as SQL runtime processing failure will be reported by callback */ @@ -99,7 +99,30 @@ scheduler::job_context::job_id_type commit_async( std::shared_ptr tx, error_info_callback on_completion, api::commit_option option, - request_info const& info + request_info const& req_info +); + +/** + * @brief commit the transaction asynchronously + * @param database the database to request execution + * @param tx the transaction used to execute the request + * @param on_response callback invoked when commit successfully makes progress to the point + * where `response_kinds` indicates + * @param response_kinds the commit response points to invoke the `on_response` callback + * @note currently, only `commit_response_kind::accepted` and `commit_response_kind::stored` are supported + * @param on_error callback invoked when commit fails + * @param option commit options + * @param req_info exchange the original request/response info (mainly for logging purpose) + * @return id of the job to execute commit + */ +scheduler::job_context::job_id_type commit_async( + api::impl::database& database, + std::shared_ptr tx, + commit_response_callback on_response, + commit_response_kind_set response_kinds, + commit_error_callback on_error, + api::commit_option option, + request_info const& req_info ); /** diff --git a/src/jogasaki/request_context.cpp b/src/jogasaki/request_context.cpp index e7bde0c0..5e722178 100644 --- a/src/jogasaki/request_context.cpp +++ b/src/jogasaki/request_context.cpp @@ -229,4 +229,12 @@ void request_context::req_info(request_info req_info) noexcept { req_info_ = std::move(req_info); } +std::shared_ptr const& request_context::commit_ctx() const noexcept { + return commit_ctx_; +} + +void request_context::commit_ctx(std::shared_ptr arg) noexcept { + commit_ctx_ = std::move(arg); +} + } // namespace jogasaki diff --git a/src/jogasaki/request_context.h b/src/jogasaki/request_context.h index 3f837aac..54ca52a0 100644 --- a/src/jogasaki/request_context.h +++ b/src/jogasaki/request_context.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -257,6 +258,17 @@ class cache_align request_context { */ void req_info(request_info req_info) noexcept; + /** + * @brief accessor for the commit context + * @return request_info object + */ + [[nodiscard]] std::shared_ptr const& commit_ctx() const noexcept; + + /** + * @brief setter for commit context + */ + void commit_ctx(std::shared_ptr arg) noexcept; + private: std::shared_ptr config_{std::make_shared()}; std::shared_ptr request_resource_{}; @@ -279,6 +291,7 @@ class cache_align request_context { std::shared_ptr stats_{}; request_info req_info_{}; + std::shared_ptr commit_ctx_{}; }; /**