From 46ecb8fbcf01ca7925584b4497c992015b90e7b5 Mon Sep 17 00:00:00 2001 From: little-dude Date: Thu, 1 Oct 2020 10:19:32 +0200 Subject: [PATCH] remove un-necessary pin projections see: https://users.rust-lang.org/t/is-my-pin-projection-actually-safe/49449/7?u=little_dude --- rust/Cargo.lock | 1 - rust/xaynet-client/Cargo.toml | 1 - rust/xaynet-client/src/lib.rs | 2 -- .../src/utils/concurrent_futures.rs | 34 ++++++++++++------- rust/xaynet-client/src/utils/mod.rs | 1 + 5 files changed, 23 insertions(+), 16 deletions(-) create mode 100644 rust/xaynet-client/src/utils/mod.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8fcee2e2d..cfd0fe7e4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2862,7 +2862,6 @@ dependencies = [ "bytes", "derive_more", "futures", - "pin-project", "reqwest", "serde 1.0.116", "sodiumoxide", diff --git a/rust/xaynet-client/Cargo.toml b/rust/xaynet-client/Cargo.toml index c9a456c71..aa594b609 100644 --- a/rust/xaynet-client/Cargo.toml +++ b/rust/xaynet-client/Cargo.toml @@ -21,7 +21,6 @@ tracing = "0.1.19" async-trait = "0.1.40" xaynet-core = { path = "../xaynet-core", version = "0.1.0" } reqwest = { version = "0.10.8", default-features = false } -pin-project = "0.4.24" futures = "0.3.5" [dev-dependencies] diff --git a/rust/xaynet-client/src/lib.rs b/rust/xaynet-client/src/lib.rs index 2d66cb3bf..9037dad13 100644 --- a/rust/xaynet-client/src/lib.rs +++ b/rust/xaynet-client/src/lib.rs @@ -45,8 +45,6 @@ extern crate async_trait; extern crate serde; #[macro_use] extern crate tracing; -#[macro_use] -extern crate pin_project; use std::time::Duration; diff --git a/rust/xaynet-client/src/utils/concurrent_futures.rs b/rust/xaynet-client/src/utils/concurrent_futures.rs index c093c055f..fb4230fd5 100644 --- a/rust/xaynet-client/src/utils/concurrent_futures.rs +++ b/rust/xaynet-client/src/utils/concurrent_futures.rs @@ -1,30 +1,26 @@ +#![allow(dead_code)] + use std::{ collections::VecDeque, pin::Pin, task::{Context, Poll}, - time::Duration, }; use futures::{ - stream::{FuturesUnordered, Stream, StreamExt}, + stream::{FuturesUnordered, Stream}, Future, }; -use tokio::{ - task::{JoinError, JoinHandle}, - time::delay_for, -}; +use tokio::task::{JoinError, JoinHandle}; /// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their /// result as they finish. When the max number of concurrent futures is reached, new tasks are /// queued until some in-flight futures finish. -#[pin_project] pub struct ConcurrentFutures where T: Future + Send + 'static, T::Output: Send + 'static, { /// in-flight futures - #[pin] running: FuturesUnordered>, /// buffered tasks pending: VecDeque, @@ -50,15 +46,22 @@ where } } +impl Unpin for ConcurrentFutures +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ +} + impl Stream for ConcurrentFutures where T: Future + Send + 'static, T::Output: Send + 'static, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.project(); - while this.running.len() < *this.max_in_flight { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + while this.running.len() < this.max_in_flight { if let Some(pending) = this.pending.pop_front() { let handle = tokio::spawn(pending); this.running.push(handle); @@ -66,12 +69,19 @@ where break; } } - this.running.poll_next(cx) + Pin::new(&mut this.running).poll_next(cx) } } #[cfg(test)] mod tests { + use std::time::Duration; + + use futures::stream::StreamExt; + use tokio::time::delay_for; + + use super::*; + #[tokio::test] async fn test() { let mut stream = diff --git a/rust/xaynet-client/src/utils/mod.rs b/rust/xaynet-client/src/utils/mod.rs new file mode 100644 index 000000000..fe1779808 --- /dev/null +++ b/rust/xaynet-client/src/utils/mod.rs @@ -0,0 +1 @@ +pub(crate) mod concurrent_futures;