From 7ba36fad246b75ac556d87d8ee3f6a7126f172e7 Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 20 Mar 2024 23:26:04 +0100 Subject: [PATCH] stack-pin the `FutureGroup` --- src/concurrent_stream/for_each.rs | 3 +-- src/concurrent_stream/from_concurrent_stream.rs | 1 - src/concurrent_stream/from_stream.rs | 9 +++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/concurrent_stream/for_each.rs b/src/concurrent_stream/for_each.rs index 03c8711..89d1ea6 100644 --- a/src/concurrent_stream/for_each.rs +++ b/src/concurrent_stream/for_each.rs @@ -3,7 +3,6 @@ use futures_lite::StreamExt; use pin_project::pin_project; use super::{Consumer, ConsumerState}; -use alloc::boxed::Box; use alloc::sync::Arc; use core::future::Future; use core::marker::PhantomData; @@ -60,7 +59,7 @@ where { type Output = (); - async fn send(mut self: Pin<&mut Self>, future: FutT) -> super::ConsumerState { + async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState { let mut this = self.project(); // If we have no space, we're going to provide backpressure until we have space while this.count.load(Ordering::Relaxed) >= *this.limit { diff --git a/src/concurrent_stream/from_concurrent_stream.rs b/src/concurrent_stream/from_concurrent_stream.rs index a3597fd..a9f6971 100644 --- a/src/concurrent_stream/from_concurrent_stream.rs +++ b/src/concurrent_stream/from_concurrent_stream.rs @@ -1,6 +1,5 @@ use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream}; use crate::future::FutureGroup; -use alloc::boxed::Box; use alloc::vec::Vec; use core::future::Future; use core::pin::Pin; diff --git a/src/concurrent_stream/from_stream.rs b/src/concurrent_stream/from_stream.rs index 5e13c6e..4ec60b8 100644 --- a/src/concurrent_stream/from_stream.rs +++ b/src/concurrent_stream/from_stream.rs @@ -33,6 +33,7 @@ where C: Consumer, { let mut iter = pin!(self.stream); + let mut consumer = pin!(consumer); // Concurrently progress the consumer as well as the stream. Whenever // there is an item from the stream available, we submit it to the @@ -53,7 +54,7 @@ where // Drive the consumer forward let b = async { - let control_flow = consumer.progress().await; + let control_flow = consumer.as_mut().progress().await; State::Progress(control_flow) }; @@ -64,14 +65,14 @@ where ConsumerState::Break => break, ConsumerState::Continue => continue, ConsumerState::Empty => match iter.next().await { - Some(item) => match consumer.send(ready(item)).await { + Some(item) => match consumer.as_mut().send(ready(item)).await { ConsumerState::Break => break, ConsumerState::Empty | ConsumerState::Continue => continue, }, None => break, }, }, - State::Item(Some(item)) => match consumer.send(ready(item)).await { + State::Item(Some(item)) => match consumer.as_mut().send(ready(item)).await { ConsumerState::Break => break, ConsumerState::Empty | ConsumerState::Continue => continue, }, @@ -81,7 +82,7 @@ where // We will no longer receive items from the underlying stream, which // means we're ready to wait for the consumer to finish up. - consumer.flush().await + consumer.as_mut().flush().await } fn concurrency_limit(&self) -> Option {