Skip to content

Commit

Permalink
stack-pin the FutureGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 20, 2024
1 parent f6e63b4 commit 7ba36fa
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
3 changes: 1 addition & 2 deletions src/concurrent_stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use futures_lite::StreamExt;
use pin_project::pin_project;

use super::{Consumer, ConsumerState};
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::future::Future;
use core::marker::PhantomData;
Expand Down Expand Up @@ -60,7 +59,7 @@ where
{
type Output = ();

async fn send(mut self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
let mut this = self.project();
// If we have no space, we're going to provide backpressure until we have space
while this.count.load(Ordering::Relaxed) >= *this.limit {
Expand Down
1 change: 0 additions & 1 deletion src/concurrent_stream/from_concurrent_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
use crate::future::FutureGroup;
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::future::Future;
use core::pin::Pin;
Expand Down
9 changes: 5 additions & 4 deletions src/concurrent_stream/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ where
C: Consumer<Self::Item, Self::Future>,
{
let mut iter = pin!(self.stream);
let mut consumer = pin!(consumer);

// Concurrently progress the consumer as well as the stream. Whenever
// there is an item from the stream available, we submit it to the
Expand All @@ -53,7 +54,7 @@ where

// Drive the consumer forward
let b = async {
let control_flow = consumer.progress().await;
let control_flow = consumer.as_mut().progress().await;
State::Progress(control_flow)
};

Expand All @@ -64,14 +65,14 @@ where
ConsumerState::Break => break,
ConsumerState::Continue => continue,
ConsumerState::Empty => match iter.next().await {
Some(item) => match consumer.send(ready(item)).await {
Some(item) => match consumer.as_mut().send(ready(item)).await {
ConsumerState::Break => break,
ConsumerState::Empty | ConsumerState::Continue => continue,
},
None => break,
},
},
State::Item(Some(item)) => match consumer.send(ready(item)).await {
State::Item(Some(item)) => match consumer.as_mut().send(ready(item)).await {
ConsumerState::Break => break,
ConsumerState::Empty | ConsumerState::Continue => continue,
},
Expand All @@ -81,7 +82,7 @@ where

// We will no longer receive items from the underlying stream, which
// means we're ready to wait for the consumer to finish up.
consumer.flush().await
consumer.as_mut().flush().await
}

fn concurrency_limit(&self) -> Option<NonZeroUsize> {
Expand Down

0 comments on commit 7ba36fa

Please sign in to comment.