From 737dff40cb84970d3cf80bc5e1a325241ce57890 Mon Sep 17 00:00:00 2001 From: Adam Chalmers Date: Sun, 10 Sep 2023 04:21:51 -0500 Subject: [PATCH 1/7] task: rename generic paramter for `spawn` (#5993) --- tokio/src/task/spawn.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 23a46d74562..a4498f47320 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -161,14 +161,14 @@ cfg_rt! { /// error[E0391]: cycle detected when processing `main` /// ``` #[track_caller] - pub fn spawn(future: T) -> JoinHandle + pub fn spawn(future: F) -> JoinHandle where - T: Future + Send + 'static, - T::Output: Send + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { // preventing stack overflows on debug mode, by quickly sending the // task to the heap. - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { spawn_inner(Box::pin(future), None) } else { spawn_inner(future, None) From b046c0dcbb4d6779def9dc7ea1e6646ea9c42b27 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Sun, 10 Sep 2023 18:12:53 +0330 Subject: [PATCH 2/7] benches: use criterion instead of bencher (#5981) --- benches/Cargo.toml | 2 +- benches/copy.rs | 98 +++++++------ benches/fs.rs | 103 ++++++++------ benches/rt_current_thread.rs | 74 +++++----- benches/rt_multi_threaded.rs | 221 +++++++++++++++-------------- benches/signal.rs | 32 +++-- benches/spawn.rs | 87 ++++++------ benches/sync_mpsc.rs | 268 +++++++++++++++++++---------------- benches/sync_mpsc_oneshot.rs | 35 ++--- benches/sync_notify.rs | 80 ++++++----- benches/sync_rwlock.rs | 161 ++++++++++++--------- benches/sync_semaphore.rs | 190 ++++++++++++++----------- benches/sync_watch.rs | 29 ++-- benches/time_now.rs | 21 ++- 14 files changed, 767 insertions(+), 634 deletions(-) diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 47a830416ab..e0b162b422e 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -9,7 +9,7 @@ test-util = ["tokio/test-util"] [dependencies] tokio = { version = "1.5.0", path = "../tokio", features = ["full"] } -bencher = "0.1.5" +criterion = "0.5.1" rand = "0.8" rand_chacha = "0.3" diff --git a/benches/copy.rs b/benches/copy.rs index ae02ad5a7d9..1da55f349f7 100644 --- a/benches/copy.rs +++ b/benches/copy.rs @@ -1,4 +1,4 @@ -use bencher::{benchmark_group, benchmark_main, Bencher}; +use criterion::{criterion_group, criterion_main, Criterion}; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha20Rng; @@ -174,65 +174,77 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } -fn copy_mem_to_mem(b: &mut Bencher) { +fn copy_mem_to_mem(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let mut source = repeat(0).take(SOURCE_SIZE); - let mut dest = Vec::new(); - copy(&mut source, &mut dest).await.unwrap(); - }; - - rt.block_on(task()); - }) + c.bench_function("copy_mem_to_mem", |b| { + b.iter(|| { + let task = || async { + let mut source = repeat(0).take(SOURCE_SIZE); + let mut dest = Vec::new(); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) + }); } -fn copy_mem_to_slow_hdd(b: &mut Bencher) { +fn copy_mem_to_slow_hdd(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let mut source = repeat(0).take(SOURCE_SIZE); - let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); - copy(&mut source, &mut dest).await.unwrap(); - }; - - rt.block_on(task()); - }) + c.bench_function("copy_mem_to_slow_hdd", |b| { + b.iter(|| { + let task = || async { + let mut source = repeat(0).take(SOURCE_SIZE); + let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) + }); } -fn copy_chunk_to_mem(b: &mut Bencher) { +fn copy_chunk_to_mem(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); - let mut dest = Vec::new(); - copy(&mut source, &mut dest).await.unwrap(); - }; - - rt.block_on(task()); - }) + + c.bench_function("copy_chunk_to_mem", |b| { + b.iter(|| { + let task = || async { + let mut source = + ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); + let mut dest = Vec::new(); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) + }); } -fn copy_chunk_to_slow_hdd(b: &mut Bencher) { +fn copy_chunk_to_slow_hdd(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); - let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); - copy(&mut source, &mut dest).await.unwrap(); - }; - - rt.block_on(task()); - }) + + c.bench_function("copy_chunk_to_slow_hdd", |b| { + b.iter(|| { + let task = || async { + let mut source = + ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); + let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) + }); } -benchmark_group!( +criterion_group!( copy_bench, copy_mem_to_mem, copy_mem_to_slow_hdd, copy_chunk_to_mem, copy_chunk_to_slow_hdd, ); -benchmark_main!(copy_bench); +criterion_main!(copy_bench); diff --git a/benches/fs.rs b/benches/fs.rs index 026814ff468..2964afbd46e 100644 --- a/benches/fs.rs +++ b/benches/fs.rs @@ -6,7 +6,7 @@ use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio_util::codec::{BytesCodec, FramedRead /*FramedWrite*/}; -use bencher::{benchmark_group, benchmark_main, Bencher}; +use criterion::{criterion_group, criterion_main, Criterion}; use std::fs::File as StdFile; use std::io::Read as StdRead; @@ -23,81 +23,90 @@ const BLOCK_COUNT: usize = 1_000; const BUFFER_SIZE: usize = 4096; const DEV_ZERO: &str = "/dev/zero"; -fn async_read_codec(b: &mut Bencher) { +fn async_read_codec(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let file = File::open(DEV_ZERO).await.unwrap(); - let mut input_stream = FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE); + c.bench_function("async_read_codec", |b| { + b.iter(|| { + let task = || async { + let file = File::open(DEV_ZERO).await.unwrap(); + let mut input_stream = + FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE); - for _i in 0..BLOCK_COUNT { - let _bytes = input_stream.next().await.unwrap(); - } - }; + for _i in 0..BLOCK_COUNT { + let _bytes = input_stream.next().await.unwrap(); + } + }; - rt.block_on(task()); + rt.block_on(task()); + }) }); } -fn async_read_buf(b: &mut Bencher) { +fn async_read_buf(c: &mut Criterion) { let rt = rt(); - b.iter(|| { - let task = || async { - let mut file = File::open(DEV_ZERO).await.unwrap(); - let mut buffer = [0u8; BUFFER_SIZE]; - - for _i in 0..BLOCK_COUNT { - let count = file.read(&mut buffer).await.unwrap(); - if count == 0 { - break; + c.bench_function("async_read_buf", |b| { + b.iter(|| { + let task = || async { + let mut file = File::open(DEV_ZERO).await.unwrap(); + let mut buffer = [0u8; BUFFER_SIZE]; + + for _i in 0..BLOCK_COUNT { + let count = file.read(&mut buffer).await.unwrap(); + if count == 0 { + break; + } } - } - }; + }; - rt.block_on(task()); + rt.block_on(task()); + }); }); } -fn async_read_std_file(b: &mut Bencher) { +fn async_read_std_file(c: &mut Criterion) { let rt = rt(); - let task = || async { - let mut file = tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap())); + c.bench_function("async_read_std_file", |b| { + b.iter(|| { + let task = || async { + let mut file = + tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap())); - for _i in 0..BLOCK_COUNT { - let mut buffer = [0u8; BUFFER_SIZE]; - let mut file_ref = file.as_mut(); + for _i in 0..BLOCK_COUNT { + let mut buffer = [0u8; BUFFER_SIZE]; + let mut file_ref = file.as_mut(); - tokio::task::block_in_place(move || { - file_ref.read_exact(&mut buffer).unwrap(); - }); - } - }; + tokio::task::block_in_place(move || { + file_ref.read_exact(&mut buffer).unwrap(); + }); + } + }; - b.iter(|| { - rt.block_on(task()); + rt.block_on(task()); + }); }); } -fn sync_read(b: &mut Bencher) { - b.iter(|| { - let mut file = StdFile::open(DEV_ZERO).unwrap(); - let mut buffer = [0u8; BUFFER_SIZE]; +fn sync_read(c: &mut Criterion) { + c.bench_function("sync_read", |b| { + b.iter(|| { + let mut file = StdFile::open(DEV_ZERO).unwrap(); + let mut buffer = [0u8; BUFFER_SIZE]; - for _i in 0..BLOCK_COUNT { - file.read_exact(&mut buffer).unwrap(); - } + for _i in 0..BLOCK_COUNT { + file.read_exact(&mut buffer).unwrap(); + } + }) }); } -benchmark_group!( +criterion_group!( file, async_read_std_file, async_read_buf, async_read_codec, sync_read ); - -benchmark_main!(file); +criterion_main!(file); diff --git a/benches/rt_current_thread.rs b/benches/rt_current_thread.rs index dc832193b49..821207638f1 100644 --- a/benches/rt_current_thread.rs +++ b/benches/rt_current_thread.rs @@ -4,46 +4,50 @@ use tokio::runtime::{self, Runtime}; -use bencher::{benchmark_group, benchmark_main, Bencher}; +use criterion::{criterion_group, criterion_main, Criterion}; const NUM_SPAWN: usize = 1_000; -fn spawn_many_local(b: &mut Bencher) { +fn spawn_many_local(c: &mut Criterion) { let rt = rt(); let mut handles = Vec::with_capacity(NUM_SPAWN); - b.iter(|| { - rt.block_on(async { - for _ in 0..NUM_SPAWN { - handles.push(tokio::spawn(async move {})); - } - - for handle in handles.drain(..) { - handle.await.unwrap(); - } - }); + c.bench_function("spawn_many_local", |b| { + b.iter(|| { + rt.block_on(async { + for _ in 0..NUM_SPAWN { + handles.push(tokio::spawn(async move {})); + } + + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); } -fn spawn_many_remote_idle(b: &mut Bencher) { +fn spawn_many_remote_idle(c: &mut Criterion) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); - b.iter(|| { - for _ in 0..NUM_SPAWN { - handles.push(rt_handle.spawn(async {})); - } - - rt.block_on(async { - for handle in handles.drain(..) { - handle.await.unwrap(); + c.bench_function("spawn_many_remote_idle", |b| { + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); } - }); + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); } -fn spawn_many_remote_busy(b: &mut Bencher) { +fn spawn_many_remote_busy(c: &mut Criterion) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); @@ -56,16 +60,18 @@ fn spawn_many_remote_busy(b: &mut Bencher) { iter() }); - b.iter(|| { - for _ in 0..NUM_SPAWN { - handles.push(rt_handle.spawn(async {})); - } - - rt.block_on(async { - for handle in handles.drain(..) { - handle.await.unwrap(); + c.bench_function("spawn_many_remote_busy", |b| { + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); } - }); + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); } @@ -73,11 +79,11 @@ fn rt() -> Runtime { runtime::Builder::new_current_thread().build().unwrap() } -benchmark_group!( +criterion_group!( scheduler, spawn_many_local, spawn_many_remote_idle, spawn_many_remote_busy ); -benchmark_main!(scheduler); +criterion_main!(scheduler); diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs index 689c334b6d7..324fb60961f 100644 --- a/benches/rt_multi_threaded.rs +++ b/benches/rt_multi_threaded.rs @@ -5,63 +5,68 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; -use bencher::{benchmark_group, benchmark_main, Bencher}; use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{mpsc, Arc}; use std::time::{Duration, Instant}; +use criterion::{criterion_group, criterion_main, Criterion}; + const NUM_WORKERS: usize = 4; const NUM_SPAWN: usize = 10_000; const STALL_DUR: Duration = Duration::from_micros(10); -fn spawn_many_local(b: &mut Bencher) { +fn spawn_many_local(c: &mut Criterion) { let rt = rt(); let (tx, rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); - b.iter(|| { - rem.store(NUM_SPAWN, Relaxed); + c.bench_function("spawn_many_local", |b| { + b.iter(|| { + rem.store(NUM_SPAWN, Relaxed); - rt.block_on(async { - for _ in 0..NUM_SPAWN { - let tx = tx.clone(); - let rem = rem.clone(); + rt.block_on(async { + for _ in 0..NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); - tokio::spawn(async move { - if 1 == rem.fetch_sub(1, Relaxed) { - tx.send(()).unwrap(); - } - }); - } + tokio::spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } - let _ = rx.recv().unwrap(); - }); + let _ = rx.recv().unwrap(); + }); + }) }); } -fn spawn_many_remote_idle(b: &mut Bencher) { +fn spawn_many_remote_idle(c: &mut Criterion) { let rt = rt(); let mut handles = Vec::with_capacity(NUM_SPAWN); - b.iter(|| { - for _ in 0..NUM_SPAWN { - handles.push(rt.spawn(async {})); - } - - rt.block_on(async { - for handle in handles.drain(..) { - handle.await.unwrap(); + c.bench_function("spawn_many_remote_idle", |b| { + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt.spawn(async {})); } - }); + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); } // The runtime is busy with tasks that consume CPU time and yield. Yielding is a // lower notification priority than spawning / regular notification. -fn spawn_many_remote_busy1(b: &mut Bencher) { +fn spawn_many_remote_busy1(c: &mut Criterion) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); @@ -78,16 +83,18 @@ fn spawn_many_remote_busy1(b: &mut Bencher) { }); } - b.iter(|| { - for _ in 0..NUM_SPAWN { - handles.push(rt_handle.spawn(async {})); - } - - rt.block_on(async { - for handle in handles.drain(..) { - handle.await.unwrap(); + c.bench_function("spawn_many_remote_busy1", |b| { + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); } - }); + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); flag.store(false, Relaxed); @@ -95,7 +102,7 @@ fn spawn_many_remote_busy1(b: &mut Bencher) { // The runtime is busy with tasks that consume CPU time and spawn new high-CPU // tasks. Spawning goes via a higher notification priority than yielding. -fn spawn_many_remote_busy2(b: &mut Bencher) { +fn spawn_many_remote_busy2(c: &mut Criterion) { const NUM_SPAWN: usize = 1_000; let rt = rt(); @@ -119,49 +126,52 @@ fn spawn_many_remote_busy2(b: &mut Bencher) { }); } - b.iter(|| { - for _ in 0..NUM_SPAWN { - handles.push(rt_handle.spawn(async {})); - } - - rt.block_on(async { - for handle in handles.drain(..) { - handle.await.unwrap(); + c.bench_function("spawn_many_remote_busy2", |b| { + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); } - }); + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); + } + }); + }) }); flag.store(false, Relaxed); } -fn yield_many(b: &mut Bencher) { +fn yield_many(c: &mut Criterion) { const NUM_YIELD: usize = 1_000; const TASKS: usize = 200; - let rt = rt(); + c.bench_function("yield_many", |b| { + let rt = rt(); + let (tx, rx) = mpsc::sync_channel(TASKS); - let (tx, rx) = mpsc::sync_channel(TASKS); - - b.iter(move || { - for _ in 0..TASKS { - let tx = tx.clone(); + b.iter(move || { + for _ in 0..TASKS { + let tx = tx.clone(); - rt.spawn(async move { - for _ in 0..NUM_YIELD { - tokio::task::yield_now().await; - } + rt.spawn(async move { + for _ in 0..NUM_YIELD { + tokio::task::yield_now().await; + } - tx.send(()).unwrap(); - }); - } + tx.send(()).unwrap(); + }); + } - for _ in 0..TASKS { - let _ = rx.recv().unwrap(); - } + for _ in 0..TASKS { + let _ = rx.recv().unwrap(); + } + }) }); } -fn ping_pong(b: &mut Bencher) { +fn ping_pong(c: &mut Criterion) { const NUM_PINGS: usize = 1_000; let rt = rt(); @@ -169,46 +179,46 @@ fn ping_pong(b: &mut Bencher) { let (done_tx, done_rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); - b.iter(|| { - let done_tx = done_tx.clone(); - let rem = rem.clone(); - rem.store(NUM_PINGS, Relaxed); + c.bench_function("ping_pong", |b| { + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + rem.store(NUM_PINGS, Relaxed); - rt.block_on(async { - tokio::spawn(async move { - for _ in 0..NUM_PINGS { - let rem = rem.clone(); - let done_tx = done_tx.clone(); - - tokio::spawn(async move { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); + rt.block_on(async { + tokio::spawn(async move { + for _ in 0..NUM_PINGS { + let rem = rem.clone(); + let done_tx = done_tx.clone(); tokio::spawn(async move { - rx1.await.unwrap(); - tx2.send(()).unwrap(); - }); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); - tx1.send(()).unwrap(); - rx2.await.unwrap(); + tokio::spawn(async move { + rx1.await.unwrap(); + tx2.send(()).unwrap(); + }); - if 1 == rem.fetch_sub(1, Relaxed) { - done_tx.send(()).unwrap(); - } - }); - } - }); + tx1.send(()).unwrap(); + rx2.await.unwrap(); - done_rx.recv().unwrap(); - }); + if 1 == rem.fetch_sub(1, Relaxed) { + done_tx.send(()).unwrap(); + } + }); + } + }); + + done_rx.recv().unwrap(); + }); + }) }); } -fn chained_spawn(b: &mut Bencher) { +fn chained_spawn(c: &mut Criterion) { const ITER: usize = 1_000; - let rt = rt(); - fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { if n == 0 { done_tx.send(()).unwrap(); @@ -219,18 +229,21 @@ fn chained_spawn(b: &mut Bencher) { } } - let (done_tx, done_rx) = mpsc::sync_channel(1000); + c.bench_function("chained_spawn", |b| { + let rt = rt(); + let (done_tx, done_rx) = mpsc::sync_channel(1000); - b.iter(move || { - let done_tx = done_tx.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); - rt.block_on(async { - tokio::spawn(async move { - iter(done_tx, ITER); - }); + rt.block_on(async { + tokio::spawn(async move { + iter(done_tx, ITER); + }); - done_rx.recv().unwrap(); - }); + done_rx.recv().unwrap(); + }); + }) }); } @@ -249,7 +262,7 @@ fn stall() { } } -benchmark_group!( +criterion_group!( scheduler, spawn_many_local, spawn_many_remote_idle, @@ -260,4 +273,4 @@ benchmark_group!( chained_spawn, ); -benchmark_main!(scheduler); +criterion_main!(scheduler); diff --git a/benches/signal.rs b/benches/signal.rs index 4d5f58fd406..af9fc1f3b97 100644 --- a/benches/signal.rs +++ b/benches/signal.rs @@ -1,7 +1,7 @@ //! Benchmark the delay in propagating OS signals to any listeners. #![cfg(unix)] -use bencher::{benchmark_group, benchmark_main, Bencher}; +use criterion::{criterion_group, criterion_main, Criterion}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -41,7 +41,7 @@ pub fn send_signal(signal: libc::c_int) { } } -fn many_signals(bench: &mut Bencher) { +fn many_signals(c: &mut Criterion) { let num_signals = 10; let (tx, mut rx) = mpsc::channel(num_signals); @@ -75,21 +75,23 @@ fn many_signals(bench: &mut Bencher) { // tasks have been polled at least once rt.block_on(Spinner::new()); - bench.iter(|| { - rt.block_on(async { - send_signal(libc::SIGCHLD); - for _ in 0..num_signals { - rx.recv().await.expect("channel closed"); - } + c.bench_function("many_signals", |b| { + b.iter(|| { + rt.block_on(async { + send_signal(libc::SIGCHLD); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } - send_signal(libc::SIGIO); - for _ in 0..num_signals { - rx.recv().await.expect("channel closed"); - } - }); + send_signal(libc::SIGIO); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + }); + }) }); } -benchmark_group!(signal_group, many_signals,); +criterion_group!(signal_group, many_signals); -benchmark_main!(signal_group); +criterion_main!(signal_group); diff --git a/benches/spawn.rs b/benches/spawn.rs index 7d4b8137354..628a9b2c86f 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -2,10 +2,7 @@ //! This essentially measure the time to enqueue a task in the local and remote //! case. -#[macro_use] -extern crate bencher; - -use bencher::{black_box, Bencher}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; async fn work() -> usize { let val = 1 + 1; @@ -13,67 +10,77 @@ async fn work() -> usize { black_box(val) } -fn basic_scheduler_spawn(bench: &mut Bencher) { +fn basic_scheduler_spawn(c: &mut Criterion) { let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - bench.iter(|| { - runtime.block_on(async { - let h = tokio::spawn(work()); - assert_eq!(h.await.unwrap(), 2); - }); + + c.bench_function("basic_scheduler_spawn", |b| { + b.iter(|| { + runtime.block_on(async { + let h = tokio::spawn(work()); + assert_eq!(h.await.unwrap(), 2); + }); + }) }); } -fn basic_scheduler_spawn10(bench: &mut Bencher) { +fn basic_scheduler_spawn10(c: &mut Criterion) { let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - bench.iter(|| { - runtime.block_on(async { - let mut handles = Vec::with_capacity(10); - for _ in 0..10 { - handles.push(tokio::spawn(work())); - } - for handle in handles { - assert_eq!(handle.await.unwrap(), 2); - } - }); + + c.bench_function("basic_scheduler_spawn10", |b| { + b.iter(|| { + runtime.block_on(async { + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + handles.push(tokio::spawn(work())); + } + for handle in handles { + assert_eq!(handle.await.unwrap(), 2); + } + }); + }) }); } -fn threaded_scheduler_spawn(bench: &mut Bencher) { +fn threaded_scheduler_spawn(c: &mut Criterion) { let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .build() .unwrap(); - bench.iter(|| { - runtime.block_on(async { - let h = tokio::spawn(work()); - assert_eq!(h.await.unwrap(), 2); - }); + c.bench_function("threaded_scheduler_spawn", |b| { + b.iter(|| { + runtime.block_on(async { + let h = tokio::spawn(work()); + assert_eq!(h.await.unwrap(), 2); + }); + }) }); } -fn threaded_scheduler_spawn10(bench: &mut Bencher) { +fn threaded_scheduler_spawn10(c: &mut Criterion) { let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .build() .unwrap(); - bench.iter(|| { - runtime.block_on(async { - let mut handles = Vec::with_capacity(10); - for _ in 0..10 { - handles.push(tokio::spawn(work())); - } - for handle in handles { - assert_eq!(handle.await.unwrap(), 2); - } - }); + c.bench_function("threaded_scheduler_spawn10", |b| { + b.iter(|| { + runtime.block_on(async { + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + handles.push(tokio::spawn(work())); + } + for handle in handles { + assert_eq!(handle.await.unwrap(), 2); + } + }); + }) }); } -bencher::benchmark_group!( +criterion_group!( spawn, basic_scheduler_spawn, basic_scheduler_spawn10, @@ -81,4 +88,4 @@ bencher::benchmark_group!( threaded_scheduler_spawn10, ); -bencher::benchmark_main!(spawn); +criterion_main!(spawn); diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index 361493b1245..d6545e8047f 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -1,8 +1,23 @@ -use bencher::{black_box, Bencher}; use tokio::sync::mpsc; -type Medium = [usize; 64]; -type Large = [Medium; 64]; +use criterion::measurement::WallTime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; + +#[derive(Debug, Copy, Clone)] +struct Medium([usize; 64]); +impl Default for Medium { + fn default() -> Self { + Medium([0; 64]) + } +} + +#[derive(Debug, Copy, Clone)] +struct Large([Medium; 64]); +impl Default for Large { + fn default() -> Self { + Large([Medium::default(); 64]) + } +} fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() @@ -11,169 +26,176 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } -fn create_1_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(1)); - }); -} - -fn create_100_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(100)); - }); -} - -fn create_100_000_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(100_000)); +fn create_medium(g: &mut BenchmarkGroup) { + g.bench_function(SIZE.to_string(), |b| { + b.iter(|| { + black_box(&mpsc::channel::(SIZE)); + }) }); } -fn send_medium(b: &mut Bencher) { +fn send_data(g: &mut BenchmarkGroup, prefix: &str) { let rt = rt(); - b.iter(|| { - let (tx, mut rx) = mpsc::channel::(1000); + g.bench_function(format!("{}_{}", prefix, SIZE), |b| { + b.iter(|| { + let (tx, mut rx) = mpsc::channel::(SIZE); - let _ = rt.block_on(tx.send([0; 64])); + let _ = rt.block_on(tx.send(T::default())); - rt.block_on(rx.recv()).unwrap(); + rt.block_on(rx.recv()).unwrap(); + }) }); } -fn send_large(b: &mut Bencher) { +fn contention_bounded(g: &mut BenchmarkGroup) { let rt = rt(); - b.iter(|| { - let (tx, mut rx) = mpsc::channel::(1000); - - let _ = rt.block_on(tx.send([[0; 64]; 64])); - - rt.block_on(rx.recv()).unwrap(); + g.bench_function("bounded", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) + }) }); } -fn contention_bounded(b: &mut Bencher) { +fn contention_bounded_full(g: &mut BenchmarkGroup) { let rt = rt(); - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } + g.bench_function("bounded_full", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(100); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) }) }); } -fn contention_bounded_full(b: &mut Bencher) { +fn contention_unbounded(g: &mut BenchmarkGroup) { let rt = rt(); - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(100); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } + g.bench_function("unbounded", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) }) }); } -fn contention_unbounded(b: &mut Bencher) { +fn uncontented_bounded(g: &mut BenchmarkGroup) { let rt = rt(); - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } + g.bench_function("bounded", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for i in 0..5000 { + tx.send(i).await.unwrap(); + } + + for _ in 0..5_000 { + let _ = rx.recv().await; + } + }) }) }); } -fn uncontented_bounded(b: &mut Bencher) { +fn uncontented_unbounded(g: &mut BenchmarkGroup) { let rt = rt(); - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); + g.bench_function("unbounded", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); - for i in 0..5000 { - tx.send(i).await.unwrap(); - } + for i in 0..5000 { + tx.send(i).unwrap(); + } - for _ in 0..5_000 { - let _ = rx.recv().await; - } + for _ in 0..5_000 { + let _ = rx.recv().await; + } + }) }) }); } -fn uncontented_unbounded(b: &mut Bencher) { - let rt = rt(); +fn bench_create_medium(c: &mut Criterion) { + let mut group = c.benchmark_group("create_medium"); + create_medium::<1>(&mut group); + create_medium::<100>(&mut group); + create_medium::<100_000>(&mut group); + group.finish(); +} - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); +fn bench_send(c: &mut Criterion) { + let mut group = c.benchmark_group("send"); + send_data::(&mut group, "medium"); + send_data::(&mut group, "large"); + group.finish(); +} - for i in 0..5000 { - tx.send(i).unwrap(); - } +fn bench_contention(c: &mut Criterion) { + let mut group = c.benchmark_group("contention"); + contention_bounded(&mut group); + contention_bounded_full(&mut group); + contention_unbounded(&mut group); + group.finish(); +} - for _ in 0..5_000 { - let _ = rx.recv().await; - } - }) - }); +fn bench_uncontented(c: &mut Criterion) { + let mut group = c.benchmark_group("uncontented"); + uncontented_bounded(&mut group); + uncontented_unbounded(&mut group); + group.finish(); } -bencher::benchmark_group!( - create, - create_1_medium, - create_100_medium, - create_100_000_medium -); - -bencher::benchmark_group!(send, send_medium, send_large); - -bencher::benchmark_group!( - contention, - contention_bounded, - contention_bounded_full, - contention_unbounded, - uncontented_bounded, - uncontented_unbounded -); - -bencher::benchmark_main!(create, send, contention); +criterion_group!(create, bench_create_medium); +criterion_group!(send, bench_send); +criterion_group!(contention, bench_contention); +criterion_group!(uncontented, bench_uncontented); + +criterion_main!(create, send, contention, uncontented); diff --git a/benches/sync_mpsc_oneshot.rs b/benches/sync_mpsc_oneshot.rs index 04b783b9d41..a0712c1e424 100644 --- a/benches/sync_mpsc_oneshot.rs +++ b/benches/sync_mpsc_oneshot.rs @@ -1,27 +1,28 @@ -use bencher::{benchmark_group, benchmark_main, Bencher}; use tokio::{ runtime::Runtime, sync::{mpsc, oneshot}, }; -fn request_reply_current_thread(b: &mut Bencher) { +use criterion::{criterion_group, criterion_main, Criterion}; + +fn request_reply_current_thread(c: &mut Criterion) { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - request_reply(b, rt); + request_reply(c, rt); } -fn request_reply_multi_threaded(b: &mut Bencher) { +fn request_reply_multi_threaded(c: &mut Criterion) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .build() .unwrap(); - request_reply(b, rt); + request_reply(c, rt); } -fn request_reply(b: &mut Bencher, rt: Runtime) { +fn request_reply(b: &mut Criterion, rt: Runtime) { let tx = rt.block_on(async move { let (tx, mut rx) = mpsc::channel::>(10); tokio::spawn(async move { @@ -32,22 +33,24 @@ fn request_reply(b: &mut Bencher, rt: Runtime) { tx }); - b.iter(|| { - let task_tx = tx.clone(); - rt.block_on(async move { - for _ in 0..1_000 { - let (o_tx, o_rx) = oneshot::channel(); - task_tx.send(o_tx).await.unwrap(); - let _ = o_rx.await; - } + b.bench_function("request_reply", |b| { + b.iter(|| { + let task_tx = tx.clone(); + rt.block_on(async move { + for _ in 0..1_000 { + let (o_tx, o_rx) = oneshot::channel(); + task_tx.send(o_tx).await.unwrap(); + let _ = o_rx.await; + } + }) }) }); } -benchmark_group!( +criterion_group!( sync_mpsc_oneshot_group, request_reply_current_thread, request_reply_multi_threaded, ); -benchmark_main!(sync_mpsc_oneshot_group); +criterion_main!(sync_mpsc_oneshot_group); diff --git a/benches/sync_notify.rs b/benches/sync_notify.rs index 585984c5552..a50ad1efb38 100644 --- a/benches/sync_notify.rs +++ b/benches/sync_notify.rs @@ -1,9 +1,11 @@ -use bencher::Bencher; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::Notify; +use criterion::measurement::WallTime; +use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion}; + fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() .worker_threads(6) @@ -11,7 +13,7 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } -fn notify_waiters(b: &mut Bencher) { +fn notify_waiters(g: &mut BenchmarkGroup) { let rt = rt(); let notify = Arc::new(Notify::new()); let counter = Arc::new(AtomicUsize::new(0)); @@ -29,18 +31,20 @@ fn notify_waiters(b: &mut Bencher) { } const N_ITERS: usize = 500; - b.iter(|| { - counter.store(0, Ordering::Relaxed); - loop { - notify.notify_waiters(); - if counter.load(Ordering::Relaxed) >= N_ITERS { - break; + g.bench_function(N_WAITERS.to_string(), |b| { + b.iter(|| { + counter.store(0, Ordering::Relaxed); + loop { + notify.notify_waiters(); + if counter.load(Ordering::Relaxed) >= N_ITERS { + break; + } } - } + }) }); } -fn notify_one(b: &mut Bencher) { +fn notify_one(g: &mut BenchmarkGroup) { let rt = rt(); let notify = Arc::new(Notify::new()); let counter = Arc::new(AtomicUsize::new(0)); @@ -58,33 +62,43 @@ fn notify_one(b: &mut Bencher) { } const N_ITERS: usize = 500; - b.iter(|| { - counter.store(0, Ordering::Relaxed); - loop { - notify.notify_one(); - if counter.load(Ordering::Relaxed) >= N_ITERS { - break; + g.bench_function(N_WAITERS.to_string(), |b| { + b.iter(|| { + counter.store(0, Ordering::Relaxed); + loop { + notify.notify_one(); + if counter.load(Ordering::Relaxed) >= N_ITERS { + break; + } } - } + }) }); } -bencher::benchmark_group!( - notify_waiters_simple, - notify_waiters::<10>, - notify_waiters::<50>, - notify_waiters::<100>, - notify_waiters::<200>, - notify_waiters::<500> -); +fn bench_notify_one(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_one"); + notify_one::<10>(&mut group); + notify_one::<50>(&mut group); + notify_one::<100>(&mut group); + notify_one::<200>(&mut group); + notify_one::<500>(&mut group); + group.finish(); +} + +fn bench_notify_waiters(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_waiters"); + notify_waiters::<10>(&mut group); + notify_waiters::<50>(&mut group); + notify_waiters::<100>(&mut group); + notify_waiters::<200>(&mut group); + notify_waiters::<500>(&mut group); + group.finish(); +} -bencher::benchmark_group!( - notify_one_simple, - notify_one::<10>, - notify_one::<50>, - notify_one::<100>, - notify_one::<200>, - notify_one::<500> +criterion_group!( + notify_waiters_simple, + bench_notify_one, + bench_notify_waiters ); -bencher::benchmark_main!(notify_waiters_simple, notify_one_simple); +criterion_main!(notify_waiters_simple); diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs index 1ac56518d8e..9b4c1ee49be 100644 --- a/benches/sync_rwlock.rs +++ b/benches/sync_rwlock.rs @@ -1,26 +1,30 @@ -use bencher::{black_box, Bencher}; use std::sync::Arc; use tokio::{sync::RwLock, task}; -fn read_uncontended(b: &mut Bencher) { +use criterion::measurement::WallTime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; + +fn read_uncontended(g: &mut BenchmarkGroup) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(6) .build() .unwrap(); let lock = Arc::new(RwLock::new(())); - b.iter(|| { - let lock = lock.clone(); - rt.block_on(async move { - for _ in 0..6 { - let read = lock.read().await; - let _read = black_box(read); - } + g.bench_function("read", |b| { + b.iter(|| { + let lock = lock.clone(); + rt.block_on(async move { + for _ in 0..6 { + let read = lock.read().await; + let _read = black_box(read); + } + }) }) }); } -fn read_concurrent_uncontended_multi(b: &mut Bencher) { +fn read_concurrent_uncontended_multi(g: &mut BenchmarkGroup) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(6) .build() @@ -32,23 +36,25 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) { } let lock = Arc::new(RwLock::new(())); - b.iter(|| { - let lock = lock.clone(); - rt.block_on(async move { - let j = tokio::try_join! { - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())) - }; - j.unwrap(); + g.bench_function("read_concurrent_multi", |b| { + b.iter(|| { + let lock = lock.clone(); + rt.block_on(async move { + let j = tokio::try_join! { + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())) + }; + j.unwrap(); + }) }) }); } -fn read_concurrent_uncontended(b: &mut Bencher) { +fn read_concurrent_uncontended(g: &mut BenchmarkGroup) { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); @@ -59,22 +65,24 @@ fn read_concurrent_uncontended(b: &mut Bencher) { } let lock = Arc::new(RwLock::new(())); - b.iter(|| { - let lock = lock.clone(); - rt.block_on(async move { - tokio::join! { - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - task(lock.clone()) - }; + g.bench_function("read_concurrent", |b| { + b.iter(|| { + let lock = lock.clone(); + rt.block_on(async move { + tokio::join! { + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + task(lock.clone()) + }; + }) }) }); } -fn read_concurrent_contended_multi(b: &mut Bencher) { +fn read_concurrent_contended_multi(g: &mut BenchmarkGroup) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(6) .build() @@ -86,24 +94,26 @@ fn read_concurrent_contended_multi(b: &mut Bencher) { } let lock = Arc::new(RwLock::new(())); - b.iter(|| { - let lock = lock.clone(); - rt.block_on(async move { - let write = lock.write().await; - let j = tokio::try_join! { - async move { drop(write); Ok(()) }, - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - task::spawn(task(lock.clone())), - }; - j.unwrap(); + g.bench_function("read_concurrent_multi", |b| { + b.iter(|| { + let lock = lock.clone(); + rt.block_on(async move { + let write = lock.write().await; + let j = tokio::try_join! { + async move { drop(write); Ok(()) }, + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + task::spawn(task(lock.clone())), + }; + j.unwrap(); + }) }) }); } -fn read_concurrent_contended(b: &mut Bencher) { +fn read_concurrent_contended(g: &mut BenchmarkGroup) { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); @@ -114,29 +124,40 @@ fn read_concurrent_contended(b: &mut Bencher) { } let lock = Arc::new(RwLock::new(())); - b.iter(|| { - let lock = lock.clone(); - rt.block_on(async move { - let write = lock.write().await; - tokio::join! { - async move { drop(write) }, - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - task(lock.clone()), - }; + g.bench_function("read_concurrent", |b| { + b.iter(|| { + let lock = lock.clone(); + rt.block_on(async move { + let write = lock.write().await; + tokio::join! { + async move { drop(write) }, + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + task(lock.clone()), + }; + }) }) }); } -bencher::benchmark_group!( - sync_rwlock, - read_uncontended, - read_concurrent_uncontended, - read_concurrent_uncontended_multi, - read_concurrent_contended, - read_concurrent_contended_multi -); +fn bench_contention(c: &mut Criterion) { + let mut group = c.benchmark_group("contention"); + read_concurrent_contended(&mut group); + read_concurrent_contended_multi(&mut group); + group.finish(); +} + +fn bench_uncontented(c: &mut Criterion) { + let mut group = c.benchmark_group("uncontented"); + read_uncontended(&mut group); + read_concurrent_uncontended(&mut group); + read_concurrent_uncontended_multi(&mut group); + group.finish(); +} + +criterion_group!(contention, bench_contention); +criterion_group!(uncontented, bench_uncontented); -bencher::benchmark_main!(sync_rwlock); +criterion_main!(contention, uncontented); diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs index 17b26148b64..e664e64c94b 100644 --- a/benches/sync_semaphore.rs +++ b/benches/sync_semaphore.rs @@ -1,21 +1,36 @@ -use bencher::Bencher; use std::sync::Arc; +use tokio::runtime::Runtime; use tokio::{sync::Semaphore, task}; -fn uncontended(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new_multi_thread() +use criterion::measurement::WallTime; +use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion}; + +fn single_rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() +} + +fn multi_rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() .worker_threads(6) .build() - .unwrap(); + .unwrap() +} + +fn uncontended(g: &mut BenchmarkGroup) { + let rt = multi_rt(); let s = Arc::new(Semaphore::new(10)); - b.iter(|| { - let s = s.clone(); - rt.block_on(async move { - for _ in 0..6 { - let permit = s.acquire().await; - drop(permit); - } + g.bench_function("multi", |b| { + b.iter(|| { + let s = s.clone(); + rt.block_on(async move { + for _ in 0..6 { + let permit = s.acquire().await; + drop(permit); + } + }) }) }); } @@ -25,101 +40,108 @@ async fn task(s: Arc) { drop(permit); } -fn uncontended_concurrent_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(6) - .build() - .unwrap(); +fn uncontended_concurrent_multi(g: &mut BenchmarkGroup) { + let rt = multi_rt(); let s = Arc::new(Semaphore::new(10)); - b.iter(|| { - let s = s.clone(); - rt.block_on(async move { - let j = tokio::try_join! { - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())) - }; - j.unwrap(); + g.bench_function("concurrent_multi", |b| { + b.iter(|| { + let s = s.clone(); + rt.block_on(async move { + let j = tokio::try_join! { + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())) + }; + j.unwrap(); + }) }) }); } -fn uncontended_concurrent_single(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); +fn uncontended_concurrent_single(g: &mut BenchmarkGroup) { + let rt = single_rt(); let s = Arc::new(Semaphore::new(10)); - b.iter(|| { - let s = s.clone(); - rt.block_on(async move { - tokio::join! { - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()) - }; + g.bench_function("concurrent_single", |b| { + b.iter(|| { + let s = s.clone(); + rt.block_on(async move { + tokio::join! { + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()) + }; + }) }) }); } -fn contended_concurrent_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(6) - .build() - .unwrap(); +fn contended_concurrent_multi(g: &mut BenchmarkGroup) { + let rt = multi_rt(); let s = Arc::new(Semaphore::new(5)); - b.iter(|| { - let s = s.clone(); - rt.block_on(async move { - let j = tokio::try_join! { - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())), - task::spawn(task(s.clone())) - }; - j.unwrap(); + g.bench_function("concurrent_multi", |b| { + b.iter(|| { + let s = s.clone(); + rt.block_on(async move { + let j = tokio::try_join! { + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())), + task::spawn(task(s.clone())) + }; + j.unwrap(); + }) }) }); } -fn contended_concurrent_single(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); +fn contended_concurrent_single(g: &mut BenchmarkGroup) { + let rt = single_rt(); let s = Arc::new(Semaphore::new(5)); - b.iter(|| { - let s = s.clone(); - rt.block_on(async move { - tokio::join! { - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()), - task(s.clone()) - }; + g.bench_function("concurrent_single", |b| { + b.iter(|| { + let s = s.clone(); + rt.block_on(async move { + tokio::join! { + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()), + task(s.clone()) + }; + }) }) }); } -bencher::benchmark_group!( - sync_semaphore, - uncontended, - uncontended_concurrent_multi, - uncontended_concurrent_single, - contended_concurrent_multi, - contended_concurrent_single -); +fn bench_contention(c: &mut Criterion) { + let mut group = c.benchmark_group("contention"); + contended_concurrent_multi(&mut group); + contended_concurrent_single(&mut group); + group.finish(); +} + +fn bench_uncontented(c: &mut Criterion) { + let mut group = c.benchmark_group("uncontented"); + uncontended(&mut group); + uncontended_concurrent_multi(&mut group); + uncontended_concurrent_single(&mut group); + group.finish(); +} + +criterion_group!(contention, bench_contention); +criterion_group!(uncontented, bench_uncontented); -bencher::benchmark_main!(sync_semaphore); +criterion_main!(contention, uncontented); diff --git a/benches/sync_watch.rs b/benches/sync_watch.rs index 401f83c82e7..63370edcb1c 100644 --- a/benches/sync_watch.rs +++ b/benches/sync_watch.rs @@ -1,9 +1,10 @@ -use bencher::{black_box, Bencher}; use rand::prelude::*; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::{watch, Notify}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; + fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() .worker_threads(6) @@ -24,7 +25,7 @@ fn do_work(rng: &mut impl RngCore) -> u32 { .fold(0, u32::wrapping_add) } -fn contention_resubscribe(b: &mut Bencher) { +fn contention_resubscribe(c: &mut Criterion) { const NTASK: u64 = 1000; let rt = rt(); @@ -46,19 +47,21 @@ fn contention_resubscribe(b: &mut Bencher) { }); } - b.iter(|| { - rt.block_on(async { - for _ in 0..100 { - assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0); - let _ = snd.send(black_box(42)); - while wg.0.load(Ordering::Acquire) > 0 { - wg.1.notified().await; + c.bench_function("contention_resubscribe", |b| { + b.iter(|| { + rt.block_on(async { + for _ in 0..100 { + assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0); + let _ = snd.send(black_box(42)); + while wg.0.load(Ordering::Acquire) > 0 { + wg.1.notified().await; + } } - } - }); + }); + }) }); } -bencher::benchmark_group!(contention, contention_resubscribe); +criterion_group!(contention, contention_resubscribe); -bencher::benchmark_main!(contention); +criterion_main!(contention); diff --git a/benches/time_now.rs b/benches/time_now.rs index 8146285691f..d87db2121f1 100644 --- a/benches/time_now.rs +++ b/benches/time_now.rs @@ -2,24 +2,23 @@ //! This essentially measure the time to enqueue a task in the local and remote //! case. -#[macro_use] -extern crate bencher; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use bencher::{black_box, Bencher}; - -fn time_now_current_thread(bench: &mut Bencher) { +fn time_now_current_thread(c: &mut Criterion) { let rt = tokio::runtime::Builder::new_current_thread() .enable_time() .build() .unwrap(); - bench.iter(|| { - rt.block_on(async { - black_box(tokio::time::Instant::now()); + c.bench_function("time_now_current_thread", |b| { + b.iter(|| { + rt.block_on(async { + black_box(tokio::time::Instant::now()); + }) }) - }) + }); } -bencher::benchmark_group!(time_now, time_now_current_thread,); +criterion_group!(time_now, time_now_current_thread); -bencher::benchmark_main!(time_now); +criterion_main!(time_now); From 65027b60bc7f8a01d82eae946d58f1d44e9bf9d5 Mon Sep 17 00:00:00 2001 From: Marek Kuskowski <50183564+nylonicious@users.noreply.github.com> Date: Sun, 10 Sep 2023 16:46:07 +0200 Subject: [PATCH 3/7] io: add `Interest::remove` method (#5906) --- tokio/src/io/interest.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tokio/src/io/interest.rs b/tokio/src/io/interest.rs index 9256bd238da..cf6d270e0c6 100644 --- a/tokio/src/io/interest.rs +++ b/tokio/src/io/interest.rs @@ -167,6 +167,40 @@ impl Interest { Self(self.0 | other.0) } + /// Remove `Interest` from `self`. + /// + /// Interests present in `other` but *not* in `self` are ignored. + /// + /// Returns `None` if the set would be empty after removing `Interest`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// const RW_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// let w_interest = RW_INTEREST.remove(Interest::READABLE).unwrap(); + /// assert!(!w_interest.is_readable()); + /// assert!(w_interest.is_writable()); + /// + /// // Removing all interests from the set returns `None`. + /// assert_eq!(w_interest.remove(Interest::WRITABLE), None); + /// + /// // Remove all interests at once. + /// assert_eq!(RW_INTEREST.remove(RW_INTEREST), None); + /// ``` + #[must_use = "this returns the result of the operation, without modifying the original"] + pub fn remove(self, other: Interest) -> Option { + let value = self.0 & !other.0; + + if value != 0 { + Some(Self(value)) + } else { + None + } + } + // This function must be crate-private to avoid exposing a `mio` dependency. pub(crate) fn to_mio(self) -> mio::Interest { fn mio_add(wrapped: &mut Option, add: mio::Interest) { From 61f095fdc136f3e2a547d09b60a3ce1ef598b1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bl=C3=A9ron?= Date: Sun, 10 Sep 2023 20:08:12 +0200 Subject: [PATCH 4/7] sync: add `?Sized` bound to `{MutexGuard,OwnedMutexGuard}::map` (#5997) --- tokio/src/sync/mutex.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 0700c260eb2..1d44e938bf5 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -846,6 +846,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { #[inline] pub fn map(mut this: Self, f: F) -> MappedMutexGuard<'a, U> where + U: ?Sized, F: FnOnce(&mut T) -> &mut U, { let data = f(&mut *this) as *mut U; @@ -894,6 +895,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { #[inline] pub fn try_map(mut this: Self, f: F) -> Result, Self> where + U: ?Sized, F: FnOnce(&mut T) -> Option<&mut U>, { let data = match f(&mut *this) { @@ -1026,6 +1028,7 @@ impl OwnedMutexGuard { #[inline] pub fn map(mut this: Self, f: F) -> OwnedMappedMutexGuard where + U: ?Sized, F: FnOnce(&mut T) -> &mut U, { let data = f(&mut *this) as *mut U; @@ -1074,6 +1077,7 @@ impl OwnedMutexGuard { #[inline] pub fn try_map(mut this: Self, f: F) -> Result, Self> where + U: ?Sized, F: FnOnce(&mut T) -> Option<&mut U>, { let data = match f(&mut *this) { From 1c428cc558bd3be71fcd9466010b2ff0ed4048b6 Mon Sep 17 00:00:00 2001 From: Icenowy Zheng Date: Mon, 11 Sep 2023 15:22:14 +0800 Subject: [PATCH 5/7] tokio: fix cache line size for RISC-V (#5994) --- tokio/src/runtime/io/scheduled_io.rs | 12 +++--------- tokio/src/runtime/task/core.rs | 12 +++--------- tokio/src/util/cacheline.rs | 14 ++++---------- 3 files changed, 10 insertions(+), 28 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index ddce4b3ae4b..04d81fef0c8 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -44,25 +44,20 @@ use std::task::{Context, Poll, Waker}; ), repr(align(128)) )] -// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size. +// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 -// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 -// -// riscv32 is assumed not to exceed the cache line size of riscv64. #[cfg_attr( any( target_arch = "arm", target_arch = "mips", target_arch = "mips64", - target_arch = "riscv32", - target_arch = "riscv64", target_arch = "sparc", target_arch = "hexagon", ), @@ -79,12 +74,13 @@ use std::task::{Context, Poll, Waker}; // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 #[cfg_attr(target_arch = "s390x", repr(align(256)))] -// x86, wasm, and sparc64 have 64-byte cache line size. +// x86, riscv, wasm, and sparc64 have 64-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10 // // All others are assumed to have 64-byte cache line size. #[cfg_attr( @@ -95,8 +91,6 @@ use std::task::{Context, Poll, Waker}; target_arch = "arm", target_arch = "mips", target_arch = "mips64", - target_arch = "riscv32", - target_arch = "riscv64", target_arch = "sparc", target_arch = "hexagon", target_arch = "m68k", diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index d62ea965659..6f5867df574 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -57,25 +57,20 @@ use std::task::{Context, Poll, Waker}; ), repr(align(128)) )] -// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size. +// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 -// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 -// -// riscv32 is assumed not to exceed the cache line size of riscv64. #[cfg_attr( any( target_arch = "arm", target_arch = "mips", target_arch = "mips64", - target_arch = "riscv32", - target_arch = "riscv64", target_arch = "sparc", target_arch = "hexagon", ), @@ -92,12 +87,13 @@ use std::task::{Context, Poll, Waker}; // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 #[cfg_attr(target_arch = "s390x", repr(align(256)))] -// x86, wasm, and sparc64 have 64-byte cache line size. +// x86, riscv, wasm, and sparc64 have 64-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10 // // All others are assumed to have 64-byte cache line size. #[cfg_attr( @@ -108,8 +104,6 @@ use std::task::{Context, Poll, Waker}; target_arch = "arm", target_arch = "mips", target_arch = "mips64", - target_arch = "riscv32", - target_arch = "riscv64", target_arch = "sparc", target_arch = "hexagon", target_arch = "m68k", diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs index 64fd5ccad33..b34004c1ac1 100644 --- a/tokio/src/util/cacheline.rs +++ b/tokio/src/util/cacheline.rs @@ -27,21 +27,15 @@ use std::ops::{Deref, DerefMut}; ), repr(align(128)) )] -// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// arm, mips and mips64 have 32-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 -// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 #[cfg_attr( - any( - target_arch = "arm", - target_arch = "mips", - target_arch = "mips64", - target_arch = "riscv64", - ), + any(target_arch = "arm", target_arch = "mips", target_arch = "mips64",), repr(align(32)) )] // s390x has 256-byte cache line size. @@ -49,11 +43,12 @@ use std::ops::{Deref, DerefMut}; // Sources: // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 #[cfg_attr(target_arch = "s390x", repr(align(256)))] -// x86 and wasm have 64-byte cache line size. +// x86, riscv and wasm have 64-byte cache line size. // // Sources: // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10 // // All others are assumed to have 64-byte cache line size. #[cfg_attr( @@ -64,7 +59,6 @@ use std::ops::{Deref, DerefMut}; target_arch = "arm", target_arch = "mips", target_arch = "mips64", - target_arch = "riscv64", target_arch = "s390x", )), repr(align(64)) From 61042b4d90fa737dcf922e01466ded812c0f1a03 Mon Sep 17 00:00:00 2001 From: Victor Timofei Date: Mon, 11 Sep 2023 11:50:29 +0000 Subject: [PATCH 6/7] sync: add `watch::Receiver::mark_unseen` (#5962) --- tokio/src/sync/watch.rs | 19 +++++++++++-- tokio/tests/sync_watch.rs | 58 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 5a46670eeeb..0452a81aa0a 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -380,7 +380,17 @@ mod state { impl Version { /// Get the initial version when creating the channel. pub(super) fn initial() -> Self { - Version(0) + // The initial version is 1 so that `mark_unseen` can decrement by one. + // (The value is 2 due to the closed bit.) + Version(2) + } + + /// Decrements the version. + pub(super) fn decrement(&mut self) { + // Decrement by two to avoid touching the CLOSED bit. + if self.0 >= 2 { + self.0 -= 2; + } } } @@ -400,7 +410,7 @@ mod state { /// Create a new `AtomicState` that is not closed and which has the /// version set to `Version::initial()`. pub(super) fn new() -> Self { - AtomicState(AtomicUsize::new(0)) + AtomicState(AtomicUsize::new(2)) } /// Load the current value of the state. @@ -634,6 +644,11 @@ impl Receiver { Ok(self.version != new_version) } + /// Marks the state as unseen. + pub fn mark_unseen(&mut self) { + self.version.decrement(); + } + /// Waits for a change notification, then marks the newest value as seen. /// /// If the newest value in the channel has not yet been marked seen when diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index dab57aa5af6..5bcb7476888 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -44,6 +44,64 @@ fn single_rx_recv() { assert_eq!(*rx.borrow(), "two"); } +#[test] +fn rx_version_underflow() { + let (_tx, mut rx) = watch::channel("one"); + + // Version starts at 2, validate we do not underflow + rx.mark_unseen(); + rx.mark_unseen(); +} + +#[test] +fn rx_mark_unseen() { + let (tx, mut rx) = watch::channel("one"); + + let mut rx2 = rx.clone(); + let mut rx3 = rx.clone(); + let mut rx4 = rx.clone(); + { + rx.mark_unseen(); + assert!(rx.has_changed().unwrap()); + + let mut t = spawn(rx.changed()); + assert_ready_ok!(t.poll()); + } + + { + assert!(!rx2.has_changed().unwrap()); + + let mut t = spawn(rx2.changed()); + assert_pending!(t.poll()); + } + + { + rx3.mark_unseen(); + assert_eq!(*rx3.borrow(), "one"); + + assert!(rx3.has_changed().unwrap()); + + assert_eq!(*rx3.borrow_and_update(), "one"); + + assert!(!rx3.has_changed().unwrap()); + + let mut t = spawn(rx3.changed()); + assert_pending!(t.poll()); + } + + { + tx.send("two").unwrap(); + assert!(rx4.has_changed().unwrap()); + assert_eq!(*rx4.borrow_and_update(), "two"); + + rx4.mark_unseen(); + assert!(rx4.has_changed().unwrap()); + assert_eq!(*rx4.borrow_and_update(), "two") + } + + assert_eq!(*rx.borrow(), "two"); +} + #[test] fn multi_rx() { let (tx, mut rx1) = watch::channel("one"); From 65e7715909694d8d7d15b081cc3790b4ea1ad300 Mon Sep 17 00:00:00 2001 From: Victor Timofei Date: Mon, 11 Sep 2023 23:53:05 +0000 Subject: [PATCH 7/7] util: replace `sync::reusable_box::Pending` with `std::future::Pending` (#6000) --- tokio-util/src/sync/reusable_box.rs | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/tokio-util/src/sync/reusable_box.rs b/tokio-util/src/sync/reusable_box.rs index 1b8ef605e15..1fae38cc6ff 100644 --- a/tokio-util/src/sync/reusable_box.rs +++ b/tokio-util/src/sync/reusable_box.rs @@ -1,7 +1,6 @@ use std::alloc::Layout; use std::fmt; -use std::future::Future; -use std::marker::PhantomData; +use std::future::{self, Future}; use std::mem::{self, ManuallyDrop}; use std::pin::Pin; use std::ptr; @@ -61,7 +60,7 @@ impl<'a, T> ReusableBoxFuture<'a, T> { F: Future + Send + 'a, { // future::Pending is a ZST so this never allocates. - let boxed = mem::replace(&mut this.boxed, Box::pin(Pending(PhantomData))); + let boxed = mem::replace(&mut this.boxed, Box::pin(future::pending())); reuse_pin_box(boxed, future, |boxed| this.boxed = Pin::from(boxed)) } @@ -156,16 +155,3 @@ impl O> Drop for CallOnDrop { f(); } } - -/// The same as `std::future::Pending`; we can't use that type directly because on rustc -/// versions <1.60 it didn't unconditionally implement `Send`. -// FIXME: use `std::future::Pending` once the MSRV is >=1.60 -struct Pending(PhantomData T>); - -impl Future for Pending { - type Output = T; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - Poll::Pending - } -}