From 924b4530a72209b417fae5fb53e32d63a9824fcb Mon Sep 17 00:00:00 2001 From: James Liu Date: Sun, 12 May 2024 16:22:32 -0700 Subject: [PATCH] feat: Implement static executors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves #111. Creates a `StaticExecutor` type under a feature flag and allows constructing it from an `Executor` via `Executor::leak`. Unlike the executor it came from, it's a wrapper around a `State` and omits all changes to `active`. Note, unlike the API proposed in #111, this PR also includes a unsafe `StaticExecutor::spawn_scoped` for spawning non-'static tasks, where the caller is responsible for ensuring that the task doesn't outlive the borrowed state. This would be required for Bevy to migrate to this type, where we're currently using lifetime transmutation on `Executor` to enable `Thread::scope`-like APIs for working with borrowed state. `StaticExecutor` does not have an external lifetime parameter so this approach is infeasible without such an API. The performance gains while using the type are substantial: ``` single_thread/executor::spawn_one time: [1.6157 µs 1.6238 µs 1.6362 µs] Found 6 outliers among 100 measurements (6.00%) 3 (3.00%) high mild 3 (3.00%) high severe single_thread/executor::spawn_batch time: [28.169 µs 29.650 µs 32.196 µs] Found 19 outliers among 100 measurements (19.00%) 10 (10.00%) low severe 3 (3.00%) low mild 3 (3.00%) high mild 3 (3.00%) high severe single_thread/executor::spawn_many_local time: [6.1952 ms 6.2230 ms 6.2578 ms] Found 4 outliers among 100 measurements (4.00%) 1 (1.00%) high mild 3 (3.00%) high severe single_thread/executor::spawn_recursively time: [50.202 ms 50.479 ms 50.774 ms] Found 6 outliers among 100 measurements (6.00%) 5 (5.00%) high mild 1 (1.00%) high severe single_thread/executor::yield_now time: [5.8795 ms 5.8883 ms 5.8977 ms] Found 3 outliers among 100 measurements (3.00%) 3 (3.00%) high mild multi_thread/executor::spawn_one time: [1.2565 µs 1.2979 µs 1.3470 µs] Found 8 outliers among 100 measurements (8.00%) 7 (7.00%) high mild 1 (1.00%) high severe multi_thread/executor::spawn_batch time: [38.009 µs 43.693 µs 52.882 µs] Found 22 outliers among 100 measurements (22.00%) 21 (21.00%) high mild 1 (1.00%) high severe Benchmarking multi_thread/executor::spawn_many_local: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 386.6s, or reduce sample count to 10. multi_thread/executor::spawn_many_local time: [27.492 ms 27.652 ms 27.814 ms] Found 4 outliers among 100 measurements (4.00%) 1 (1.00%) low mild 3 (3.00%) high mild Benchmarking multi_thread/executor::spawn_recursively: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 16.6s, or reduce sample count to 30. multi_thread/executor::spawn_recursively time: [165.82 ms 166.04 ms 166.26 ms] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild multi_thread/executor::yield_now time: [22.469 ms 22.649 ms 22.798 ms] Found 8 outliers among 100 measurements (8.00%) 5 (5.00%) low severe 3 (3.00%) low mild single_thread/leaked_executor::spawn_one time: [1.4717 µs 1.4778 µs 1.4832 µs] Found 9 outliers among 100 measurements (9.00%) 3 (3.00%) low severe 2 (2.00%) low mild 3 (3.00%) high mild 1 (1.00%) high severe single_thread/leaked_executor::spawn_many_local time: [4.2622 ms 4.3065 ms 4.3489 ms] Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) low mild single_thread/leaked_executor::spawn_recursively time: [26.566 ms 26.899 ms 27.228 ms] single_thread/leaked_executor::yield_now time: [5.7200 ms 5.7270 ms 5.7342 ms] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild multi_thread/leaked_executor::spawn_one time: [1.3755 µs 1.4321 µs 1.4892 µs] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild multi_thread/leaked_executor::spawn_many_local time: [4.1838 ms 4.2394 ms 4.2989 ms] Found 7 outliers among 100 measurements (7.00%) 7 (7.00%) high mild multi_thread/leaked_executor::spawn_recursively time: [43.074 ms 43.159 ms 43.241 ms] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) low mild multi_thread/leaked_executor::yield_now time: [23.210 ms 23.257 ms 23.302 ms] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) low mild ``` --- .github/workflows/ci.yml | 5 + Cargo.toml | 9 +- benches/executor.rs | 619 +++++++++++++++++++++++++++------------ src/lib.rs | 86 ++++-- src/static_executors.rs | 479 ++++++++++++++++++++++++++++++ 5 files changed, 972 insertions(+), 226 deletions(-) create mode 100644 src/static_executors.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be05945..db2bd1a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,6 +45,7 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --all-features - run: cargo check --all --all-features --target wasm32-unknown-unknown - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps @@ -82,6 +83,10 @@ jobs: env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + - run: cargo miri test --all-features + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout security_audit: permissions: diff --git a/Cargo.toml b/Cargo.toml index 15d0cf5..16d33bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.11.0" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.60" +rust-version = "1.63" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" @@ -14,9 +14,13 @@ keywords = ["asynchronous", "executor", "single", "multi", "spawn"] categories = ["asynchronous", "concurrency"] exclude = ["/.*"] +[features] +# Adds support for executors optimized for use in static variables. +static = [] + [dependencies] async-task = "4.4.0" -concurrent-queue = "2.0.0" +concurrent-queue = "2.5.0" fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4" @@ -37,3 +41,4 @@ once_cell = "1.16.0" [[bench]] name = "executor" harness = false +required-features = ["static"] diff --git a/benches/executor.rs b/benches/executor.rs index f624513..74b2955 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,7 +1,7 @@ use std::mem; use std::thread::available_parallelism; -use async_executor::Executor; +use async_executor::{Executor, StaticExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; use futures_lite::{future, prelude::*}; @@ -10,6 +10,7 @@ const STEPS: usize = 300; const LIGHT_TASKS: usize = 25_000; static EX: Executor<'_> = Executor::new(); +static STATIC_EX: StaticExecutor = StaticExecutor::new(); fn run(f: impl FnOnce(), multithread: bool) { let limit = if multithread { @@ -27,6 +28,22 @@ fn run(f: impl FnOnce(), multithread: bool) { }); } +fn run_static(f: impl FnOnce(), multithread: bool) { + let limit = if multithread { + available_parallelism().unwrap().get() + } else { + 1 + }; + + let (s, r) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv()))) + .finish(move || { + let _s = s; + f() + }); +} + fn create(c: &mut Criterion) { c.bench_function("executor::create", |b| { b.iter(|| { @@ -38,224 +55,442 @@ fn create(c: &mut Criterion) { } fn running_benches(c: &mut Criterion) { - for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { - let mut group = c.benchmark_group(group_name.to_string()); - - group.bench_function("executor::spawn_one", |b| { - run( - || { - b.iter(|| { - future::block_on(async { EX.spawn(async {}).await }); - }); - }, - *multithread, - ); - }); - - group.bench_function("executor::spawn_batch", |b| { - run( - || { - let mut handles = vec![]; + for (prefix, with_static) in [("executor", false), ("static_executor", true)] { + for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { + let mut group = c.benchmark_group(group_name.to_string()); + + group.bench_function(format!("{}::spawn_one", prefix), |b| { + if with_static { + run_static( + || { + b.iter(|| { + future::block_on(async { STATIC_EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(|| { + future::block_on(async { EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } + }); - b.iter(|| { - EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); - }); + if !with_static { + group.bench_function("executor::spawn_batch", |b| { + run( + || { + let mut handles = vec![]; - handles.clear(); - }, - *multithread, - ) - }); + b.iter(|| { + EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + }); - group.bench_function("executor::spawn_many_local", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..LIGHT_TASKS { - tasks.push(EX.spawn(async {})); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + handles.clear(); + }, + *multithread, + ) + }); + } - group.bench_function("executor::spawn_recursively", |b| { - #[allow(clippy::manual_async_fn)] - fn go(i: usize) -> impl Future + Send + 'static { - async move { - if i != 0 { - EX.spawn(async move { - let fut = go(i - 1).boxed(); - fut.await; - }) - .await; + group.bench_function(format!("{}::spawn_many_local", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(STATIC_EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::spawn_recursively", prefix), |b| { + #[allow(clippy::manual_async_fn)] + fn go(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + EX.spawn(async move { + let fut = go(i - 1).boxed(); + fut.await; + }) + .await; + } } } - } - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(go(STEPS))); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + #[allow(clippy::manual_async_fn)] + fn go_static(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + STATIC_EX + .spawn(async move { + let fut = go_static(i - 1).boxed(); + fut.await; + }) + .await; + } + } + } - group.bench_function("executor::yield_now", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(async move { - for _ in 0..STEPS { - future::yield_now().await; + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(go_static(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(go(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::yield_now", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::channels", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(STATIC_EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); } - })); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); - group.bench_function("executor::channels", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - // Create channels. - let mut tasks = Vec::new(); - let (first_send, first_recv) = async_channel::bounded(1); - let mut current_recv = first_recv; - - for _ in 0..TASKS { - let (next_send, next_recv) = async_channel::bounded(1); - let current_recv = mem::replace(&mut current_recv, next_recv); - - tasks.push(EX.spawn(async move { - // Send a notification on to the next task. for _ in 0..STEPS { + first_send.send(()).await.unwrap(); current_recv.recv().await.unwrap(); - next_send.send(()).await.unwrap(); } - })); - } - - for _ in 0..STEPS { - first_send.send(()).await.unwrap(); - current_recv.recv().await.unwrap(); - } - - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ) - }); - group.bench_function("executor::web_server", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let (db_send, db_recv) = - async_channel::bounded::>(TASKS / 5); - let mut db_rng = fastrand::Rng::with_seed(0x12345678); - let mut web_rng = db_rng.fork(); - - // This task simulates a database. - let db_task = EX.spawn(async move { - loop { - // Wait for a new task. - let incoming = match db_recv.recv().await { - Ok(incoming) => incoming, - Err(_) => break, - }; - - // Process the task. Maybe it takes a while. - for _ in 0..db_rng.usize(..10) { - future::yield_now().await; + for task in tasks { + task.await; } - - // Send the data back. - incoming.send(db_rng.usize(..)).await.ok(); - } + }); }); + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } - // This task simulates a web server waiting for new tasks. - let server_task = EX.spawn(async move { - for i in 0..TASKS { - // Get a new connection. - if web_rng.usize(..=16) == 16 { - future::yield_now().await; + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); } - let mut web_rng = web_rng.fork(); - let db_send = db_send.clone(); - let task = EX.spawn(async move { - // Check if the data is cached... - if web_rng.bool() { - // ...it's in cache! - future::yield_now().await; - return; + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ) + } + }); + + group.bench_function(format!("{}::web_server", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = STATIC_EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); } + }); - // Otherwise we have to make a DB call or two. - for _ in 0..web_rng.usize(STEPS / 2..STEPS) { - let (resp_send, resp_recv) = async_channel::bounded(1); - db_send.send(resp_send).await.unwrap(); - criterion::black_box(resp_recv.recv().await.unwrap()); + // This task simulates a web server waiting for new tasks. + let server_task = STATIC_EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = STATIC_EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box( + resp_recv.recv().await.unwrap(), + ); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } } + }); - // Send the data back... - for _ in 0..web_rng.usize(3..16) { - future::yield_now().await; + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); } }); - task.detach(); - - if i & 16 == 0 { - future::yield_now().await; - } - } - }); + // This task simulates a web server waiting for new tasks. + let server_task = EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box( + resp_recv.recv().await.unwrap(), + ); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); - // Spawn and wait for it to stop. - server_task.await; - db_task.await; - }); - }) - }, - *multithread, - ) - }); + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } + }); + } } } diff --git a/src/lib.rs b/src/lib.rs index a663be8..7c5d49d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] use std::fmt; use std::marker::PhantomData; @@ -51,8 +52,13 @@ use concurrent_queue::ConcurrentQueue; use futures_lite::{future, prelude::*}; use slab::Slab; +#[cfg(feature = "static")] +mod static_executors; + #[doc(no_inline)] pub use async_task::{FallibleTask, Task}; +#[cfg(feature = "static")] +pub use static_executors::*; /// An async executor. /// @@ -292,18 +298,7 @@ impl<'a> Executor<'a> { /// assert!(ex.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { - match self.state().queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.state().notify(); - - // Run the task. - runnable.run(); - true - } - } + self.state().try_tick() } /// Runs a single task. @@ -326,9 +321,7 @@ impl<'a> Executor<'a> { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let state = self.state(); - let runnable = Ticker::new(state).runnable().await; - runnable.run(); + self.state().tick().await; } /// Runs the executor until the given future completes. @@ -347,22 +340,7 @@ impl<'a> Executor<'a> { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self.state()); - let mut rng = fastrand::Rng::new(); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + self.state().run(future).await } /// Returns a function that schedules a runnable task when it gets woken up. @@ -701,7 +679,7 @@ struct State { impl State { /// Creates state for a new executor. - fn new() -> State { + const fn new() -> State { State { queue: ConcurrentQueue::unbounded(), local_queues: RwLock::new(Vec::new()), @@ -729,6 +707,45 @@ impl State { } } } + + pub(crate) fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::new(); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } } /// A list of sleeping tickers. @@ -1068,6 +1085,11 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ // in state_ptr. let state = unsafe { &*ptr }; + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Debug wrapper for the number of active tasks. struct ActiveTasks<'a>(&'a Mutex>); diff --git a/src/static_executors.rs b/src/static_executors.rs new file mode 100644 index 0000000..c1724e9 --- /dev/null +++ b/src/static_executors.rs @@ -0,0 +1,479 @@ +use crate::{debug_state, Executor, LocalExecutor, State}; +use async_task::{Builder, Runnable, Task}; +use slab::Slab; +use std::{ + cell::UnsafeCell, + fmt, + future::Future, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, +}; + +impl Executor<'static> { + /// Consumes the [`Executor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticExecutor { + let ptr = self.state_ptr(); + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state: &'static State = unsafe { &*ptr }; + + std::mem::forget(self); + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) }; + static_executor + } +} + +impl LocalExecutor<'static> { + /// Consumes the [`LocalExecutor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticLocalExecutor { + let ptr = self.inner.state_ptr(); + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state: &'static State = unsafe { &*ptr }; + + std::mem::forget(self); + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) }; + static_executor + } +} + +/// A static-lifetimed async [`Executor`]. +/// +/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static +/// contexts via [`Executor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html +#[repr(transparent)] +pub struct StaticExecutor { + state: State, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for StaticExecutor {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +unsafe impl Sync for StaticExecutor {} + +impl UnwindSafe for StaticExecutor {} +impl RefUnwindSafe for StaticExecutor {} + +impl fmt::Debug for StaticExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticExecutor", f) + } +} + +impl StaticExecutor { + /// Creates a new StaticExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &'static self, + future: impl Future + Send + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'a>( + &'static self, + future: impl Future + Send + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is `Send` + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// assert!(!EXECUTOR.try_tick()); // no tasks to run + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// assert!(EXECUTOR.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(EXECUTOR.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { 1 + 2 }); + /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} + +impl Default for StaticExecutor { + fn default() -> Self { + Self::new() + } +} + +/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`]. +/// +/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static +/// contexts via [`LocalExecutor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html +#[repr(transparent)] +pub struct StaticLocalExecutor { + state: State, + marker_: PhantomData>, +} + +impl UnwindSafe for StaticLocalExecutor {} +impl RefUnwindSafe for StaticLocalExecutor {} + +impl fmt::Debug for StaticLocalExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticLocalExecutor", f) + } +} + +impl StaticLocalExecutor { + /// Creates a new StaticLocalExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticLocalExecutor; + /// + /// thread_local! { + /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new(); + /// } + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + marker_: PhantomData, + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&'static self, future: impl Future + 'static) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_local(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: 'a>( + &'static self, + future: impl Future + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} + +impl Default for StaticLocalExecutor { + fn default() -> Self { + Self::new() + } +}