Skip to content

Commit

Permalink
m: Replace unnecessary atomics with non-atomic operations
Browse files Browse the repository at this point in the history
  • Loading branch information
james7132 authored Feb 17, 2024
1 parent 0baba46 commit 7ffdf5b
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::task::{Poll, Waker};

Expand Down Expand Up @@ -243,7 +243,7 @@ impl<'a> Executor<'a> {
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let runner = Runner::new(self.state());
let mut runner = Runner::new(self.state());
let mut rng = fastrand::Rng::new();

// A future that runs tasks forever.
Expand Down Expand Up @@ -639,29 +639,26 @@ struct Ticker<'a> {
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: AtomicUsize,
sleeping: usize,
}

impl Ticker<'_> {
/// Creates a ticker.
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
sleeping: AtomicUsize::new(0),
}
Ticker { state, sleeping: 0 }
}

/// Moves the ticker into sleeping and unnotified state.
///
/// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&self, waker: &Waker) -> bool {
fn sleep(&mut self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap();

match self.sleeping.load(Ordering::SeqCst) {
match self.sleeping {
// Move to sleeping state.
0 => self
.sleeping
.store(sleepers.insert(waker), Ordering::SeqCst),
0 => {
self.sleeping = sleepers.insert(waker);
}

// Already sleeping, check if notified.
id => {
Expand All @@ -679,25 +676,25 @@ impl Ticker<'_> {
}

/// Moves the ticker into woken state.
fn wake(&self) {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
fn wake(&mut self) {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id);
sleepers.remove(self.sleeping);

self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
self.sleeping = 0;
}

/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
async fn runnable(&mut self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
}

/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
Expand Down Expand Up @@ -728,10 +725,9 @@ impl Ticker<'_> {
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id);
let notified = sleepers.remove(self.sleeping);

self.state
.notified
Expand Down Expand Up @@ -760,7 +756,7 @@ struct Runner<'a> {
local: Arc<ConcurrentQueue<Runnable>>,

/// Bumped every time a runnable task is found.
ticks: AtomicUsize,
ticks: usize,
}

impl Runner<'_> {
Expand All @@ -770,7 +766,7 @@ impl Runner<'_> {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: AtomicUsize::new(0),
ticks: 0,
};
state
.local_queues
Expand All @@ -781,7 +777,7 @@ impl Runner<'_> {
}

/// Waits for the next runnable task to run.
async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable {
async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
let runnable = self
.ticker
.runnable_with(|| {
Expand Down Expand Up @@ -824,9 +820,9 @@ impl Runner<'_> {
.await;

// Bump the tick counter.
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
self.ticks += 1;

if ticks % 64 == 0 {
if self.ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local);
}
Expand Down

0 comments on commit 7ffdf5b

Please sign in to comment.