Skip to content

Commit

Permalink
simplify DoublingDelayScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
woocash2 committed Sep 4, 2023
1 parent f6384cf commit f59487d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 27 deletions.
2 changes: 1 addition & 1 deletion rmc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft-rmc"
version = "0.9.0"
version = "0.9.1"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "cryptography"]
Expand Down
37 changes: 11 additions & 26 deletions rmc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,9 @@ impl IndexedInstant {
/// `initial_delay`, and each following delay for that task is two times longer than the previous
/// one.
pub struct DoublingDelayScheduler<T> {
initial_delay: time::Duration,
initial_delay: Duration,
scheduled_instants: BinaryHeap<Reverse<IndexedInstant>>,
scheduled_tasks: Vec<ScheduledTask<T>>,
on_new_task_tx: UnboundedSender<T>,
on_new_task_rx: UnboundedReceiver<T>,
}

impl<T> Debug for DoublingDelayScheduler<T> {
Expand All @@ -105,28 +103,27 @@ impl<T> Debug for DoublingDelayScheduler<T> {
}

impl<T> DoublingDelayScheduler<T> {
pub fn new(initial_delay: time::Duration) -> Self {
let (on_new_task_tx, on_new_task_rx) = unbounded();
pub fn new(initial_delay: Duration) -> Self {
DoublingDelayScheduler {
initial_delay,
scheduled_instants: BinaryHeap::new(),
scheduled_tasks: Vec::new(),
on_new_task_tx,
on_new_task_rx,
}
}
}

#[async_trait]
impl<T: Send + Sync + Clone> TaskScheduler<T> for DoublingDelayScheduler<T> {
fn add_task(&mut self, task: T) {
self.on_new_task_tx
.unbounded_send(task)
.expect("We own the the rx, so this can't fail");
let i = self.scheduled_tasks.len();
let indexed_instant = IndexedInstant::now(i);
self.scheduled_instants.push(Reverse(indexed_instant));
let scheduled_task = ScheduledTask::new(task, self.initial_delay);
self.scheduled_tasks.push(scheduled_task);
}

async fn next_task(&mut self) -> Option<T> {
let mut delay: futures::future::Fuse<_> = match self.scheduled_instants.peek() {
let delay: futures::future::Fuse<_> = match self.scheduled_instants.peek() {
Some(&Reverse(IndexedInstant(instant, _))) => {
let now = time::Instant::now();
if now > instant {
Expand All @@ -137,21 +134,9 @@ impl<T: Send + Sync + Clone> TaskScheduler<T> for DoublingDelayScheduler<T> {
}
None => futures::future::Fuse::terminated(),
};
// wait until either the scheduled time of the peeked task or a next call of add_task
futures::select! {
_ = delay => {},
task = self.on_new_task_rx.next() => {
if let Some(task) = task {
let i = self.scheduled_tasks.len();
let indexed_instant = IndexedInstant::now(i);
self.scheduled_instants.push(Reverse(indexed_instant));
let scheduled_task = ScheduledTask::new(task, self.initial_delay);
self.scheduled_tasks.push(scheduled_task);
} else {
return None;
}
}
}

delay.await;

let Reverse(IndexedInstant(instant, i)) = self
.scheduled_instants
.pop()
Expand Down

0 comments on commit f59487d

Please sign in to comment.