Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
remove un-necessary pin projections
Browse files Browse the repository at this point in the history
  • Loading branch information
little-dude committed Oct 1, 2020
1 parent 19880d9 commit 46ecb8f
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
1 change: 0 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rust/xaynet-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ tracing = "0.1.19"
async-trait = "0.1.40"
xaynet-core = { path = "../xaynet-core", version = "0.1.0" }
reqwest = { version = "0.10.8", default-features = false }
pin-project = "0.4.24"
futures = "0.3.5"

[dev-dependencies]
Expand Down
2 changes: 0 additions & 2 deletions rust/xaynet-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ extern crate async_trait;
extern crate serde;
#[macro_use]
extern crate tracing;
#[macro_use]
extern crate pin_project;

use std::time::Duration;

Expand Down
34 changes: 22 additions & 12 deletions rust/xaynet-client/src/utils/concurrent_futures.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
#![allow(dead_code)]

use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{
stream::{FuturesUnordered, Stream, StreamExt},
stream::{FuturesUnordered, Stream},
Future,
};
use tokio::{
task::{JoinError, JoinHandle},
time::delay_for,
};
use tokio::task::{JoinError, JoinHandle};

/// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their
/// result as they finish. When the max number of concurrent futures is reached, new tasks are
/// queued until some in-flight futures finish.
#[pin_project]
pub struct ConcurrentFutures<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
/// in-flight futures
#[pin]
running: FuturesUnordered<JoinHandle<T::Output>>,
/// buffered tasks
pending: VecDeque<T>,
Expand All @@ -50,28 +46,42 @@ where
}
}

impl<T> Unpin for ConcurrentFutures<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
}

impl<T> Stream for ConcurrentFutures<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
type Item = Result<T::Output, JoinError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
while this.running.len() < *this.max_in_flight {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
while this.running.len() < this.max_in_flight {
if let Some(pending) = this.pending.pop_front() {
let handle = tokio::spawn(pending);
this.running.push(handle);
} else {
break;
}
}
this.running.poll_next(cx)
Pin::new(&mut this.running).poll_next(cx)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures::stream::StreamExt;
use tokio::time::delay_for;

use super::*;

#[tokio::test]
async fn test() {
let mut stream =
Expand Down
1 change: 1 addition & 0 deletions rust/xaynet-client/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod concurrent_futures;

0 comments on commit 46ecb8f

Please sign in to comment.