Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace rng with deterministic indexer #104

Merged
merged 2 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/future/race/array.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{self, RandomGenerator};
use crate::utils::{self, Indexer};

use super::Race as RaceTrait;

Expand All @@ -24,7 +24,7 @@ where
{
#[pin]
futures: [Fut; N],
rng: RandomGenerator,
indexer: Indexer,
done: bool,
}

Expand All @@ -48,9 +48,7 @@ where
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

let index = this.rng.generate(N as u32) as usize;

for index in (0..N).map(|pos| (index + pos).wrapping_rem(N)) {
for index in this.indexer.iter() {
let fut = utils::get_pin_mut(this.futures.as_mut(), index).unwrap();
match fut.poll(cx) {
Poll::Ready(item) => {
Expand All @@ -74,7 +72,7 @@ where
fn race(self) -> Self::Future {
Race {
futures: self.map(|fut| fut.into_future()),
rng: RandomGenerator::new(),
indexer: Indexer::new(N),
done: false,
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/future/race/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ macro_rules! impl_race_tuple {
$F: Future<Output = T>,
)* {
done: bool,
rng: utils::RandomGenerator,
indexer: utils::Indexer,
$(#[pin] $F: $F,)*
}

Expand All @@ -52,7 +52,7 @@ macro_rules! impl_race_tuple {
let ($($F,)*): ($($F,)*) = self;
$StructName {
done: false,
rng: utils::RandomGenerator::new(),
indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)),
$($F: $F.into_future()),*
}
}
Expand All @@ -70,17 +70,13 @@ macro_rules! impl_race_tuple {
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

#[repr(u32)]
#[repr(usize)]
enum Indexes {
$($F),*
}

const LEN: u32 = [$(Indexes::$F),*].len() as u32;
const PERMUTATIONS: u32 = utils::permutations(LEN);
let r = this.rng.generate(PERMUTATIONS);

for i in 0..LEN {
utils::gen_conditions!(LEN, i, r, this, cx, poll, $((Indexes::$F as u32; $F, {
for i in this.indexer.iter() {
utils::gen_conditions!(i, this, cx, poll, $((Indexes::$F as usize; $F, {
Poll::Ready(output) => {
*this.done = true;
return Poll::Ready(output);
Expand Down
11 changes: 4 additions & 7 deletions src/future/race/vec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{self, RandomGenerator};
use crate::utils::{self, Indexer};

use super::Race as RaceTrait;

Expand All @@ -24,7 +24,7 @@ where
{
#[pin]
futures: Vec<Fut>,
rng: RandomGenerator,
indexer: Indexer,
done: bool,
}

Expand All @@ -48,10 +48,7 @@ where
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

let len = this.futures.len();
let index = this.rng.generate(len as u32) as usize;

for index in (0..len).map(|pos| (index + pos).wrapping_rem(len)) {
for index in this.indexer.iter() {
let fut = utils::get_pin_mut_from_vec(this.futures.as_mut(), index).unwrap();
match fut.poll(cx) {
Poll::Ready(item) => {
Expand All @@ -74,8 +71,8 @@ where

fn race(self) -> Self::Future {
Race {
indexer: Indexer::new(self.len()),
futures: self.into_iter().map(|fut| fut.into_future()).collect(),
rng: RandomGenerator::new(),
done: false,
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/stream/merge/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, PollArray, RandomGenerator, WakerArray};
use crate::utils::{self, Indexer, PollArray, WakerArray};

use core::fmt;
use futures_core::Stream;
Expand All @@ -21,7 +21,7 @@ where
{
#[pin]
streams: [S; N],
rng: RandomGenerator,
indexer: Indexer,
wakers: WakerArray<N>,
state: PollArray<N>,
complete: usize,
Expand All @@ -35,7 +35,7 @@ where
pub(crate) fn new(streams: [S; N]) -> Self {
Self {
streams,
rng: RandomGenerator::new(),
indexer: Indexer::new(N),
wakers: WakerArray::new(),
state: PollArray::new(),
complete: 0,
Expand Down Expand Up @@ -68,9 +68,7 @@ where
// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let len = this.streams.len();
let r = this.rng.generate(len as u32) as usize;
for index in (0..len).map(|n| (r + n).wrapping_rem(len)) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down
7 changes: 3 additions & 4 deletions src/stream/merge/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ macro_rules! impl_merge_tuple {
$F: Stream<Item = T>,
)* {
#[pin] streams: $mod_name::Streams<$($F,)+>,
rng: utils::RandomGenerator,
indexer: utils::Indexer,
wakers: WakerArray<{$mod_name::LEN}>,
state: PollArray<{$mod_name::LEN}>,
completed: u8,
Expand Down Expand Up @@ -123,14 +123,13 @@ macro_rules! impl_merge_tuple {
readiness.set_waker(cx.waker());

const LEN: u8 = $mod_name::LEN as u8;
let r = this.rng.generate(LEN as u32) as u8;

let mut streams = this.streams.project();

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
for index in (0..LEN).map(|n| (r + n).wrapping_rem(LEN) as usize) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down Expand Up @@ -175,7 +174,7 @@ macro_rules! impl_merge_tuple {
let ($($F,)*): ($($F,)*) = self;
$StructName {
streams: $mod_name::Streams { $($F: $F.into_stream()),+ },
rng: utils::RandomGenerator::new(),
indexer: utils::Indexer::new(utils::tuple_len!($($F,)*)),
wakers: WakerArray::new(),
state: PollArray::new(),
completed: 0,
Expand Down
10 changes: 4 additions & 6 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, PollVec, RandomGenerator, WakerVec};
use crate::utils::{self, Indexer, PollVec, WakerVec};

use core::fmt;
use futures_core::Stream;
Expand All @@ -21,7 +21,7 @@ where
{
#[pin]
streams: Vec<S>,
rng: RandomGenerator,
indexer: Indexer,
complete: usize,
wakers: WakerVec,
state: PollVec,
Expand All @@ -37,8 +37,8 @@ where
Self {
wakers: WakerVec::new(len),
state: PollVec::new(len),
indexer: Indexer::new(len),
streams,
rng: RandomGenerator::new(),
complete: 0,
done: false,
}
Expand Down Expand Up @@ -69,9 +69,7 @@ where
// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let len = this.streams.len();
let r = this.rng.generate(len as u32) as usize;
for index in (0..len).map(|n| (r + n).wrapping_rem(len)) {
for index in this.indexer.iter() {
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
Expand Down
44 changes: 44 additions & 0 deletions src/utils/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use core::ops;

/// Generate an iteration sequence. This provides *fair* iteration when multiple
/// futures need to be polled concurrently.
pub(crate) struct Indexer {
offset: usize,
max: usize,
}

impl Indexer {
pub(crate) fn new(max: usize) -> Self {
Self { offset: 0, max }
}

/// Generate a range between `0..max`, incrementing the starting point
/// for the next iteration.
pub(crate) fn iter(&mut self) -> IndexIter {
// Increment the starting point for next time.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to try to save where we ended up last time and pick up from there?

let offset = self.offset;
self.offset = self.offset.wrapping_rem(self.max);
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved

IndexIter {
iter: (0..self.max),
max: self.max,
offset,
}
}
}

pub(crate) struct IndexIter {
iter: ops::Range<usize>,
offset: usize,
max: usize,
}

impl Iterator for IndexIter {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
self.iter
.next()
.map(|pos| (pos + self.offset).wrapping_rem(self.max))
}
}
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 3 additions & 3 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
//! Utilities to implement the different futures of this crate.

mod array;
mod indexer;
mod pin;
mod poll_state;
mod rng;
mod tuple;
mod wakers;

pub(crate) use array::array_assume_init;
pub(crate) use indexer::Indexer;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::MaybeDone;
pub(crate) use poll_state::{PollArray, PollState, PollVec};
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations};
pub(crate) use tuple::{gen_conditions, tuple_len};
pub(crate) use wakers::{WakerArray, WakerVec};

#[cfg(test)]
Expand Down
30 changes: 0 additions & 30 deletions src/utils/rng.rs

This file was deleted.

24 changes: 9 additions & 15 deletions src/utils/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,3 @@
/// Compute the number of permutations for a number
/// during compilation.
pub(crate) const fn permutations(mut num: u32) -> u32 {
let mut total = 1;
loop {
total *= num;
num -= 1;
if num == 0 {
break total;
}
}
}

/// Generate the `match` conditions inside the main polling body. This macro
/// chooses a random starting point on each call to the given method, making
/// it "fair".
Expand All @@ -30,9 +17,9 @@ pub(crate) const fn permutations(mut num: u32) -> u32 {
// - https://rust-lang.github.io/rfcs/3086-macro-metavar-expr.html
macro_rules! gen_conditions {
// Base condition, setup the depth counter.
($LEN:expr, $i:expr, $r:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => {
($i:expr, $this:expr, $cx:expr, $method:ident, $(($F_index: expr; $F:ident, { $($arms:pat => $foo:expr,)* }))*) => {
$(
if $i == ($r + $F_index).wrapping_rem($LEN) {
if $i == $F_index {
match unsafe { Pin::new_unchecked(&mut $this.$F) }.$method($cx) {
$($arms => $foo,)*
}
Expand All @@ -41,3 +28,10 @@ macro_rules! gen_conditions {
}
}
pub(crate) use gen_conditions;

/// Calculate the number of tuples currently being operated on.
macro_rules! tuple_len {
(@count_one $F:ident) => (1);
($($F:ident,)*) => (0 $(+ crate::utils::tuple_len!(@count_one $F))*);
}
pub(crate) use tuple_len;