Skip to content

Commit

Permalink
replace rng with deterministic indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Nov 21, 2022
1 parent 3f0326f commit b115cc7
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 73 deletions.
10 changes: 4 additions & 6 deletions src/future/race/array.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{self, RandomGenerator};
use crate::utils::{self, Indexer};

use super::Race as RaceTrait;

Expand All @@ -24,7 +24,7 @@ where
{
#[pin]
futures: [Fut; N],
rng: RandomGenerator,
indexer: Indexer,
done: bool,
}

Expand All @@ -48,9 +48,7 @@ where
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

let index = this.rng.generate(N as u32) as usize;

for index in (0..N).map(|pos| (index + pos).wrapping_rem(N)) {
for index in this.indexer.iter() {
let fut = utils::get_pin_mut(this.futures.as_mut(), index).unwrap();
match fut.poll(cx) {
Poll::Ready(item) => {
Expand All @@ -74,7 +72,7 @@ where
fn race(self) -> Self::Future {
Race {
futures: self.map(|fut| fut.into_future()),
rng: RandomGenerator::new(),
indexer: Indexer::new(N),
done: false,
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/future/race/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ macro_rules! impl_race_tuple {
$F: Future<Output = T>,
)* {
done: bool,
rng: utils::RandomGenerator,
indexer: utils::Indexer,
$(#[pin] $F: $F,)*
}

Expand All @@ -52,7 +52,7 @@ macro_rules! impl_race_tuple {
let ($($F,)*): ($($F,)*) = self;
$StructName {
done: false,
rng: utils::RandomGenerator::new(),
indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)),
$($F: $F.into_future()),*
}
}
Expand All @@ -70,17 +70,13 @@ macro_rules! impl_race_tuple {
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

#[repr(u32)]
#[repr(usize)]
enum Indexes {
$($F),*
}

const LEN: u32 = [$(Indexes::$F),*].len() as u32;
const PERMUTATIONS: u32 = utils::permutations(LEN);
let r = this.rng.generate(PERMUTATIONS);

for i in 0..LEN {
utils::gen_conditions!(LEN, i, r, this, cx, poll, $((Indexes::$F as u32; $F, {
for i in this.indexer.iter() {
utils::gen_conditions!(i, this, cx, poll, $((Indexes::$F as usize; $F, {
Poll::Ready(output) => {
*this.done = true;
return Poll::Ready(output);
Expand Down
11 changes: 4 additions & 7 deletions src/future/race/vec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{self, RandomGenerator};
use crate::utils::{self, Indexer};

use super::Race as RaceTrait;

Expand All @@ -24,7 +24,7 @@ where
{
#[pin]
futures: Vec<Fut>,
rng: RandomGenerator,
indexer: Indexer,
done: bool,
}

Expand All @@ -48,10 +48,7 @@ where
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

let len = this.futures.len();
let index = this.rng.generate(len as u32) as usize;

for index in (0..len).map(|pos| (index + pos).wrapping_rem(len)) {
for index in this.indexer.iter() {
let fut = utils::get_pin_mut_from_vec(this.futures.as_mut(), index).unwrap();
match fut.poll(cx) {
Poll::Ready(item) => {
Expand All @@ -74,8 +71,8 @@ where

fn race(self) -> Self::Future {
Race {
indexer: Indexer::new(self.len()),
futures: self.into_iter().map(|fut| fut.into_future()).collect(),
rng: RandomGenerator::new(),
done: false,
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/stream/merge/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, PollArray, RandomGenerator, WakerArray};
use crate::utils::{self, Indexer, PollArray, WakerArray};

use core::fmt;
use futures_core::Stream;
Expand All @@ -21,7 +21,7 @@ where
{
#[pin]
streams: [S; N],
rng: RandomGenerator,
indexer: Indexer,
wakers: WakerArray<N>,
state: PollArray<N>,
complete: usize,
Expand All @@ -35,7 +35,7 @@ where
pub(crate) fn new(streams: [S; N]) -> Self {
Self {
streams,
rng: RandomGenerator::new(),
indexer: Indexer::new(N),
wakers: WakerArray::new(),
state: PollArray::new(),
complete: 0,
Expand Down Expand Up @@ -68,9 +68,7 @@ where
// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let len = this.streams.len();
let r = this.rng.generate(len as u32) as usize;
for index in (0..len).map(|n| (r + n).wrapping_rem(len)) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down
7 changes: 3 additions & 4 deletions src/stream/merge/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ macro_rules! impl_merge_tuple {
$F: Stream<Item = T>,
)* {
#[pin] streams: $mod_name::Streams<$($F,)+>,
rng: utils::RandomGenerator,
indexer: utils::Indexer,
wakers: WakerArray<{$mod_name::LEN}>,
state: PollArray<{$mod_name::LEN}>,
completed: u8,
Expand Down Expand Up @@ -123,14 +123,13 @@ macro_rules! impl_merge_tuple {
readiness.set_waker(cx.waker());

const LEN: u8 = $mod_name::LEN as u8;
let r = this.rng.generate(LEN as u32) as u8;

let mut streams = this.streams.project();

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
for index in (0..LEN).map(|n| (r + n).wrapping_rem(LEN) as usize) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down Expand Up @@ -175,7 +174,7 @@ macro_rules! impl_merge_tuple {
let ($($F,)*): ($($F,)*) = self;
$StructName {
streams: $mod_name::Streams { $($F: $F.into_stream()),+ },
rng: utils::RandomGenerator::new(),
indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)),
wakers: WakerArray::new(),
state: PollArray::new(),
completed: 0,
Expand Down
10 changes: 4 additions & 6 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, PollVec, RandomGenerator, WakerVec};
use crate::utils::{self, Indexer, PollVec, WakerVec};

use core::fmt;
use futures_core::Stream;
Expand All @@ -21,7 +21,7 @@ where
{
#[pin]
streams: Vec<S>,
rng: RandomGenerator,
indexer: Indexer,
complete: usize,
wakers: WakerVec,
state: PollVec,
Expand All @@ -37,8 +37,8 @@ where
Self {
wakers: WakerVec::new(len),
state: PollVec::new(len),
indexer: Indexer::new(len),
streams,
rng: RandomGenerator::new(),
complete: 0,
done: false,
}
Expand Down Expand Up @@ -69,9 +69,7 @@ where
// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let len = this.streams.len();
let r = this.rng.generate(len as u32) as usize;
for index in (0..len).map(|n| (r + n).wrapping_rem(len)) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down
52 changes: 52 additions & 0 deletions src/utils/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use core::{iter, ops};

/// Generate an iteration sequence. This provides *fair* iteration when multiple
/// futures need to be polled concurrently.
pub(crate) struct Indexer {
offset: usize,
max: usize,
}

impl Indexer {
pub(crate) fn new(max: usize) -> Self {
Self { offset: 0, max }
}

/// Access the current offset.
///
/// This method should be called before calling `iter`, as `iter` will shift
/// the starting point by one for the next iteration.
pub(crate) fn offset(&self) -> usize {
self.offset
}

/// Generate a range between `0..max`, incrementing the starting point
/// for the next iteration.
pub(crate) fn iter(&mut self) -> IndexIter {
// Increment the starting point for next time.
let offset = self.offset;
self.offset.wrapping_rem(self.max);

IndexIter {
iter: (0..self.max),
max: self.max,
offset,
}
}
}

pub(crate) struct IndexIter {
iter: ops::Range<usize>,
offset: usize,
max: usize,
}

impl Iterator for IndexIter {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
self.iter
.next()
.map(|pos| (pos + self.offset).wrapping_rem(self.max))
}
}
6 changes: 3 additions & 3 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@

mod array;
mod fuse;
mod indexer;
mod pin;
mod poll_state;
mod rng;
mod tuple;
mod wakers;

pub(crate) use array::array_assume_init;
pub(crate) use fuse::Fuse;
pub(crate) use indexer::Indexer;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::MaybeDone;
pub(crate) use poll_state::{PollArray, PollState, PollVec};
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations};
pub(crate) use tuple::{gen_conditions, permutations, tuple_len};
pub(crate) use wakers::{InlineWakerArray, WakerArray, WakerVec};

#[cfg(test)]
Expand Down
30 changes: 0 additions & 30 deletions src/utils/rng.rs

This file was deleted.

11 changes: 9 additions & 2 deletions src/utils/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pub(crate) const fn permutations(mut num: u32) -> u32 {
// - https://rust-lang.github.io/rfcs/3086-macro-metavar-expr.html
macro_rules! gen_conditions {
// Base condition, setup the depth counter.
($LEN:expr, $i:expr, $r:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => {
($i:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => {
$(
if $i == ($r + ($F_index)).wrapping_rem($LEN) {
if $i == $F_index {
match unsafe { Pin::new_unchecked(&mut $this.$F) }.$method($cx) {
$($arms => $foo,)*
}
Expand All @@ -41,3 +41,10 @@ macro_rules! gen_conditions {
}
}
pub(crate) use gen_conditions;

/// Calculate the number of tuples currently being operated on.
macro_rules! tuple_len {
(@count_one $F:ident) => (1);
($($F:ident,)*) => (0 $(+ crate::utils::tuple_len!(@count_one $F))*);
}
pub(crate) use tuple_len;

0 comments on commit b115cc7

Please sign in to comment.