Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent stream #164

Merged
merged 49 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4fc3a9a
start implementing a concurrent `for_each`
yoshuawuyts Feb 16, 2024
b15a413
Update mod.rs
yoshuawuyts Feb 16, 2024
0531290
notes on ordering
yoshuawuyts Feb 16, 2024
ded646f
allow concurrent stream to operate on an async closure
yoshuawuyts Feb 16, 2024
fa3d0f3
add tests
yoshuawuyts Feb 17, 2024
c47ba6d
start defining the concurrent stream trait
yoshuawuyts Feb 17, 2024
addef7c
implement concurrent map
yoshuawuyts Mar 14, 2024
6599e1c
don't clone atomics
yoshuawuyts Mar 14, 2024
d398d8a
Revert "don't clone atomics"
yoshuawuyts Mar 14, 2024
1a7dda4
Revert "implement concurrent map"
yoshuawuyts Mar 14, 2024
07f82fc
don't clone atomics
yoshuawuyts Mar 14, 2024
7c380e2
it works!
yoshuawuyts Mar 16, 2024
a54f367
implement `ConcurrentIterator::map`
yoshuawuyts Mar 16, 2024
1fba7cc
implement most of `foreach`
yoshuawuyts Mar 16, 2024
d4457ef
use streamgroup in `for_each`
yoshuawuyts Mar 16, 2024
faf5f26
update `PassThrough`
yoshuawuyts Mar 16, 2024
c1535f3
make `Future` an associated type of `ConcurrentStream`
yoshuawuyts Mar 16, 2024
cc5b97c
update `Drain`
yoshuawuyts Mar 16, 2024
4db355a
update `for_each`
yoshuawuyts Mar 16, 2024
c958e5a
concurrent drain
yoshuawuyts Mar 16, 2024
6fab7af
for_each works!
yoshuawuyts Mar 16, 2024
fbea630
tweak examples
yoshuawuyts Mar 16, 2024
294942a
concurrent driving of streams
yoshuawuyts Mar 16, 2024
3deb039
split limit into its own method
yoshuawuyts Mar 16, 2024
ac0ac0d
boilerplate `try_for_each`
yoshuawuyts Mar 16, 2024
4e2d283
correctly short-circuit
yoshuawuyts Mar 16, 2024
b85e222
add `ConcurrentStream::enumerate`
yoshuawuyts Mar 16, 2024
e45af14
impl size_hint for `ConcurrentStream`
yoshuawuyts Mar 16, 2024
ab7276b
remove passthrough and add struct docs
yoshuawuyts Mar 17, 2024
f22665d
move IntoConcurrentStream to a new location
yoshuawuyts Mar 17, 2024
433d67d
put `ConcurrentStream` behind the `alloc` feature
yoshuawuyts Mar 17, 2024
d48d38a
fix alloc and no_std builds
yoshuawuyts Mar 17, 2024
52168e5
implement `collect`
yoshuawuyts Mar 17, 2024
5e4b495
fix alloc builds
yoshuawuyts Mar 17, 2024
b8835d9
add private `Try` traits
yoshuawuyts Mar 17, 2024
a5a4022
use the private `Try` trait for `try_for_each`
yoshuawuyts Mar 17, 2024
6fca1e7
fix core builds
yoshuawuyts Mar 17, 2024
6918f71
add an `IntoAsyncStream` impl for `Vec`
yoshuawuyts Mar 20, 2024
4667657
remove `drain`
yoshuawuyts Mar 20, 2024
ce89592
allow unused utils
yoshuawuyts Mar 20, 2024
cf43c56
Delete drain.rs
yoshuawuyts Mar 20, 2024
49b91e9
rename `finish` to `flush`
yoshuawuyts Mar 20, 2024
745bb53
start pinning APIs
yoshuawuyts Mar 20, 2024
ac9087c
stack-pin the `FutureGroup`
yoshuawuyts Mar 20, 2024
8bed249
fix alloc feature
yoshuawuyts Mar 20, 2024
aab09ed
apply clippy fixes
yoshuawuyts Mar 20, 2024
bf9d34c
fix alloc warnings?
yoshuawuyts Mar 20, 2024
f1a64d1
remove redundant import of `ConcurrentStream`
matheus-consoli Mar 21, 2024
8ef8cd0
remove more redundant imports
matheus-consoli Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ readme = "README.md"
edition = "2021"
keywords = []
categories = []
authors = [
"Yoshua Wuyts <[email protected]>"
]
authors = ["Yoshua Wuyts <[email protected]>"]

[profile.bench]
debug = true
Expand All @@ -33,17 +31,21 @@ std = ["alloc"]
alloc = ["bitvec/alloc", "dep:slab", "dep:smallvec"]

[dependencies]
bitvec = { version = "1.0.1", default-features = false }
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false }
futures-lite = "1.12.0"
pin-project = "1.0.8"
slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
criterion = { version = "0.3", features = [
"async",
"async_futures",
"html_reports",
] }
futures = "0.3.25"
futures-lite = "1.12.0"
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion examples/happy_eyeballs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ async fn open_tcp_socket(
}

// Start connecting. If an attempt succeeds, cancel all others attempts.
Ok(futures.race_ok().await?)
futures.race_ok().await
}
2 changes: 2 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "alloc")]
pub mod vec;
67 changes: 67 additions & 0 deletions src/collections/vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Parallel iterator types for [vectors][std::vec] (`Vec<T>`)
//!
//! You will rarely need to interact with this module directly unless you need
//! to name one of the iterator types.
//!
//! [std::vec]: https://doc.rust-lang.org/stable/std/vec/use core::future::Ready;

use crate::concurrent_stream::{self, FromStream};
use crate::prelude::*;
use crate::utils::{from_iter, FromIter};
#[cfg(all(feature = "alloc", not(feature = "std")))]
use alloc::vec::Vec;
use core::future::Ready;

pub use crate::future::join::vec::Join;
pub use crate::future::race::vec::Race;
pub use crate::future::race_ok::vec::{AggregateError, RaceOk};
pub use crate::future::try_join::vec::TryJoin;
pub use crate::stream::chain::vec::Chain;
pub use crate::stream::merge::vec::Merge;
pub use crate::stream::zip::vec::Zip;

/// Concurrent async iterator that moves out of a vector.
#[derive(Debug)]
pub struct IntoConcurrentStream<T>(FromStream<FromIter<alloc::vec::IntoIter<T>>>);

impl<T> ConcurrentStream for IntoConcurrentStream<T> {
type Item = T;

type Future = Ready<T>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: concurrent_stream::Consumer<Self::Item, Self::Future>,
{
self.0.drive(consumer).await
}

fn concurrency_limit(&self) -> Option<core::num::NonZeroUsize> {
self.0.concurrency_limit()
}
}

impl<T> concurrent_stream::IntoConcurrentStream for Vec<T> {
type Item = T;

type IntoConcurrentStream = IntoConcurrentStream<T>;

fn into_co_stream(self) -> Self::IntoConcurrentStream {
let stream = from_iter(self);
let co_stream = stream.co();
IntoConcurrentStream(co_stream)
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;

#[test]
fn collect() {
futures_lite::future::block_on(async {
let v: Vec<_> = vec![1, 2, 3, 4, 5].into_co_stream().collect().await;
assert_eq!(v, &[1, 2, 3, 4, 5]);
});
}
}
154 changes: 154 additions & 0 deletions src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use pin_project::pin_project;

use super::{ConcurrentStream, Consumer};
use core::future::Future;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// A concurrent iterator that yields the current count and the element during iteration.
///
/// This `struct` is created by the [`enumerate`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: ConcurrentStream::enumerate
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Enumerate<CS: ConcurrentStream> {
inner: CS,
}

impl<CS: ConcurrentStream> Enumerate<CS> {
pub(crate) fn new(inner: CS) -> Self {
Self { inner }
}
}

impl<CS: ConcurrentStream> ConcurrentStream for Enumerate<CS> {
type Item = (usize, CS::Item);
type Future = EnumerateFuture<CS::Future, CS::Item>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: Consumer<Self::Item, Self::Future>,
{
self.inner
.drive(EnumerateConsumer {
inner: consumer,
count: 0,
})
.await
}

fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[pin_project]
struct EnumerateConsumer<C> {
#[pin]
inner: C,
count: usize,
}
impl<C, Item, Fut> Consumer<Item, Fut> for EnumerateConsumer<C>
where
Fut: Future<Output = Item>,
C: Consumer<(usize, Item), EnumerateFuture<Fut, Item>>,
{
type Output = C::Output;

async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
let this = self.project();
let count = *this.count;
*this.count += 1;
this.inner.send(EnumerateFuture::new(future, count)).await
}

async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
let this = self.project();
this.inner.progress().await
}

async fn flush(self: Pin<&mut Self>) -> Self::Output {
let this = self.project();
this.inner.flush().await
}
}

/// Takes a future and maps it to another future via a closure
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
done: bool,
#[pin]
fut_t: FutT,
count: usize,
}

impl<FutT, T> EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
fn new(fut_t: FutT, count: usize) -> Self {
Self {
done: false,
fut_t,
count,
}
}
}

impl<FutT, T> Future for EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
type Output = (usize, T);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if *this.done {
panic!("future has already been polled to completion once");
}

let item = ready!(this.fut_t.poll(cx));
*this.done = true;
Poll::Ready((*this.count, item))
}
}

#[cfg(test)]
mod test {
// use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use futures_lite::StreamExt;
use std::num::NonZeroUsize;

#[test]
fn enumerate() {
futures_lite::future::block_on(async {
let mut n = 0;
stream::iter(std::iter::from_fn(|| {
let v = n;
n += 1;
Some(v)
}))
.take(5)
.co()
.limit(NonZeroUsize::new(1))
.enumerate()
.for_each(|(index, n)| async move {
assert_eq!(index, n);
})
.await;
});
}
}
Loading
Loading