Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] get_completion_scheduler query for senders as of project goals #415

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ build/
/Folder.DotSettings.user
/compile_commands.json
/.cache
/.ccls-cache
27 changes: 27 additions & 0 deletions include/unifex/completion_channels.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) Rishabh Dwivedi <[email protected]>
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <unifex/detail/prologue.hpp>

namespace unifex {
struct set_value_t {};
struct set_error_t {};
struct set_done_t {};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be the types of the set_value, set_error, and set_done CPOs, not unrelated structs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ericniebler Just made the changes you suggested. Please review it, if I have done the right changes or got something wrong.

} // namespace unifex

#include <unifex/detail/epilogue.hpp>
78 changes: 78 additions & 0 deletions include/unifex/get_completion_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) Rishabh Dwivedi <[email protected]>
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <unifex/config.hpp>
#include <unifex/completion_channels.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/std_concepts.hpp>
#include <unifex/tag_invoke.hpp>

#include <type_traits>

#include <unifex/detail/prologue.hpp>

namespace unifex {
namespace _get_completion_scheduler {

template <typename CPO>
struct _fn;

template <>
struct _fn<set_value_t> {
template (typename Sender)
(requires sender<Sender> AND
is_nothrow_tag_invocable_v<_fn, Sender const&> AND
scheduler<tag_invoke_result_t<_fn, Sender const&>>)
constexpr auto operator()(const Sender& s) const noexcept {
return tag_invoke(*this, s);
}
};

template <>
struct _fn<set_error_t> {
template (typename Sender)
(requires sender<Sender> AND
is_nothrow_tag_invocable_v<_fn, Sender const&> AND
scheduler<tag_invoke_result_t<_fn, Sender const&>>)
constexpr auto operator()(Sender const& sender) const noexcept
-> tag_invoke_result_t<_fn, Sender const&> {
return tag_invoke(*this, sender);
}
};

template <>
struct _fn<set_done_t> {
template (typename Sender)
(requires sender<Sender> AND
is_nothrow_tag_invocable_v<_fn, Sender const&> AND
scheduler<tag_invoke_result_t<_fn, Sender const&>>)
constexpr auto operator()(Sender const& sender) const noexcept
-> tag_invoke_result_t<_fn, Sender const&> {
return tag_invoke(*this, sender);
}
};

} // namespace _get_completion_scheduler

template <typename CPO>
inline _get_completion_scheduler::_fn<CPO> get_completion_scheduler{};

} // namespace unifex

#include <unifex/detail/epilogue.hpp>
61 changes: 36 additions & 25 deletions include/unifex/new_thread_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
#pragma once

#include <unifex/completion_channels.hpp>
#include <unifex/get_completion_scheduler.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/tag_invoke.hpp>

#include <atomic>
#include <condition_variable>
Expand All @@ -39,18 +42,17 @@ using operation = typename _op<remove_cvref_t<Receiver>>::type;

template <typename Receiver>
class _op<Receiver>::type final {
public:
public:
template <typename Receiver2>
explicit type(context* ctx, Receiver2&& r)
: ctx_(ctx), receiver_((Receiver2&&)r) {}
: ctx_(ctx)
, receiver_((Receiver2 &&) r) {}

~type() {
UNIFEX_ASSERT(!thread_.joinable());
}
~type() { UNIFEX_ASSERT(!thread_.joinable()); }

void start() & noexcept;

private:
private:
void run() noexcept;

context* ctx_;
Expand All @@ -65,23 +67,34 @@ class context {
template <class Receiver>
friend struct _op;

class scheduler;

class schedule_sender {
public:
template <template <typename...> class Variant,
template <typename...> class Tuple>
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<>>;

template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;

static constexpr bool sends_done = true;

explicit schedule_sender(context* ctx) noexcept
: context_(ctx) {}
explicit schedule_sender(context* ctx) noexcept : context_(ctx) {}

template <typename Receiver>
operation<Receiver> connect(Receiver&& r) const {
return operation<Receiver>{context_, (Receiver&&)r};
return operation<Receiver>{context_, (Receiver &&) r};
}

template <typename CPO>
friend auto tag_invoke(
_get_completion_scheduler::_fn<CPO>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to write tag_t<get_completion_scheduler<CPO>> instead of _get_completion_scheduler::_fn<CPO>.
However, when I am doing that, it is resulting me to compile error that effectively says that the function declared here is not available to call.
I can confirm that

static_assert(same_as<_get_completion_scheduler::_fn<set_value_t>, tag_t<get_completion_scheduler<set_value_t>>>);

the above static_assert works.

I need some C++ experts suggestions here, that what is exactly getting wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been playing with this and I can't get it to work either. I don't understand enough about the template interactions here to spot what is going wrong. This might benefit from @ericniebler's greater expertise.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The get_completion_scheduler CPOs must be const or constexpr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ericniebler I have marked get_completion_scheduler to be constexpr variable.... however, still it has the same problem. Am I missing something here?

schedule_sender const& sender) noexcept -> scheduler {
return scheduler{sender.context_};
}

private:
Expand Down Expand Up @@ -110,9 +123,9 @@ class context {
context() = default;

~context() {
// The activeThreadCount_ counter is initialised to 1 so it will never get to
// zero until after enter the destructor and decrement the last count here.
// We do this so that the retire_thread() call doesn't end up calling
// The activeThreadCount_ counter is initialised to 1 so it will never get
// to zero until after enter the destructor and decrement the last count
// here. We do this so that the retire_thread() call doesn't end up calling
// into the cv_.notify_one() until we are about to start waiting on the
// cv.
activeThreadCount_.fetch_sub(1, std::memory_order_relaxed);
Expand All @@ -126,9 +139,7 @@ class context {
}
}

scheduler get_scheduler() noexcept {
return scheduler{this};
}
scheduler get_scheduler() noexcept { return scheduler{this}; }

private:
void retire_thread(std::thread t) noexcept {
Expand Down Expand Up @@ -170,7 +181,8 @@ inline void _op<Receiver>::type::start() & noexcept {
// we ensure the count increment happens before the count decrement that
// is performed when the thread is being retired.
ctx_->activeThreadCount_.fetch_add(1, std::memory_order_relaxed);
} UNIFEX_CATCH (...) {
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
Expand All @@ -191,29 +203,28 @@ inline void _op<Receiver>::type::run() noexcept {
// inside start().
//
// TODO: This can be replaced with an atomic<bool>::wait() once we have
// access to C++20 atomics. This would eliminate the unnecessary synchronisation
// performed by the unlock() at end-of-scope here.
// access to C++20 atomics. This would eliminate the unnecessary
// synchronisation performed by the unlock() at end-of-scope here.
std::lock_guard opLock{mut_};
thisThread = std::move(thread_);
}

if (get_stop_token(receiver_).stop_requested()) {
unifex::set_done(std::move(receiver_));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(receiver_));
} UNIFEX_CATCH (...) {
UNIFEX_TRY { unifex::set_value(std::move(receiver_)); }
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}

ctx->retire_thread(std::move(thisThread));
}

} // namespace _new_thread
} // namespace _new_thread

using new_thread_context = _new_thread::context;

} // namespace unifex
} // namespace unifex

#include <unifex/detail/epilogue.hpp>
30 changes: 30 additions & 0 deletions test/get_completion_scheduler_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) Rishabh Dwivedi <[email protected]>
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <unifex/get_completion_scheduler.hpp>
#include <unifex/new_thread_context.hpp>
#include <unifex/type_traits.hpp>

#include <gtest/gtest.h>

TEST(GetCompletionScheduler, NewThreadScheduler) {
unifex::new_thread_context ctx;
auto sch = ctx.get_scheduler();
auto sender = unifex::schedule(sch);
static_assert(unifex::same_as<decltype(sch), decltype(unifex::get_completion_scheduler<unifex::set_value_t>(sender))>);
static_assert(unifex::same_as<decltype(sch), decltype(unifex::get_completion_scheduler<unifex::set_error_t>(sender))>);
static_assert(unifex::same_as<decltype(sch), decltype(unifex::get_completion_scheduler<unifex::set_done_t>(sender))>);
}