diff --git a/rmc/Cargo.toml b/rmc/Cargo.toml index 94d602bf..d20bb726 100644 --- a/rmc/Cargo.toml +++ b/rmc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-rmc" -version = "0.9.0" +version = "0.9.1" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "cryptography"] diff --git a/rmc/src/lib.rs b/rmc/src/lib.rs index eaa3efd1..c20e1a18 100644 --- a/rmc/src/lib.rs +++ b/rmc/src/lib.rs @@ -87,11 +87,9 @@ impl IndexedInstant { /// `initial_delay`, and each following delay for that task is two times longer than the previous /// one. pub struct DoublingDelayScheduler { - initial_delay: time::Duration, + initial_delay: Duration, scheduled_instants: BinaryHeap>, scheduled_tasks: Vec>, - on_new_task_tx: UnboundedSender, - on_new_task_rx: UnboundedReceiver, } impl Debug for DoublingDelayScheduler { @@ -105,14 +103,11 @@ impl Debug for DoublingDelayScheduler { } impl DoublingDelayScheduler { - pub fn new(initial_delay: time::Duration) -> Self { - let (on_new_task_tx, on_new_task_rx) = unbounded(); + pub fn new(initial_delay: Duration) -> Self { DoublingDelayScheduler { initial_delay, scheduled_instants: BinaryHeap::new(), scheduled_tasks: Vec::new(), - on_new_task_tx, - on_new_task_rx, } } } @@ -120,13 +115,15 @@ impl DoublingDelayScheduler { #[async_trait] impl TaskScheduler for DoublingDelayScheduler { fn add_task(&mut self, task: T) { - self.on_new_task_tx - .unbounded_send(task) - .expect("We own the the rx, so this can't fail"); + let i = self.scheduled_tasks.len(); + let indexed_instant = IndexedInstant::now(i); + self.scheduled_instants.push(Reverse(indexed_instant)); + let scheduled_task = ScheduledTask::new(task, self.initial_delay); + self.scheduled_tasks.push(scheduled_task); } async fn next_task(&mut self) -> Option { - let mut delay: futures::future::Fuse<_> = match self.scheduled_instants.peek() { + let delay: futures::future::Fuse<_> = match self.scheduled_instants.peek() { Some(&Reverse(IndexedInstant(instant, _))) => { let now = time::Instant::now(); if now > instant { @@ -137,21 +134,9 @@ impl TaskScheduler for DoublingDelayScheduler { } None => futures::future::Fuse::terminated(), }; - // wait until either the scheduled time of the peeked task or a next call of add_task - futures::select! { - _ = delay => {}, - task = self.on_new_task_rx.next() => { - if let Some(task) = task { - let i = self.scheduled_tasks.len(); - let indexed_instant = IndexedInstant::now(i); - self.scheduled_instants.push(Reverse(indexed_instant)); - let scheduled_task = ScheduledTask::new(task, self.initial_delay); - self.scheduled_tasks.push(scheduled_task); - } else { - return None; - } - } - } + + delay.await; + let Reverse(IndexedInstant(instant, i)) = self .scheduled_instants .pop()