
PoC: async_generator as a stream #596

Draft · wants to merge 2 commits into base: main
Conversation

georgikoyrushki95
Contributor

Hi, I reached out before Christmas to check if there are any async streams in unifex (#586). In the discussion within the issue, we established it'd be good to have a coroutine-based async stream, similar to cppcoro::async_generator.

In this PR, I have added a small PoC that achieves part of the requirements listed by @ispeters (checklist below). I would appreciate some feedback to see whether you are interested in having something like this in unifex.

A summary of the requirements for an async_generator + whether or not they are addressed in this PR:

  • Support co_await and co_yield inside the generator coroutine.
    ❗ This is already achieved by cppcoro::async_generator

  • Feel like a unifex::task<>

    • Scheduler affinity
      ❗ In short:
      (a) resuming the generator resumes it in the scheduler it was running before suspension;
      (b) resuming the consumer coroutine resumes it on the scheduler it was running before co_await-ing the generator.
    • Support awaiting senders
      ❗ Achieved via await_transform customization in the generator promise, similar to unifex::task<>
    • Support unwind-on-cancellation semantics / stop tokens
      ❗ I did not look into that. Looks like it might be too much for 1 PR.
    • noexcept version of it
      ❗ Shouldn't be too much work to add to this PR; can do if there is interest
  • Ability to let callers opt-out of scheduler affinity. This is achieved in unifex::task<> via the _sa_task struct.
    ❗ Have not looked into that

At a high level, the generator supports next() and cleanup() (so it is a stream in itself). I deliberately did not expose begin()/end(). The reason: exposing them forces clients to use iterator-based for loops, at least until something like a co_await for_each(...) construct exists in the standard. To me, iterator-based for loops are quite archaic and error-prone, so I would prefer not to expose them.

The high-level skeleton might be summarized as:

template <typename T>
struct async_generator {
  // from the consumer's POV, expose it ONLY as a stream.
  /* ... */ next();
  /* ... */ cleanup();

private:
  // Private begin()/end(), used within next()
  /* ... */ begin();
  /* ... */ end();
};
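Since the generator is exposed only as a stream, consumption would presumably go through unifex's stream algorithms rather than iterators. A hypothetical consumer-side sketch (assuming the PR's async_generator models a unifex stream, and using unifex's for_each stream algorithm):

```cpp
// Hypothetical sketch: consuming the generator purely as a stream.
unifex::task<void> consume(async_generator<int> gen) {
  // for_each() repeatedly awaits the next-sender and invokes the callback
  // with each value, then awaits cleanup() once the stream completes with done.
  co_await unifex::for_each(std::move(gen),
                            [](int v) { std::printf("%d\n", v); });
}
```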

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jan 12, 2024
@georgikoyrushki95 georgikoyrushki95 changed the title PoC: async_generator as a stream #586 PoC: async_generator as a stream Jan 12, 2024

// A lot of stuff from cppcoro
Contributor Author

A lot of this stuff is copied from cppcoro. Not too sure how to reflect this here.

Contributor

@lewissbaker, do you have any insight into how we should handle your copyrights here?

@ispeters
Contributor

Sorry for the delay on this review; I spent some time looking at it with @janondrusek yesterday and we came up with a few high-level thoughts that I think you'll need to try to address before we can give useful detailed feedback.

First, we agree with your "general feeling" about not exposing the generator through an iterator-based interface. If we had these two currently-missing pieces:

  • the read() algorithm from P2300, which returns a Sender that produces the result of invoking a given CPO on the Receiver; and
  • an any_receiver_ref<T&> in the generator's promise_type;

then you could implement next() thusly:

  auto next() noexcept {
    // grab the receiver's scheduler; assume that the next-sender
    // is always started on the corresponding context
    return read(get_scheduler)
      | let_value([this](auto sched) {
        if (!scheduler_) {
          // capture the receiver's scheduler as the stream's
          // scheduler on the first run of the next-sender
          scheduler_ = sched;
        }

        // check to see if we're currently running on the saved scheduler
        return just_void_or_done(scheduler_ == sched)
            // get back on the desired scheduler when we're not already there
            | let_done([this]() { return schedule(scheduler_); })
            | let_value([this]() {
              // once we're on the right scheduler, use create<>()
              // to resume the generator's coroutine_handle<> after
              // saving the create-receiver in the promise so we can
              // complete create-sender from within the generator
              return create<T&>([this](auto& rec) {
                // receiver_ is an any_receiver_ref<T>
                coroutine_.promise().receiver_ = rec;
                coroutine_.resume();
              });
            });  
      });
  }

With the above implementation, you can invoke promise().receiver_.set_value(v) (or ….set_error(e)) from a co_yield expression, and promise().receiver_.set_done() from the final_awaiter, which would map the generator's state transitions to the expected results of the next-sender pretty cleanly.
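Concretely, that mapping could look something like this sketch (names such as receiver_ and yield_awaiter are illustrative, not the PR's actual code):

```cpp
// Hypothetical sketch of mapping generator transitions onto the next-sender.
struct yield_awaiter {
  T& value_;
  bool await_ready() noexcept { return false; }
  void await_suspend(coroutine_handle<promise_type> h) noexcept {
    // the generator is now suspended at the co_yield; complete the
    // outstanding next-sender with the yielded value
    h.promise().receiver_.set_value(value_);
  }
  void await_resume() noexcept {}
};

// and in final_awaiter::await_suspend, after co_return:
h.promise().receiver_.set_done();  // the stream is exhausted
```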

In the absence of read(), you'd need to write your own Sender type so you can hook connect to get access to the Receiver's Scheduler; if you want to implement read() first, I'd be happy to merge that PR.

It's not obvious to me whether create<>() is the best way to implement that part of the next-sender. I like that it's reusing existing code rather than adding yet another hand-rolled Sender, but I wonder if there'd be efficiencies in hand-rolling that part.

Second, I think the current at_coroutine_exit integration is probably doing something surprising; I suspect it runs the registered callbacks on the next co_yield or co_return, whichever comes first, but I'd want registered coroutines to only run when the generator is destroyed/completed.

at_coroutine_exit works by inserting the registered at-exit coroutine into the async stack so that, when the current coroutine goes looking for a continuation to resume, it finds the at-exit coroutine instead of the actual caller.

Suppose you have this:

task<void> bar() {
  co_return;
}

task<void> foo() {
  co_await bar();
}

If you suspend the process just before co_return is about to end the bar() coroutine, you'd see that the promise associated with the bar() coroutine has a continuation_ that points to the foo() coroutine. So you have this stack:

bar()
foo()

When bar() returns, it looks at the promise, finds the foo() handle, and resumes it.

If you change bar() like so:

task<void> bar() {
  co_await at_coroutine_exit([]() -> task<void> {
    co_return;
  });

  // breakpoint here
}

and then stop the debugger on the breakpoint here comment, the "stack" will look like this:

bar()
<lambda>
foo()

so, when bar() returns, it'll activate the registered at-exit coroutine, which will then activate foo() upon returning.

With your current implementation, the generator's continuation is whoever is waiting for the result of next(), so inserting at-exit coroutines in the default way means running them before yielding the next value.

Instead, I think the generator should have an initially-empty coroutine_handle<> that represents the top of a stack of clean-up actions. Awaiting at_coroutine_exit from within the generator should push the given at-exit coroutine onto this stack. The cleanup-sender should then run whatever clean-up actions it finds in the stack when the Stream is cleaned up.

Third, I regret the way we implemented the magic co_await schedule(s) semantics that reschedules the current coroutine on the new Scheduler and updates the coroutine's concept of which Scheduler it is affine to. We've left the current implementation in place because it's being used internally (and, presumably, externally), but we'd like to migrate to something that's more explicit and less magic. You might notice that we explicitly don't support co_await schedule(s) from a nothrow_task<>. I would prefer not to spread the regrettable implementation choice to this new coroutine type so, instead of copying the behaviour from task<>, could you please copy the static_assert behaviour from nothrow_task<>? It'll mean you'll need to express some of your scheduler-affinity checks differently, but I think it's the better choice in the long run.

Finally, we should be able to avoid the use of an async_scope within the generator. I think that the above-suggested changes to how to implement next() means that the reschedule operation state will be optionally embedded in the next-sender's operation state, which moots this whole line of thought but, if not, we should be able to directly connect and start an operation state in some inline storage somewhere rather than relying on spawn_detached to heap-allocate one for you.

Regarding the questions in your checklist:

  • Support co_await and co_yield inside the generator coroutine.
    ❗ This is already achieved by cppcoro::async_generator

Agreed; looks good.

  • Feel like a unifex::task<>
    • Scheduler affinity
      ❗ In short:
      (a) resuming the generator resumes it in the scheduler it was running before suspension;
      (b) resuming the consumer coroutine resumes it on the scheduler it was running before co_await-ing the generator.
    • Support awaiting senders
      ❗ Achieved via await_transform customization in the generator promise, similar to unifex::task<>
    • Support unwind-on-cancellation semantics / stop tokens
      ❗ I did not look into that. Looks like it might be too much for 1 PR.
    • noexcept version of it
      ❗ Shouldn't be too much work to add to this PR; can do if there is interest

Mostly agreed.

A nothrow_generator is probably useful for binary size-sensitive consumers (terminating on exception tends to save a lot of overhead), but it could wait for another PR.

I think unwind-on-cancellation might need to be implemented in the first draft. Without unwind-on-cancellation, I presume that awaiting a Sender that completes with set_done() would either terminate or continue as if it succeeded and I think both outcomes are bad. Terminating on cancellation would avoid continuing execution in an undefined state, but would be a very disappointing surprise; continuing past co_await foo() when foo()'s sender cancels would mean that the post-conditions of foo() are not met yet we're continuing anyway—an even more disappointing surprise.
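To make the expected behaviour concrete, here is a hypothetical generator body under unwind-on-cancellation semantics (a sketch, assuming the PR's async_generator type):

```cpp
async_generator<int> gen() {
  co_yield 1;
  // this sender completes with set_done(); with unwind-on-cancellation the
  // generator unwinds here instead of continuing past the co_await
  co_await unifex::just_done();
  co_yield 2;  // never reached; the pending next() completes with done
}
```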

  • Ability to let callers opt-out of scheduler affinity. This is achieved in unifex::task<> via the _sa_task struct.
    ❗ Have not looked into that

_sa_task isn't really for opting out of scheduler affinity; an _sa_task<> is a task<> that assumes its caller enforces the scheduler-affinity invariants and so doesn't do any enforcement itself. I'm not sure whether the scheduler-affinity invariants need updating for generators because the flow of control is quite different from that of normal tasks. One thing that task<> does that we haven't yet contemplated in this generator design is thunk stop requests from the receiver onto the "right" scheduler so that any descendant Senders that complete synchronously from a stop request do so on the right context.

@georgikoyrushki95 georgikoyrushki95 marked this pull request as draft January 26, 2024 10:09
@georgikoyrushki95
Contributor Author

Marking it as draft as it will take some time :)

@georgikoyrushki95
Contributor Author

@ispeters started looking into this again. I am having a little trouble instantiating an any_receiver_ref<T&> within create<T&>(...). This seems to be the constructor for it. Would appreciate some help, a few of the things I tried:

return create<T&>([this](auto& rec) {
  any_receiver_ref<T&> r1{inplace_stop_token{}, &rec};
  any_receiver_ref<T&> r2{get_stop_token(rec), &rec};
  // below obviously doesn't work because the constructor expects 2 args
  any_receiver_ref<T&> r3{&rec};
});

@ispeters
Contributor

@ispeters started looking into this again. I am having a little trouble instantiating an any_receiver_ref<T&> within create<T&>(...). This seems to be the constructor for it. Would appreciate some help, a few of the things I tried:

return create<T&>([this](auto& rec) {
  any_receiver_ref<T&> r1{inplace_stop_token{}, &rec};
  any_receiver_ref<T&> r2{get_stop_token(rec), &rec};
  // below obviously doesn't work because the constructor expects 2 args
  any_receiver_ref<T&> r3{&rec};
});

What kind of errors are you getting? I managed to wrap the create<>() "receiver" in an any_receiver_ref<int&> here and I can't tell what I'm doing differently from what you tried.

This would be better if you plan to actually use the caller's stop token, though.

@georgikoyrushki95
Contributor Author

georgikoyrushki95 commented Mar 15, 2024

Just pushed this commit, addressing some of the comments. Here's a quick progress update:

  • Implemented next() as suggested by @ispeters in this comment.
    • Instead of adding the generic read() from P2300, I just added a read_scheduler() algorithm, private to the generator. The idea was to progress with the rest of the stuff for now - the generic read() we can add towards the end.
  • Removed usages of the async_scope and at_coroutine_exit that I used for the scheduler affinity invariants. Those, it seems, can fully be implemented via the sender / receiver protocol:
    • On resumption of the generator from the consumer (i.e. co_await gen.next()), we reschedule on the correct executor (if necessary).
    • On yielding from the generator, we have a custom receiver for unifex::schedule (reschedule_receiver) that "wakes" the consumer once we're back on the correct scheduler. This is also used in final_suspend.
  • Unwind on cancellation as per @ispeters comment (I think unwind-on-cancellation might need to be implemented in the first draft. Without unwind-on-cancellation, I presume that awaiting a Sender that completes with set_done()...) is implemented and there is a test for this (await_just_done_in_async_generator).

Things that still need to be done:

  • Drop support for co_await schedule(...) within the generator, as described in this PR comment and the issue as well.

    I currently support co_await schedule(...) just like in task<>

  • The equivalent of _sa_task for the generator - basically one that assumes its caller enforces the scheduler-affinity invariants and so doesn't do any enforcement itself

  • This (I haven't delved into it too much): One thing that task<> does that we haven't yet contemplated in this generator design is thunk stop requests from the receiver onto the "right" scheduler so that any descendant Senders that complete synchronously from a stop request do so on the right context.

  • Generally cancellations as described in P2300.

  • no-throw version of the generator

Contributor

@ispeters ispeters left a comment


I really like where this is going! Thanks for persisting and sorry the review turn-around is so long.

// to resume the generator's coroutine_handle<> after
// saving the create-receiver in the promise so we can
// complete create-sender from within the generator
return create<T>([this](auto& rec) {
Contributor

I think you should use create<T&> here. The values generated by this generator will all be passed to the next-receiver using code like this:

co_yield <expr>;

so the lifetime of <expr> will span the suspend point created by the co_yield. You can therefore efficiently and safely pass <expr> as an lvalue reference through to the next-receiver.

Suggested change
- return create<T>([this](auto& rec) {
+ return create<T&>([this](auto& rec) {
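For illustration, the lifetime argument above amounts to this (a hypothetical generator body):

```cpp
async_generator<int> ints() {
  for (int value = 0; value < 3; ++value) {
    // `value` is alive across the co_yield suspend point, so handing the
    // next-receiver an int& (via create<T&>) is safe; no copy is needed
    co_yield value;
  }
}
```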

Comment on lines +323 to +325
// Potential problem: not sure if get_scheduler(gen.next()) would return
// the right thing here. Perhaps we need a wrapper sender that also records
// each next sender's scheduler and customizes get_scheduler(...)
Contributor

I don't think this is a problem. P2300 has the concept of a "completion scheduler" that senders may report, but Unifex doesn't have that concept. You can't pass a sender to unifex::get_scheduler(), only a receiver.

Comment on lines +314 to +316
async_generator& operator=(async_generator&& other) noexcept {
  async_generator temp(std::move(other));
  swap(temp);
Contributor

Eric Niebler taught me that you can do this in less code by receiving the argument by value.

Suggested change
- async_generator& operator=(async_generator&& other) noexcept {
-   async_generator temp(std::move(other));
-   swap(temp);
+ async_generator& operator=(async_generator other) noexcept {
+   swap(other);

Comment on lines +118 to +151
template <typename T>
class async_generator_yield_operation {
public:
  using value_type = std::remove_reference_t<T>;

  async_generator_yield_operation(std::optional<value_type> value = {}) noexcept
    : value_{std::move(value)} {}

  bool await_ready() const noexcept { return false; }

  template <typename Promise>
  void await_suspend([[maybe_unused]] unifex::coro::coroutine_handle<Promise>
                         genCoro) noexcept {
    const auto& consumerSched = genCoro.promise().consumerSched_;
    if (unifex::get_scheduler(genCoro.promise()) != consumerSched) {
      genCoro.promise().rescheduleOpSt_ = unifex::connect(
          unifex::schedule(consumerSched),
          reschedule_receiver<Promise>{std::move(value_), genCoro});
      unifex::start(*genCoro.promise().rescheduleOpSt_);
      return;
    }

    if (value_) {
      unifex::set_value(std::move(*genCoro.promise().receiverOpt_), *value_);
    } else {
      unifex::set_done(std::move(*genCoro.promise().receiverOpt_));
    }
  }

  void await_resume() noexcept {}

private:
  std::optional<value_type> value_;
};
Contributor

Below, I argue that you can rely on the generator generating values with code like

co_yield <expr>;

which means that <expr> survives across a suspend point. I think this means you can just store the address of that value here.

Suggested change
- template <typename T>
- class async_generator_yield_operation {
- public:
-   using value_type = std::remove_reference_t<T>;
-
-   async_generator_yield_operation(std::optional<value_type> value = {}) noexcept
-     : value_{std::move(value)} {}
-
-   bool await_ready() const noexcept { return false; }
-
-   template <typename Promise>
-   void await_suspend([[maybe_unused]] unifex::coro::coroutine_handle<Promise>
-                          genCoro) noexcept {
-     const auto& consumerSched = genCoro.promise().consumerSched_;
-     if (unifex::get_scheduler(genCoro.promise()) != consumerSched) {
-       genCoro.promise().rescheduleOpSt_ = unifex::connect(
-           unifex::schedule(consumerSched),
-           reschedule_receiver<Promise>{std::move(value_), genCoro});
-       unifex::start(*genCoro.promise().rescheduleOpSt_);
-       return;
-     }
-
-     if (value_) {
-       unifex::set_value(std::move(*genCoro.promise().receiverOpt_), *value_);
-     } else {
-       unifex::set_done(std::move(*genCoro.promise().receiverOpt_));
-     }
-   }
-
-   void await_resume() noexcept {}
-
- private:
-   std::optional<value_type> value_;
- };
+ template <typename T>
+ class async_generator_yield_operation {
+ public:
+   using value_type = std::remove_reference_t<T>;
+
+   async_generator_yield_operation() noexcept : value_{nullptr} {}
+
+   explicit async_generator_yield_operation(value_type& value) noexcept
+     : value_{&value} {}
+
+   bool await_ready() const noexcept { return false; }
+
+   template <typename Promise>
+   void await_suspend([[maybe_unused]] unifex::coro::coroutine_handle<Promise>
+                          genCoro) noexcept {
+     const auto& consumerSched = genCoro.promise().consumerSched_;
+     if (unifex::get_scheduler(genCoro.promise()) != consumerSched) {
+       genCoro.promise().rescheduleOpSt_ = unifex::connect(
+           unifex::schedule(consumerSched),
+           reschedule_receiver<Promise>{value_, genCoro});
+       unifex::start(*genCoro.promise().rescheduleOpSt_);
+       return;
+     }
+
+     if (value_) {
+       unifex::set_value(std::move(*genCoro.promise().receiverOpt_), *value_);
+     } else {
+       unifex::set_done(std::move(*genCoro.promise().receiverOpt_));
+     }
+   }
+
+   void await_resume() noexcept {}
+
+ private:
+   value_type* value_;
+ };


template <typename Promise>
struct reschedule_receiver {
  std::optional<typename Promise::value_type> value_;
Contributor

See below for why I think this is safe.

Suggested change
- std::optional<typename Promise::value_type> value_;
+ typename Promise::value_type* value_;

Comment on lines +189 to +195
// This is needed for at_coroutine_exit to do the async clean up
friend unifex::continuation_handle<> tag_invoke(
    const unifex::tag_t<unifex::exchange_continuation>&,
    async_generator_promise& p,
    unifex::continuation_handle<> action) noexcept {
  return std::exchange(p.continuation_, std::move(action));
}
Contributor

I think this is the wrong implementation for at_coroutine_exit. Instead, the generator object should have an initially-null coroutine_handle<> that points to the top of a stack of continuations to be awaited in cleanup(). Invoking at_coroutine_exit inside a generator should push the new coroutine onto the stack. Then, cleanup() should run the stack of continuations (if it's non-empty) before completing with done.

Alternatively, perhaps the handle shouldn't be initially-null, but the result of invoking this coroutine:

nothrow_task<void> cleanup() {
  co_await just_done();
}

then you always have a non-empty stack.

Contributor Author

I see what you mean, and your original comment explains the same thing (some stuff starts making sense after a while with this code base... 😄).

Contributor Author

Invoking at_coroutine_exit inside a generator should push the new coroutine onto the stack.

I am unclear how we can do this bit. at_coroutine_exit doesn't seem to be a customization point right now. I might be missing something, but even if it is a CP, I don't see a way we can customize it on it being invoked inside a generator.

Contributor

I'm not 100% certain this is going to work, but here's how I think it'll work.

at_coroutine_exit returns a _cleanup_task<>. _cleanup_task<>'s implementation of await_suspend invokes exchange_continuation on the calling coroutine's promise. So, I think you can implement exchange_continuation in such a way that each call pushes another clean-up task onto the stack of things to resume from cleanup().
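If that works, the push could be a small variation on the existing tag_invoke overload. A hypothetical sketch (cleanupTop_ is an illustrative member naming the top of the clean-up stack, not existing code):

```cpp
// Sketch: exchange_continuation pushes onto a clean-up stack instead of
// splicing the at-exit coroutine into the next()-continuation.
friend unifex::continuation_handle<> tag_invoke(
    const unifex::tag_t<unifex::exchange_continuation>&,
    async_generator_promise& p,
    unifex::continuation_handle<> action) noexcept {
  // the at-exit coroutine's continuation becomes the previous stack top,
  // so the cleanup-sender can resume cleanupTop_ and run the whole stack
  return std::exchange(p.cleanupTop_, std::move(action));
}
```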
