From 98216cec6d7464429199794d85389b8360cf646e Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Fri, 21 Jul 2023 18:05:58 +1000 Subject: [PATCH] Make `{Mutex, Notify, OnceCell, RwLock, Semaphore}::const_new` always available Since MSRV is bumped to 1.63, `Mutex::new` is now usable in const context. Also use `assert!` in const function to ensure correctness instead of silently truncating the value and remove cfg `tokio_no_const_mutex_new`. Signed-off-by: Jiahao XU --- .github/workflows/ci.yml | 6 - tokio/src/loom/std/mutex.rs | 1 - tokio/src/macros/cfg.rs | 16 +- tokio/src/sync/batch_semaphore.rs | 24 ++- tokio/src/sync/mutex.rs | 3 +- tokio/src/sync/notify.rs | 3 +- tokio/src/sync/once_cell.rs | 11 +- tokio/src/sync/rwlock.rs | 11 +- tokio/src/sync/semaphore.rs | 22 +-- tokio/src/util/mod.rs | 7 +- tokio/tests/sync_once_cell.rs | 278 ++++++++++++++---------------- tokio/tests/task_join_set.rs | 76 ++++---- 12 files changed, 213 insertions(+), 245 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5e283e7c682..d4bf03933fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -442,7 +442,6 @@ jobs: # Run a platform without AtomicU64 and no const Mutex::new - target: armv5te-unknown-linux-gnueabi - rustflags: --cfg tokio_no_const_mutex_new steps: - uses: actions/checkout@v3 - name: Install Rust stable @@ -485,7 +484,6 @@ jobs: # Run a platform without AtomicU64 and no const Mutex::new - target: armv5te-unknown-linux-gnueabi - rustflags: --cfg tokio_no_const_mutex_new steps: - uses: actions/checkout@v3 - name: Install Rust stable @@ -568,10 +566,6 @@ jobs: # https://github.com/tokio-rs/tokio/pull/5356 # https://github.com/tokio-rs/tokio/issues/5373 - - name: Check without const_mutex_new - run: cargo hack check -p tokio --feature-powerset --depth 2 --keep-going - env: - RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64 --cfg tokio_no_const_mutex_new - name: Check with const_mutex_new run: cargo hack check -p tokio --feature-powerset --depth 2 --keep-going env: diff --git a/tokio/src/loom/std/mutex.rs b/tokio/src/loom/std/mutex.rs index 076f7861104..7b8f9ba1e24 100644 --- a/tokio/src/loom/std/mutex.rs +++ b/tokio/src/loom/std/mutex.rs @@ -13,7 +13,6 @@ impl Mutex { } #[inline] - #[cfg(not(tokio_no_const_mutex_new))] pub(crate) const fn const_new(t: T) -> Mutex { Mutex(sync::Mutex::new(t)) } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index f60c040fd01..96bff633910 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -555,13 +555,7 @@ macro_rules! cfg_not_has_atomic_u64 { macro_rules! cfg_has_const_mutex_new { ($($item:item)*) => { $( - #[cfg(all( - not(all(loom, test)), - any( - feature = "parking_lot", - not(tokio_no_const_mutex_new) - ) - ))] + #[cfg(not(all(loom, test)))] $item )* } @@ -570,13 +564,7 @@ macro_rules! cfg_has_const_mutex_new { macro_rules! cfg_not_has_const_mutex_new { ($($item:item)*) => { $( - #[cfg(not(all( - not(all(loom, test)), - any( - feature = "parking_lot", - not(tokio_no_const_mutex_new) - ) - )))] + #[cfg(all(loom, test))] $item )* } diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index a762f799d56..e0dafaa256e 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -178,14 +178,9 @@ impl Semaphore { /// Creates a new semaphore with the initial number of permits. /// /// Maximum number of permits on 32-bit platforms is `1<<29`. - /// - /// If the specified number of permits exceeds the maximum permit amount - /// Then the value will get clamped to the maximum number of permits. - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - pub(crate) const fn const_new(mut permits: usize) -> Self { - // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925 - // currently we just clamp the permit count when it exceeds the max - permits &= Self::MAX_PERMITS; + #[cfg(not(all(loom, test)))] + pub(crate) const fn const_new(permits: usize) -> Self { + assert!(permits <= Self::MAX_PERMITS); Self { permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), @@ -198,6 +193,19 @@ impl Semaphore { } } + /// Creates a new closed semaphore with 0 permits. + pub(crate) fn new_closed() -> Self { + Self { + permits: AtomicUsize::new(Self::CLOSED), + waiters: Mutex::new(Waitlist { + queue: LinkedList::new(), + closed: true, + }), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), + } + } + /// Returns the current number of available permits. pub(crate) fn available_permits(&self) -> usize { self.permits.load(Acquire) >> Self::PERMIT_SHIFT diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 549c77b321e..0700c260eb2 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -378,8 +378,7 @@ impl Mutex { /// /// static LOCK: Mutex = Mutex::const_new(5); /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new(t: T) -> Self where T: Sized, diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 0f104b71aa2..bf00ca3f64f 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -443,8 +443,7 @@ impl Notify { /// /// static NOTIFY: Notify = Notify::const_new(); /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new() -> Notify { Notify { state: AtomicUsize::new(0), diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 90ea5cd6862..8a5463a3be7 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -114,12 +114,10 @@ impl Drop for OnceCell { impl From for OnceCell { fn from(value: T) -> Self { - let semaphore = Semaphore::new(0); - semaphore.close(); OnceCell { value_set: AtomicBool::new(true), value: UnsafeCell::new(MaybeUninit::new(value)), - semaphore, + semaphore: Semaphore::new_closed(), } } } @@ -139,6 +137,10 @@ impl OnceCell { /// If the `Option` is `None`, this is equivalent to `OnceCell::new`. /// /// [`OnceCell::new`]: crate::sync::OnceCell::new + // Once https://github.com/rust-lang/rust/issues/73255 lands + // and tokio MSRV is bumped to the rustc version with it stablised, + // we can made this function available in const context, + // by creating `Semaphore::const_new_closed`. pub fn new_with(value: Option) -> Self { if let Some(v) = value { OnceCell::from(v) @@ -171,8 +173,7 @@ impl OnceCell { /// assert_eq!(*result, 2); /// } /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new() -> Self { OnceCell { value_set: AtomicBool::new(false), diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index dd4928546fc..ba464eab939 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -334,8 +334,7 @@ impl RwLock { /// /// static LOCK: RwLock = RwLock::const_new(5); /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new(value: T) -> RwLock where T: Sized, @@ -359,13 +358,13 @@ impl RwLock { /// /// static LOCK: RwLock = RwLock::const_with_max_readers(5, 1024); /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] - pub const fn const_with_max_readers(value: T, mut max_reads: u32) -> RwLock + #[cfg(not(all(loom, test)))] + pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock where T: Sized, { - max_reads &= MAX_READS; + assert!(max_reads <= MAX_READS); + RwLock { mr: max_reads, c: UnsafeCell::new(value), diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index e679d0e6b04..cb770215488 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -172,20 +172,22 @@ impl Semaphore { /// /// static SEM: Semaphore = Semaphore::const_new(10); /// ``` - /// - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new(permits: usize) -> Self { - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Self { + Self { ll_sem: ll::Semaphore::const_new(permits), + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span::none(), - }; + } + } - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - return Self { - ll_sem: ll::Semaphore::const_new(permits), - }; + /// Creates a new closed semaphore with 0 permits. + pub(crate) fn new_closed() -> Self { + Self { + ll_sem: ll::Semaphore::new_closed(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), + } } /// Returns the current number of available permits. diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 5912b0b0c46..dc997f4e67a 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -5,12 +5,7 @@ cfg_io_driver! { #[cfg(feature = "rt")] pub(crate) mod atomic_cell; -#[cfg(any( - feature = "rt", - feature = "signal", - feature = "process", - tokio_no_const_mutex_new, -))] +#[cfg(any(feature = "rt", feature = "signal", feature = "process"))] pub(crate) mod once_cell; #[cfg(any( diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index 38dfa7ca0b6..d5a69478ef2 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -4,7 +4,11 @@ use std::mem; use std::ops::Drop; use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; +use tokio::runtime; use tokio::sync::OnceCell; +use tokio::sync::SetError; +use tokio::time; #[test] fn drop_cell() { @@ -102,184 +106,170 @@ fn from() { assert_eq!(*cell.get().unwrap(), 2); } -#[cfg(feature = "parking_lot")] -mod parking_lot { - use super::*; - - use tokio::runtime; - use tokio::sync::SetError; - use tokio::time; - - use std::time::Duration; - - async fn func1() -> u32 { - 5 - } - - async fn func2() -> u32 { - time::sleep(Duration::from_millis(1)).await; - 10 - } - - async fn func_err() -> Result { - Err(()) - } +async fn func1() -> u32 { + 5 +} - async fn func_ok() -> Result { - Ok(10) - } +async fn func2() -> u32 { + time::sleep(Duration::from_millis(1)).await; + 10 +} - async fn func_panic() -> u32 { - time::sleep(Duration::from_millis(1)).await; - panic!(); - } +async fn func_err() -> Result { + Err(()) +} - async fn sleep_and_set() -> u32 { - // Simulate sleep by pausing time and waiting for another thread to - // resume clock when calling `set`, then finding the cell being initialized - // by this call - time::sleep(Duration::from_millis(2)).await; - 5 - } +async fn func_ok() -> Result { + Ok(10) +} - async fn advance_time_and_set( - cell: &'static OnceCell, - v: u32, - ) -> Result<(), SetError> { - time::advance(Duration::from_millis(1)).await; - cell.set(v) - } +async fn func_panic() -> u32 { + time::sleep(Duration::from_millis(1)).await; + panic!(); +} - #[test] - fn get_or_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); +async fn sleep_and_set() -> u32 { + // Simulate sleep by pausing time and waiting for another thread to + // resume clock when calling `set`, then finding the cell being initialized + // by this call + time::sleep(Duration::from_millis(2)).await; + 5 +} - static ONCE: OnceCell = OnceCell::const_new(); +async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<(), SetError> { + time::advance(Duration::from_millis(1)).await; + cell.set(v) +} - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); +#[test] +fn get_or_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); - time::advance(Duration::from_millis(1)).await; - time::resume(); + static ONCE: OnceCell = OnceCell::const_new(); - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); - } + time::advance(Duration::from_millis(1)).await; + time::resume(); - #[test] - fn get_or_init_panic() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); - static ONCE: OnceCell = OnceCell::const_new(); + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); +} - rt.block_on(async { - time::pause(); +#[test] +fn get_or_init_panic() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); + static ONCE: OnceCell = OnceCell::const_new(); - time::advance(Duration::from_millis(1)).await; + rt.block_on(async { + time::pause(); - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); - } + time::advance(Duration::from_millis(1)).await; - #[test] - fn set_and_get() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); - static ONCE: OnceCell = OnceCell::const_new(); + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); +} - rt.block_on(async { - let _ = rt.spawn(async { ONCE.set(5) }).await; - let value = ONCE.get().unwrap(); - assert_eq!(*value, 5); - }); - } +#[test] +fn set_and_get() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let _ = rt.spawn(async { ONCE.set(5) }).await; + let value = ONCE.get().unwrap(); + assert_eq!(*value, 5); + }); +} - #[test] - fn get_uninit() { - static ONCE: OnceCell = OnceCell::const_new(); - let uninit = ONCE.get(); - assert!(uninit.is_none()); - } +#[test] +fn get_uninit() { + static ONCE: OnceCell = OnceCell::const_new(); + let uninit = ONCE.get(); + assert!(uninit.is_none()); +} - #[test] - fn set_twice() { - static ONCE: OnceCell = OnceCell::const_new(); +#[test] +fn set_twice() { + static ONCE: OnceCell = OnceCell::const_new(); - let first = ONCE.set(5); - assert_eq!(first, Ok(())); - let second = ONCE.set(6); - assert!(second.err().unwrap().is_already_init_err()); - } + let first = ONCE.set(5); + assert_eq!(first, Ok(())); + let second = ONCE.set(6); + assert!(second.err().unwrap().is_already_init_err()); +} - #[test] - fn set_while_initializing() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); +#[test] +fn set_while_initializing() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); - static ONCE: OnceCell = OnceCell::const_new(); + static ONCE: OnceCell = OnceCell::const_new(); - rt.block_on(async { - time::pause(); + rt.block_on(async { + time::pause(); - let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); - let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); + let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); + let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); - time::advance(Duration::from_millis(2)).await; + time::advance(Duration::from_millis(2)).await; - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); - assert_eq!(*result1, 5); - assert!(result2.err().unwrap().is_initializing_err()); - }); - } + assert_eq!(*result1, 5); + assert!(result2.err().unwrap().is_initializing_err()); + }); +} - #[test] - fn get_or_try_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); +#[test] +fn get_or_try_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); - static ONCE: OnceCell = OnceCell::const_new(); + static ONCE: OnceCell = OnceCell::const_new(); - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); - let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); + let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); - time::advance(Duration::from_millis(1)).await; - time::resume(); + time::advance(Duration::from_millis(1)).await; + time::resume(); - let result1 = handle1.await.unwrap(); - assert!(result1.is_err()); + let result1 = handle1.await.unwrap(); + assert!(result1.is_err()); - let result2 = handle2.await.unwrap(); - assert_eq!(*result2.unwrap(), 10); - }); - } + let result2 = handle2.await.unwrap(); + assert_eq!(*result2.unwrap(), 10); + }); } diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index b1b6cf9665f..d236aa1fca1 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] -#![cfg(all(feature = "full"))] +#![cfg(feature = "full")] +use futures::future::FutureExt; use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio::time::Duration; @@ -184,52 +185,45 @@ async fn abort_all() { assert_eq!(set.len(), 0); } -#[cfg(feature = "parking_lot")] -mod parking_lot { - use super::*; +// This ensures that `join_next` works correctly when the coop budget is +// exhausted. +#[tokio::test(flavor = "current_thread")] +async fn join_set_coop() { + // Large enough to trigger coop. + const TASK_NUM: u32 = 1000; - use futures::future::FutureExt; + static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); - // This ensures that `join_next` works correctly when the coop budget is - // exhausted. - #[tokio::test(flavor = "current_thread")] - async fn join_set_coop() { - // Large enough to trigger coop. - const TASK_NUM: u32 = 1000; - - static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); + let mut set = JoinSet::new(); - let mut set = JoinSet::new(); + for _ in 0..TASK_NUM { + set.spawn(async { + SEM.add_permits(1); + }); + } - for _ in 0..TASK_NUM { - set.spawn(async { - SEM.add_permits(1); - }); - } + // Wait for all tasks to complete. + // + // Since this is a `current_thread` runtime, there's no race condition + // between the last permit being added and the task completing. + let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); - // Wait for all tasks to complete. - // - // Since this is a `current_thread` runtime, there's no race condition - // between the last permit being added and the task completing. - let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); - - let mut count = 0; - let mut coop_count = 0; - loop { - match set.join_next().now_or_never() { - Some(Some(Ok(()))) => {} - Some(Some(Err(err))) => panic!("failed: {}", err), - None => { - coop_count += 1; - tokio::task::yield_now().await; - continue; - } - Some(None) => break, + let mut count = 0; + let mut coop_count = 0; + loop { + match set.join_next().now_or_never() { + Some(Some(Ok(()))) => {} + Some(Some(Err(err))) => panic!("failed: {}", err), + None => { + coop_count += 1; + tokio::task::yield_now().await; + continue; } - - count += 1; + Some(None) => break, } - assert!(coop_count >= 1); - assert_eq!(count, TASK_NUM); + + count += 1; } + assert!(coop_count >= 1); + assert_eq!(count, TASK_NUM); }