Skip to content

Commit

Permalink
test: Added block_on
Browse files Browse the repository at this point in the history
  • Loading branch information
nurmohammed840 committed Oct 10, 2024
1 parent 21ac180 commit f66e471
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
28 changes: 28 additions & 0 deletions tokio-test/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,34 @@ impl<T: Future> Spawn<T> {
let fut = self.future.as_mut();
self.task.enter(|cx| fut.poll(cx))
}

/// Run a future to completion on the current thread.
///
/// This function will block the caller until the given future has completed.
///
/// Note: This does not create a Tokio runtime, and therefore does not support
/// Tokio-specific asynchronous APIs, such as [tokio::time::sleep].
pub fn block_on(&mut self) -> T::Output {
loop {
match self.poll() {
Poll::Ready(val) => return val,
Poll::Pending => {
let mut guard = self.task.waker.state.lock().unwrap();
let state = *guard;

if state == WAKE {
continue;
}

assert_eq!(state, IDLE);
*guard = SLEEP;
let guard = self.task.waker.condvar.wait(guard).unwrap();
assert_eq!(*guard, WAKE);
drop(guard);
}
};
}
}
}

impl<T: Stream> Spawn<T> {
Expand Down
47 changes: 47 additions & 0 deletions tokio-test/tests/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::future::poll_fn;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use tokio_stream::Stream;
use tokio_test::task;

Expand All @@ -23,3 +26,47 @@ fn test_spawn_stream_size_hint() {
let spawn = task::spawn(SizedStream);
assert_eq!(spawn.size_hint(), (100, Some(200)));
}

#[test]
fn test_spawn_block_on() {
let job = thread::spawn(move || {
task::spawn(async {
let mut poll_once = false;
poll_fn(|cx| {
if poll_once {
return Poll::Ready(());
}
assert!(!poll_once);
poll_once = true;
cx.waker().wake_by_ref();
Poll::Pending
})
.await;

let mut once = false;
poll_fn(|cx| {
if once {
return Poll::Ready(());
}
let waker = cx.waker().clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(333));
waker.wake();
});
assert!(!once);
once = true;
Poll::Pending
})
.await;
})
.block_on();
});

let job2 = thread::spawn(|| {
task::spawn(async { std::future::pending::<()>().await }).block_on();
});

thread::sleep(Duration::from_secs(2));
assert!(job.is_finished());
assert!(!job2.is_finished());
}

0 comments on commit f66e471

Please sign in to comment.