Skip to content

Commit

Permalink
io: add AsyncFd::try_io() and try_io_mut()
Browse files Browse the repository at this point in the history
This allows to provide APIs like `try_recv()` and `try_send()` in custom
types built on top of `AsyncFd`.
  • Loading branch information
de-vri-es committed Nov 11, 2024
1 parent bb7ca75 commit 298f98b
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 1 deletion.
50 changes: 50 additions & 0 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,56 @@ impl<T: AsRawFd> AsyncFd<T> {
.async_io(interest, || f(self.inner.as_mut().unwrap()))
.await
}

/// Tries to read or write from the file descriptor using a user-provided IO operation.
///
/// If the file descriptor is ready, the provided closure is called. The closure
/// should attempt to perform IO operation on the file descriptor by manually
/// calling the appropriate syscall. If the operation fails because the
/// file descriptor is not actually ready, then the closure should return a
/// `WouldBlock` error and the readiness flag is cleared. The return value
/// of the closure is then returned by `try_io`.
///
/// If the file descriptor is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the file descriptor that failed due to the file descriptor not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the file descriptor to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `AsyncFd` type, as this will mess with the
/// readiness flag and can cause the file descriptor to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce(&T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.try_io(interest, || f(self.inner.as_ref().unwrap()))
}

/// Tries to read or write from the file descriptor using a user-provided IO operation.
///
/// The behavior is the same as [`try_io`], except that the closure can mutate the inner
/// value of the [`AsyncFd`].
///
/// [`try_io`]: AsyncFd::try_io
pub fn try_io_mut<R>(
&mut self,
interest: Interest,
f: impl FnOnce(&mut T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.try_io(interest, || f(self.inner.as_mut().unwrap()))
}
}

impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
Expand Down
102 changes: 101 additions & 1 deletion tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async fn reregister() {
}

#[tokio::test]
async fn try_io() {
async fn guard_try_io() {
let (a, mut b) = socketpair();

b.write_all(b"0").unwrap();
Expand Down Expand Up @@ -330,6 +330,106 @@ async fn try_io() {
let _ = readable.await.unwrap();
}

#[tokio::test]
async fn try_io_readable() {
let (a, mut b) = socketpair();
let mut afd_a = AsyncFd::new(a).unwrap();

// Give the runtime some time to update bookkeeping.
tokio::time::sleep(Duration::from_micros(1)).await;

{
let mut called = false;
let _ = afd_a.try_io_mut(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(
!called,
"closure should not have been called, since socket should not be readable"
);
}

// Make `a` readable by writing to `b`.
// Give the runtime some time to update bookkeeping.
b.write_all(&[0]).unwrap();
tokio::time::sleep(Duration::from_micros(1)).await;

{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(
called,
"closure should have been called, since socket should have data available to read"
);
}

{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
io::Result::<()>::Err(ErrorKind::WouldBlock.into())
});
assert!(
called,
"closure should have been called, since socket should have data available to read"
);
}

{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(!called, "closure should not have been called, since socket readable state should have been cleared");
}
}

#[tokio::test]
async fn try_io_writable() {
let (a, _b) = socketpair();
let afd_a = AsyncFd::new(a).unwrap();

// Give the runtime some time to update bookkeeping.
tokio::time::sleep(Duration::from_micros(1)).await;

{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
Ok(())
});
assert!(
called,
"closure should have been called, since socket should still be marked as writable"
);
}
{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
io::Result::<()>::Err(ErrorKind::WouldBlock.into())
});
assert!(
called,
"closure should have been called, since socket should still be marked as writable"
);
}

{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
Ok(())
});
assert!(!called, "closure should not have been called, since socket writable state should have been cleared");
}
}

#[tokio::test]
async fn multiple_waiters() {
let (a, mut b) = socketpair();
Expand Down

0 comments on commit 298f98b

Please sign in to comment.