Skip to content

Commit

Permalink
m: Remove the thread-local executor optimization
Browse files Browse the repository at this point in the history
This was added in #37 as an optimization, but has since lead to many bugs. See
the issues #53, #57 and #60 for more information. I do not have the bandwidth
to address all of these bugs, so I'm taking the path of least resistance by
just removing the problematic code.

CLoses #53, #57 and #60

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Oct 17, 2023
1 parent 917caad commit 8a0832c
Showing 1 changed file with 15 additions and 152 deletions.
167 changes: 15 additions & 152 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
Expand Down Expand Up @@ -236,56 +235,29 @@ impl<'a> Executor<'a> {
let runner = Runner::new(self.state());
let mut rng = fastrand::Rng::new();

// Set the local queue while we're running.
LocalQueue::set(self.state(), &runner.local, {
let runner = &runner;
async move {
// A future that runs tasks forever.
let run_forever = async {
loop {
for _ in 0..200 {
let runnable = runner.runnable(&mut rng).await;
runnable.run();
}
future::yield_now().await;
}
};

// Run `future` and `run_forever` concurrently until `future` completes.
future.or(run_forever).await
// A future that runs tasks forever.
let run_forever = async {
loop {
for _ in 0..200 {
let runnable = runner.runnable(&mut rng).await;
runnable.run();
}
future::yield_now().await;
}
})
.await
};

// Run `future` and `run_forever` concurrently until `future` completes.
future.or(run_forever).await
}

/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone();

// If possible, push into the current local queue and notify the ticker.
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
let mut runnable = Some(runnable);

// Try to push into the local queue.
LocalQueue::with(|local_queue| {
// Make sure that we don't accidentally push to an executor that isn't ours.
if local_queue.state != &*state as *const State as usize {
return;
}

if let Err(e) = local_queue.queue.push(runnable.take().unwrap()) {
runnable = Some(e.into_inner());
return;
}

local_queue.waker.wake_by_ref();
});

// If the local queue push failed, just push to the global queue.
if let Some(runnable) = runnable {
state.queue.push(runnable).unwrap();
state.notify();
}
state.queue.push(runnable).unwrap();
state.notify();
}
}

Expand Down Expand Up @@ -853,106 +825,6 @@ impl Drop for Runner<'_> {
}
}

/// The state of the currently running local queue.
struct LocalQueue {
/// The pointer to the state of the executor.
///
/// Used to make sure we don't push runnables to the wrong executor.
state: usize,

/// The concurrent queue.
queue: Arc<ConcurrentQueue<Runnable>>,

/// The waker for the runnable.
waker: Waker,
}

impl LocalQueue {
/// Run a function with the current local queue.
fn with<R>(f: impl FnOnce(&LocalQueue) -> R) -> Option<R> {
std::thread_local! {
/// The current local queue.
static LOCAL_QUEUE: RefCell<Option<LocalQueue>> = RefCell::new(None);
}

impl LocalQueue {
/// Run a function with a set local queue.
async fn set<F>(
state: &State,
queue: &Arc<ConcurrentQueue<Runnable>>,
fut: F,
) -> F::Output
where
F: Future,
{
// Make the `LocalQueue` structure.
let make_local_queue = |waker: &Waker| LocalQueue {
state: state as *const State as usize,
queue: queue.clone(),
waker: waker.clone(),
};

// Store the local queue and the current waker.
let mut old = with_waker(|waker| {
LOCAL_QUEUE.with(move |slot| slot.borrow_mut().replace(make_local_queue(waker)))
})
.await;

// Restore the old local queue on drop.
let _guard = CallOnDrop(move || {
let old = old.take();
let _ = LOCAL_QUEUE.try_with(move |slot| {
*slot.borrow_mut() = old;
});
});

// Pin the future.
futures_lite::pin!(fut);

// Run it such that the waker is updated every time it's polled.
future::poll_fn(move |cx| {
LOCAL_QUEUE
.try_with({
let waker = cx.waker();
move |slot| {
let mut slot = slot.borrow_mut();
let qaw = match slot.as_mut() {
None => {
// Another local queue dropped itself and replaced with None,
// we can take its place!
*slot = Some(make_local_queue(waker));
return;
}
Some(qaw) => qaw,
};

// If we've been replaced, just ignore the slot.
if !Arc::ptr_eq(&qaw.queue, queue) {
return;
}

// Update the waker, if it has changed.
if !qaw.waker.will_wake(waker) {
qaw.waker = waker.clone();
}
}
})
.ok();

// Poll the future.
fut.as_mut().poll(cx)
})
.await
}
}

LOCAL_QUEUE
.try_with(|local_queue| local_queue.borrow().as_ref().map(f))
.ok()
.flatten()
}
}

/// Steals some items from one queue into another.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
// Half of `src`'s length rounded up.
Expand Down Expand Up @@ -1053,15 +925,6 @@ impl<F: FnMut()> Drop for CallOnDrop<F> {
}
}

/// Run a closure with the current waker.
fn with_waker<F: FnOnce(&Waker) -> R, R>(f: F) -> impl Future<Output = R> {
let mut f = Some(f);
future::poll_fn(move |cx| {
let f = f.take().unwrap();
Poll::Ready(f(cx.waker()))
})
}

fn _ensure_send_and_sync() {
use futures_lite::future::pending;

Expand Down

0 comments on commit 8a0832c

Please sign in to comment.