From dea3f6b116374347eaf458b0f4bdd0cf5403e49b Mon Sep 17 00:00:00 2001 From: little-dude Date: Thu, 1 Oct 2020 09:53:34 +0200 Subject: [PATCH 1/4] add a helper for running futures concurrently This will allow clients to start sending multiple chunks concurrently. [`FuturesUnordered`](https://docs.rs/futures-util/0.3.5/futures_util/stream/futures_unordered/struct.FuturesUnordered.html) allows us to poll multiple futures, but if we want to really run them _concurrently_ we need to spawn them first. So this type is just a thin wrapper around `FuturesUnordered`. --- rust/Cargo.lock | 1 + rust/xaynet-sdk/Cargo.toml | 7 +- rust/xaynet-sdk/src/lib.rs | 10 +- .../src/utils/concurrent_futures.rs | 121 ++++++++++++++++++ rust/xaynet-sdk/src/utils/mod.rs | 2 + 5 files changed, 133 insertions(+), 8 deletions(-) create mode 100644 rust/xaynet-sdk/src/utils/concurrent_futures.rs create mode 100644 rust/xaynet-sdk/src/utils/mod.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9c52f7ac4..d27eebf8b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4098,6 +4098,7 @@ dependencies = [ "bincode", "bytes 0.5.6", "derive_more", + "futures", "mockall", "num", "paste", diff --git a/rust/xaynet-sdk/Cargo.toml b/rust/xaynet-sdk/Cargo.toml index 239aeff60..55427b4cc 100644 --- a/rust/xaynet-sdk/Cargo.toml +++ b/rust/xaynet-sdk/Cargo.toml @@ -19,10 +19,15 @@ async-trait = "0.1.42" base64 = "0.13.0" bincode = "1.3.1" derive_more = { version = "0.99.11", default-features = false, features = ["from"] } +# TODO: remove once concurrent_futures.rs was moved to the e2e package +futures = "0.3.10" paste = "1.0.4" serde = { version = "1.0.119", features = ["derive"] } sodiumoxide = "0.2.6" thiserror = "1.0.23" +# TODO (XN-1372): upgrade +# TODO: move to dev-dependencies once concurrent_futures.rs was moved to the e2e package +tokio = { version = "0.2.24", features = ["rt-core", "macros"] } tracing = "0.1.22" url = "2.2.0" xaynet-core = { path = "../xaynet-core", version = "0.1.0" } @@ -39,8 +44,6 @@ rand = "0.8.2" mockall = "0.9.0" num = { version = "0.3.1", features = ["serde"] } serde_json = "1.0.61" -# TODO (XN-1372): upgrade -tokio = { version = "0.2.24", features = ["rt-core", "macros"] } # TODO (XN-1372): can't upgrade yet because of tokio tokio-test = "0.2.1" xaynet-core = { path = "../xaynet-core", features = ["testutils"] } diff --git a/rust/xaynet-sdk/src/lib.rs b/rust/xaynet-sdk/src/lib.rs index 4cbda3316..300a1778c 100644 --- a/rust/xaynet-sdk/src/lib.rs +++ b/rust/xaynet-sdk/src/lib.rs @@ -201,14 +201,12 @@ //! ``` pub mod client; - mod message_encoder; -pub(crate) use self::message_encoder::MessageEncoder; - pub mod settings; - mod state_machine; -pub use state_machine::{LocalModelConfig, SerializableState, StateMachine, TransitionOutcome}; - mod traits; +pub(crate) mod utils; + +pub(crate) use self::message_encoder::MessageEncoder; pub use self::traits::{ModelStore, Notify, XaynetClient}; +pub use state_machine::{LocalModelConfig, SerializableState, StateMachine, TransitionOutcome}; diff --git a/rust/xaynet-sdk/src/utils/concurrent_futures.rs b/rust/xaynet-sdk/src/utils/concurrent_futures.rs new file mode 100644 index 000000000..c093c055f --- /dev/null +++ b/rust/xaynet-sdk/src/utils/concurrent_futures.rs @@ -0,0 +1,121 @@ +use std::{ + collections::VecDeque, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{ + stream::{FuturesUnordered, Stream, StreamExt}, + Future, +}; +use tokio::{ + task::{JoinError, JoinHandle}, + time::delay_for, +}; + +/// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their +/// result as they finish. When the max number of concurrent futures is reached, new tasks are +/// queued until some in-flight futures finish. +#[pin_project] +pub struct ConcurrentFutures +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + /// in-flight futures + #[pin] + running: FuturesUnordered>, + /// buffered tasks + pending: VecDeque, + /// max number of concurrent futures + max_in_flight: usize, +} + +impl ConcurrentFutures +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + pub fn new(max_in_flight: usize) -> Self { + Self { + running: FuturesUnordered::new(), + pending: VecDeque::new(), + max_in_flight, + } + } + + pub fn push(&mut self, task: T) { + self.pending.push_back(task) + } +} + +impl Stream for ConcurrentFutures +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + while this.running.len() < *this.max_in_flight { + if let Some(pending) = this.pending.pop_front() { + let handle = tokio::spawn(pending); + this.running.push(handle); + } else { + break; + } + } + this.running.poll_next(cx) + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn test() { + let mut stream = + ConcurrentFutures:: + Send + 'static>>>::new(2); + + stream.push(Box::pin(async { + delay_for(Duration::from_millis(10_u64)).await; + 1_u8 + })); + + stream.push(Box::pin(async { + delay_for(Duration::from_millis(25_u64)).await; + 2_u8 + })); + + stream.push(Box::pin(async { + delay_for(Duration::from_millis(12_u64)).await; + 3_u8 + })); + + stream.push(Box::pin(async { + delay_for(Duration::from_millis(1_u64)).await; + 4_u8 + })); + + // poll_next hasn't been called yet so nothing is running + assert_eq!(stream.running.len(), 0); + assert_eq!(stream.pending.len(), 4); + assert_eq!(stream.next().await.unwrap().unwrap(), 1); + + // two futures have been spawned, but one of them just finished: one is still running, two are + // still pending + assert_eq!(stream.running.len(), 1); + assert_eq!(stream.pending.len(), 2); + assert_eq!(stream.next().await.unwrap().unwrap(), 3); + + // three futures have been spawned, two finished: one is still running, one is still pending + assert_eq!(stream.running.len(), 1); + assert_eq!(stream.pending.len(), 1); + assert_eq!(stream.next().await.unwrap().unwrap(), 4); + + // four futures have been spawn, three finished: one is still running + assert_eq!(stream.next().await.unwrap().unwrap(), 2); + assert_eq!(stream.running.len(), 0); + assert_eq!(stream.pending.len(), 0); + } +} diff --git a/rust/xaynet-sdk/src/utils/mod.rs b/rust/xaynet-sdk/src/utils/mod.rs new file mode 100644 index 000000000..7be6bff5f --- /dev/null +++ b/rust/xaynet-sdk/src/utils/mod.rs @@ -0,0 +1,2 @@ +// TODO: move to the e2e package +pub mod concurrent_futures; From 7f957cad4e82cae7c9e7f44fba1b12ce13ed8250 Mon Sep 17 00:00:00 2001 From: little-dude Date: Thu, 1 Oct 2020 10:19:32 +0200 Subject: [PATCH 2/4] remove un-necessary pin projections see: https://users.rust-lang.org/t/is-my-pin-projection-actually-safe/49449/7?u=little_dude --- .../src/utils/concurrent_futures.rs | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/rust/xaynet-sdk/src/utils/concurrent_futures.rs b/rust/xaynet-sdk/src/utils/concurrent_futures.rs index c093c055f..fb4230fd5 100644 --- a/rust/xaynet-sdk/src/utils/concurrent_futures.rs +++ b/rust/xaynet-sdk/src/utils/concurrent_futures.rs @@ -1,30 +1,26 @@ +#![allow(dead_code)] + use std::{ collections::VecDeque, pin::Pin, task::{Context, Poll}, - time::Duration, }; use futures::{ - stream::{FuturesUnordered, Stream, StreamExt}, + stream::{FuturesUnordered, Stream}, Future, }; -use tokio::{ - task::{JoinError, JoinHandle}, - time::delay_for, -}; +use tokio::task::{JoinError, JoinHandle}; /// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their /// result as they finish. When the max number of concurrent futures is reached, new tasks are /// queued until some in-flight futures finish. -#[pin_project] pub struct ConcurrentFutures where T: Future + Send + 'static, T::Output: Send + 'static, { /// in-flight futures - #[pin] running: FuturesUnordered>, /// buffered tasks pending: VecDeque, @@ -50,15 +46,22 @@ where } } +impl Unpin for ConcurrentFutures +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ +} + impl Stream for ConcurrentFutures where T: Future + Send + 'static, T::Output: Send + 'static, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.project(); - while this.running.len() < *this.max_in_flight { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + while this.running.len() < this.max_in_flight { if let Some(pending) = this.pending.pop_front() { let handle = tokio::spawn(pending); this.running.push(handle); @@ -66,12 +69,19 @@ where break; } } - this.running.poll_next(cx) + Pin::new(&mut this.running).poll_next(cx) } } #[cfg(test)] mod tests { + use std::time::Duration; + + use futures::stream::StreamExt; + use tokio::time::delay_for; + + use super::*; + #[tokio::test] async fn test() { let mut stream = From df2d42d8ff22b449370e479f54b7fb49287111b3 Mon Sep 17 00:00:00 2001 From: Jan Petsche <58227040+janpetschexain@users.noreply.github.com> Date: Thu, 14 Jan 2021 18:54:08 +0100 Subject: [PATCH 3/4] renaming, fix test timings --- .../src/utils/concurrent_futures.rs | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/rust/xaynet-sdk/src/utils/concurrent_futures.rs b/rust/xaynet-sdk/src/utils/concurrent_futures.rs index fb4230fd5..d736af778 100644 --- a/rust/xaynet-sdk/src/utils/concurrent_futures.rs +++ b/rust/xaynet-sdk/src/utils/concurrent_futures.rs @@ -20,11 +20,11 @@ where T: Future + Send + 'static, T::Output: Send + 'static, { - /// in-flight futures + /// In-flight futures. running: FuturesUnordered>, - /// buffered tasks - pending: VecDeque, - /// max number of concurrent futures + /// Buffered tasks. + queued: VecDeque, + /// Max number of concurrent futures. max_in_flight: usize, } @@ -36,13 +36,13 @@ where pub fn new(max_in_flight: usize) -> Self { Self { running: FuturesUnordered::new(), - pending: VecDeque::new(), + queued: VecDeque::new(), max_in_flight, } } pub fn push(&mut self, task: T) { - self.pending.push_back(task) + self.queued.push_back(task) } } @@ -62,8 +62,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.get_mut(); while this.running.len() < this.max_in_flight { - if let Some(pending) = this.pending.pop_front() { - let handle = tokio::spawn(pending); + if let Some(queued) = this.queued.pop_front() { + let handle = tokio::spawn(queued); this.running.push(handle); } else { break; @@ -93,39 +93,46 @@ mod tests { })); stream.push(Box::pin(async { - delay_for(Duration::from_millis(25_u64)).await; + delay_for(Duration::from_millis(28_u64)).await; 2_u8 })); stream.push(Box::pin(async { - delay_for(Duration::from_millis(12_u64)).await; + delay_for(Duration::from_millis(8_u64)).await; 3_u8 })); stream.push(Box::pin(async { - delay_for(Duration::from_millis(1_u64)).await; + delay_for(Duration::from_millis(2_u64)).await; 4_u8 })); - // poll_next hasn't been called yet so nothing is running + // poll_next() hasn't been called yet so all futures are queued assert_eq!(stream.running.len(), 0); - assert_eq!(stream.pending.len(), 4); + assert_eq!(stream.queued.len(), 4); + + // future 1 and 2 are spawned, then future 1 is ready assert_eq!(stream.next().await.unwrap().unwrap(), 1); - // two futures have been spawned, but one of them just finished: one is still running, two are - // still pending + // future 2 is pending, futures 3 and 4 are queued assert_eq!(stream.running.len(), 1); - assert_eq!(stream.pending.len(), 2); + assert_eq!(stream.queued.len(), 2); + + // future 3 is spawned, then future 3 is ready assert_eq!(stream.next().await.unwrap().unwrap(), 3); - // three futures have been spawned, two finished: one is still running, one is still pending + // future 2 is pending, future 4 is queued assert_eq!(stream.running.len(), 1); - assert_eq!(stream.pending.len(), 1); + assert_eq!(stream.queued.len(), 1); + + // future 4 is spawned, then future 4 is ready assert_eq!(stream.next().await.unwrap().unwrap(), 4); - // four futures have been spawn, three finished: one is still running + // future 2 is pending, then future 2 is ready assert_eq!(stream.next().await.unwrap().unwrap(), 2); + + // all futures have been resolved assert_eq!(stream.running.len(), 0); - assert_eq!(stream.pending.len(), 0); + assert_eq!(stream.queued.len(), 0); } } From 16ed2e9219ddd70a857968961300d05925c28828 Mon Sep 17 00:00:00 2001 From: Jan Petsche <58227040+janpetschexain@users.noreply.github.com> Date: Fri, 15 Jan 2021 14:22:54 +0100 Subject: [PATCH 4/4] bump futures version --- rust/Cargo.lock | 36 +++++++++++++++++------------------ rust/xaynet-sdk/Cargo.toml | 2 +- rust/xaynet-server/Cargo.toml | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d27eebf8b..72a142e78 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -974,9 +974,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fa4cc29d25b0687b8570b0da86eac698dcb525110ad8b938fe6712baa711ec" +checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" dependencies = [ "futures-channel", "futures-core", @@ -989,9 +989,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31ebc390c6913de330e418add60e1a7e5af4cb5ec600d19111b339cafcdcc027" +checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" dependencies = [ "futures-core", "futures-sink", @@ -999,15 +999,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "089bd0baf024d3216916546338fffe4fc8dfffdd901e33c278abb091e0d52111" +checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" [[package]] name = "futures-executor" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0cb59f15119671c94cd9cc543dc9a50b8d5edc468b4ff5f0bb8567f66c6b48a" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" dependencies = [ "futures-core", "futures-task", @@ -1016,9 +1016,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3868967e4e5ab86614e2176c99949eeef6cbcacaee737765f6ae693988273997" +checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" [[package]] name = "futures-lite" @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95778720c3ee3c179cd0d8fd5a0f9b40aa7d745c080f86a8f8bed33c4fd89758" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -1049,24 +1049,24 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e0f6be0ec0357772fd58fb751958dd600bd0b3edfd429e77793e4282831360" +checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6" [[package]] name = "futures-task" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "868090f28a925db6cb7462938c51d807546e298fb314088239f0e52fb4338b96" +checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cad5e82786df758d407932aded1235e24d8e2eb438b6adafd37930c2462fb5d1" +checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ "futures-channel", "futures-core", diff --git a/rust/xaynet-sdk/Cargo.toml b/rust/xaynet-sdk/Cargo.toml index 55427b4cc..7f50e4bcb 100644 --- a/rust/xaynet-sdk/Cargo.toml +++ b/rust/xaynet-sdk/Cargo.toml @@ -20,7 +20,7 @@ base64 = "0.13.0" bincode = "1.3.1" derive_more = { version = "0.99.11", default-features = false, features = ["from"] } # TODO: remove once concurrent_futures.rs was moved to the e2e package -futures = "0.3.10" +futures = "0.3.12" paste = "1.0.4" serde = { version = "1.0.119", features = ["derive"] } sodiumoxide = "0.2.6" diff --git a/rust/xaynet-server/Cargo.toml b/rust/xaynet-server/Cargo.toml index 26088168f..52a329ea6 100644 --- a/rust/xaynet-server/Cargo.toml +++ b/rust/xaynet-server/Cargo.toml @@ -31,7 +31,7 @@ derive_more = { version = "0.99.11", default-features = false, features = [ "into", ] } displaydoc = "0.1.7" -futures = "0.3.11" +futures = "0.3.12" hex = "0.4.2" http = "0.2.3" influxdb = "0.3.0"