From 932892c089930a2fb45dc8c7cdbb1fbbe1130f8b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Mon, 21 Nov 2022 11:48:56 +0100 Subject: [PATCH 1/2] replace rng with deterministic indexer --- src/future/race/array.rs | 10 ++++----- src/future/race/tuple.rs | 14 +++++-------- src/future/race/vec.rs | 11 ++++------ src/stream/merge/array.rs | 10 ++++----- src/stream/merge/tuple.rs | 7 +++---- src/stream/merge/vec.rs | 10 ++++----- src/utils/indexer.rs | 44 +++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 6 +++--- src/utils/rng.rs | 30 -------------------------- src/utils/tuple.rs | 24 ++++++++------------- 10 files changed, 80 insertions(+), 86 deletions(-) create mode 100644 src/utils/indexer.rs delete mode 100644 src/utils/rng.rs diff --git a/src/future/race/array.rs b/src/future/race/array.rs index 9ce2fa3..0c66962 100644 --- a/src/future/race/array.rs +++ b/src/future/race/array.rs @@ -1,4 +1,4 @@ -use crate::utils::{self, RandomGenerator}; +use crate::utils::{self, Indexer}; use super::Race as RaceTrait; @@ -24,7 +24,7 @@ where { #[pin] futures: [Fut; N], - rng: RandomGenerator, + indexer: Indexer, done: bool, } @@ -48,9 +48,7 @@ where let mut this = self.project(); assert!(!*this.done, "Futures must not be polled after completing"); - let index = this.rng.generate(N as u32) as usize; - - for index in (0..N).map(|pos| (index + pos).wrapping_rem(N)) { + for index in this.indexer.iter() { let fut = utils::get_pin_mut(this.futures.as_mut(), index).unwrap(); match fut.poll(cx) { Poll::Ready(item) => { @@ -74,7 +72,7 @@ where fn race(self) -> Self::Future { Race { futures: self.map(|fut| fut.into_future()), - rng: RandomGenerator::new(), + indexer: Indexer::new(N), done: false, } } diff --git a/src/future/race/tuple.rs b/src/future/race/tuple.rs index 3346c3a..0b501ac 100644 --- a/src/future/race/tuple.rs +++ b/src/future/race/tuple.rs @@ -25,7 +25,7 @@ macro_rules! impl_race_tuple { $F: Future, )* { done: bool, - rng: utils::RandomGenerator, + indexer: utils::Indexer, $(#[pin] $F: $F,)* } @@ -52,7 +52,7 @@ macro_rules! impl_race_tuple { let ($($F,)*): ($($F,)*) = self; $StructName { done: false, - rng: utils::RandomGenerator::new(), + indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)), $($F: $F.into_future()),* } } @@ -70,17 +70,13 @@ macro_rules! impl_race_tuple { let mut this = self.project(); assert!(!*this.done, "Futures must not be polled after completing"); - #[repr(u32)] + #[repr(usize)] enum Indexes { $($F),* } - const LEN: u32 = [$(Indexes::$F),*].len() as u32; - const PERMUTATIONS: u32 = utils::permutations(LEN); - let r = this.rng.generate(PERMUTATIONS); - - for i in 0..LEN { - utils::gen_conditions!(LEN, i, r, this, cx, poll, $((Indexes::$F as u32; $F, { + for i in this.indexer.iter() { + utils::gen_conditions!(i, this, cx, poll, $((Indexes::$F as usize; $F, { Poll::Ready(output) => { *this.done = true; return Poll::Ready(output); diff --git a/src/future/race/vec.rs b/src/future/race/vec.rs index 5a66c2a..de664d8 100644 --- a/src/future/race/vec.rs +++ b/src/future/race/vec.rs @@ -1,4 +1,4 @@ -use crate::utils::{self, RandomGenerator}; +use crate::utils::{self, Indexer}; use super::Race as RaceTrait; @@ -24,7 +24,7 @@ where { #[pin] futures: Vec, - rng: RandomGenerator, + indexer: Indexer, done: bool, } @@ -48,10 +48,7 @@ where let mut this = self.project(); assert!(!*this.done, "Futures must not be polled after completing"); - let len = this.futures.len(); - let index = this.rng.generate(len as u32) as usize; - - for index in (0..len).map(|pos| (index + pos).wrapping_rem(len)) { + for index in this.indexer.iter() { let fut = utils::get_pin_mut_from_vec(this.futures.as_mut(), index).unwrap(); match fut.poll(cx) { Poll::Ready(item) => { @@ -74,8 +71,8 @@ where fn race(self) -> Self::Future { Race { + indexer: Indexer::new(self.len()), futures: self.into_iter().map(|fut| fut.into_future()).collect(), - rng: RandomGenerator::new(), done: false, } } diff --git a/src/stream/merge/array.rs b/src/stream/merge/array.rs index 57cbb0e..8a5fe39 100644 --- a/src/stream/merge/array.rs +++ b/src/stream/merge/array.rs @@ -1,6 +1,6 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, PollArray, RandomGenerator, WakerArray}; +use crate::utils::{self, Indexer, PollArray, WakerArray}; use core::fmt; use futures_core::Stream; @@ -21,7 +21,7 @@ where { #[pin] streams: [S; N], - rng: RandomGenerator, + indexer: Indexer, wakers: WakerArray, state: PollArray, complete: usize, @@ -35,7 +35,7 @@ where pub(crate) fn new(streams: [S; N]) -> Self { Self { streams, - rng: RandomGenerator::new(), + indexer: Indexer::new(N), wakers: WakerArray::new(), state: PollArray::new(), complete: 0, @@ -68,9 +68,7 @@ where // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. - let len = this.streams.len(); - let r = this.rng.generate(len as u32) as usize; - for index in (0..len).map(|n| (r + n).wrapping_rem(len)) { + for index in this.indexer.iter() { if !readiness.any_ready() { // Nothing is ready yet return Poll::Pending; diff --git a/src/stream/merge/tuple.rs b/src/stream/merge/tuple.rs index 4b1c7b6..093bc5b 100644 --- a/src/stream/merge/tuple.rs +++ b/src/stream/merge/tuple.rs @@ -92,7 +92,7 @@ macro_rules! impl_merge_tuple { $F: Stream, )* { #[pin] streams: $mod_name::Streams<$($F,)+>, - rng: utils::RandomGenerator, + indexer: utils::Indexer, wakers: WakerArray<{$mod_name::LEN}>, state: PollArray<{$mod_name::LEN}>, completed: u8, @@ -123,14 +123,13 @@ macro_rules! impl_merge_tuple { readiness.set_waker(cx.waker()); const LEN: u8 = $mod_name::LEN as u8; - let r = this.rng.generate(LEN as u32) as u8; let mut streams = this.streams.project(); // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. - for index in (0..LEN).map(|n| (r + n).wrapping_rem(LEN) as usize) { + for index in this.indexer.iter() { if !readiness.any_ready() { // Nothing is ready yet return Poll::Pending; @@ -175,7 +174,7 @@ macro_rules! impl_merge_tuple { let ($($F,)*): ($($F,)*) = self; $StructName { streams: $mod_name::Streams { $($F: $F.into_stream()),+ }, - rng: utils::RandomGenerator::new(), + indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)), wakers: WakerArray::new(), state: PollArray::new(), completed: 0, diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index a1a0e39..80869b4 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -1,6 +1,6 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, PollVec, RandomGenerator, WakerVec}; +use crate::utils::{self, Indexer, PollVec, WakerVec}; use core::fmt; use futures_core::Stream; @@ -21,7 +21,7 @@ where { #[pin] streams: Vec, - rng: RandomGenerator, + indexer: Indexer, complete: usize, wakers: WakerVec, state: PollVec, @@ -37,8 +37,8 @@ where Self { wakers: WakerVec::new(len), state: PollVec::new(len), + indexer: Indexer::new(len), streams, - rng: RandomGenerator::new(), complete: 0, done: false, } @@ -69,9 +69,7 @@ where // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. - let len = this.streams.len(); - let r = this.rng.generate(len as u32) as usize; - for index in (0..len).map(|n| (r + n).wrapping_rem(len)) { + for index in this.indexer.iter() { if !readiness.any_ready() { // Nothing is ready yet return Poll::Pending; diff --git a/src/utils/indexer.rs b/src/utils/indexer.rs new file mode 100644 index 0000000..afcbadd --- /dev/null +++ b/src/utils/indexer.rs @@ -0,0 +1,44 @@ +use core::ops; + +/// Generate an iteration sequence. This provides *fair* iteration when multiple +/// futures need to be polled concurrently. +pub(crate) struct Indexer { + offset: usize, + max: usize, +} + +impl Indexer { + pub(crate) fn new(max: usize) -> Self { + Self { offset: 0, max } + } + + /// Generate a range between `0..max`, incrementing the starting point + /// for the next iteration. + pub(crate) fn iter(&mut self) -> IndexIter { + // Increment the starting point for next time. + let offset = self.offset; + self.offset = self.offset.wrapping_rem(self.max); + + IndexIter { + iter: (0..self.max), + max: self.max, + offset, + } + } +} + +pub(crate) struct IndexIter { + iter: ops::Range, + offset: usize, + max: usize, +} + +impl Iterator for IndexIter { + type Item = usize; + + fn next(&mut self) -> Option { + self.iter + .next() + .map(|pos| (pos + self.offset).wrapping_rem(self.max)) + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 324e13f..ac5d38e 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,18 +1,18 @@ //! Utilities to implement the different futures of this crate. mod array; +mod indexer; mod pin; mod poll_state; -mod rng; mod tuple; mod wakers; pub(crate) use array::array_assume_init; +pub(crate) use indexer::Indexer; pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; pub(crate) use poll_state::MaybeDone; pub(crate) use poll_state::{PollArray, PollState, PollVec}; -pub(crate) use rng::RandomGenerator; -pub(crate) use tuple::{gen_conditions, permutations}; +pub(crate) use tuple::{gen_conditions, tuple_len}; pub(crate) use wakers::{WakerArray, WakerVec}; #[cfg(test)] diff --git a/src/utils/rng.rs b/src/utils/rng.rs deleted file mode 100644 index 6b27e91..0000000 --- a/src/utils/rng.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::num::Wrapping; - -/// Generates a random number in `0..n`. -pub(crate) struct RandomGenerator(Wrapping); - -impl RandomGenerator { - pub(crate) fn new() -> Self { - // Take the address of a local value as seed. - let mut x = 0i32; - let r = &mut x; - let addr = r as *mut i32 as usize; - Self(Wrapping(addr as u32)) - } - pub(crate) fn generate(&mut self, n: u32) -> u32 { - // This is the 32-bit variant of Xorshift. - // - // Source: https://en.wikipedia.org/wiki/Xorshift - let mut x = self.0; - x ^= x << 13; - x ^= x >> 17; - x ^= x << 5; - self.0 = x; - - // This is a fast alternative to `x % n`. - // - // Author: Daniel Lemire - // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ - ((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32 - } -} diff --git a/src/utils/tuple.rs b/src/utils/tuple.rs index 9e4bc55..4afbd64 100644 --- a/src/utils/tuple.rs +++ b/src/utils/tuple.rs @@ -1,16 +1,3 @@ -/// Compute the number of permutations for a number -/// during compilation. -pub(crate) const fn permutations(mut num: u32) -> u32 { - let mut total = 1; - loop { - total *= num; - num -= 1; - if num == 0 { - break total; - } - } -} - /// Generate the `match` conditions inside the main polling body. This macro /// chooses a random starting point on each call to the given method, making /// it "fair". @@ -30,9 +17,9 @@ pub(crate) const fn permutations(mut num: u32) -> u32 { // - https://rust-lang.github.io/rfcs/3086-macro-metavar-expr.html macro_rules! gen_conditions { // Base condition, setup the depth counter. - ($LEN:expr, $i:expr, $r:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => { + ($i:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => { $( - if $i == ($r + $F_index).wrapping_rem($LEN) { + if $i == $F_index { match unsafe { Pin::new_unchecked(&mut $this.$F) }.$method($cx) { $($arms => $foo,)* } @@ -41,3 +28,10 @@ macro_rules! gen_conditions { } } pub(crate) use gen_conditions; + +/// Calculate the number of tuples currently being operated on. +macro_rules! tuple_len { + (@count_one $F:ident) => (1); + ($($F:ident,)*) => (0 $(+ crate::utils::tuple_len!(@count_one $F))*); +} +pub(crate) use tuple_len; From 654e9930e7e2bbcc7729cbcca08e4aef6fe6d079 Mon Sep 17 00:00:00 2001 From: Yosh Date: Tue, 22 Nov 2022 16:33:29 +0100 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Consoli --- src/utils/indexer.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/utils/indexer.rs b/src/utils/indexer.rs index afcbadd..9da6227 100644 --- a/src/utils/indexer.rs +++ b/src/utils/indexer.rs @@ -17,11 +17,10 @@ impl Indexer { pub(crate) fn iter(&mut self) -> IndexIter { // Increment the starting point for next time. let offset = self.offset; - self.offset = self.offset.wrapping_rem(self.max); + self.offset = (self.offset + 1).wrapping_rem(self.max); IndexIter { iter: (0..self.max), - max: self.max, offset, } } @@ -30,7 +29,6 @@ impl Indexer { pub(crate) struct IndexIter { iter: ops::Range, offset: usize, - max: usize, } impl Iterator for IndexIter { @@ -39,6 +37,6 @@ impl Iterator for IndexIter { fn next(&mut self) -> Option { self.iter .next() - .map(|pos| (pos + self.offset).wrapping_rem(self.max)) + .map(|pos| (pos + self.offset).wrapping_rem(self.iter.end)) } }