Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Update broadcast::recv to return a named future #6908

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,55 @@ struct RecvGuard<'a, T> {
slot: RwLockReadGuard<'a, Slot<T>>,
}

pub(crate) mod future {
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use pin_project_lite::pin_project;

use crate::runtime::coop::Coop;

use super::{error::RecvError, RecvInner};

pin_project! {
/// Future for the [`Receiver::recv`][super::Receiver::recv] method.
pub struct Recv<'a, T>
where
T: Clone,
{
#[pin]
pub(super) inner: Coop<RecvInner<'a, T>>,
}
}

impl<'a, T> Future for Recv<'a, T>
where
T: Clone,
{
type Output = Result<T, RecvError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}
}

use self::future::Recv;

/// Receive a value future.
struct Recv<'a, T> {
struct RecvInner<'a, T> {
/// Receiver being waited on.
receiver: &'a mut Receiver<T>,

/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
}

unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
unsafe impl<'a, T: Send> Send for RecvInner<'a, T> {}
unsafe impl<'a, T: Send> Sync for RecvInner<'a, T> {}

/// Max number of receivers. Reserve space to lock.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
Expand Down Expand Up @@ -1192,6 +1230,12 @@ impl<T: Clone> Receiver<T> {
}
/// Receives the next value for this receiver.
///
/// Equivalent to:
///
/// ```ignore
/// async fn recv(&self) -> Result<T, RecvError>;
/// ```
///
/// Each [`Receiver`] handle will receive a clone of all values sent
/// **after** it has subscribed.
///
Expand Down Expand Up @@ -1262,8 +1306,10 @@ impl<T: Clone> Receiver<T> {
/// assert_eq!(30, rx.recv().await.unwrap());
/// }
/// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
cooperative(Recv::new(self)).await
pub fn recv(&mut self) -> Recv<'_, T> {
Recv {
inner: cooperative(RecvInner::new(self)),
}
}

/// Attempts to return a pending value on this receiver without awaiting.
Expand Down Expand Up @@ -1363,9 +1409,9 @@ impl<T> Drop for Receiver<T> {
}
}

impl<'a, T> Recv<'a, T> {
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
Recv {
impl<'a, T> RecvInner<'a, T> {
fn new(receiver: &'a mut Receiver<T>) -> RecvInner<'a, T> {
RecvInner {
receiver,
waiter: UnsafeCell::new(Waiter {
queued: AtomicBool::new(false),
Expand All @@ -1389,7 +1435,7 @@ impl<'a, T> Recv<'a, T> {
}
}

impl<'a, T> Future for Recv<'a, T>
impl<'a, T> Future for RecvInner<'a, T>
where
T: Clone,
{
Expand All @@ -1411,7 +1457,7 @@ where
}
}

impl<'a, T> Drop for Recv<'a, T> {
impl<'a, T> Drop for RecvInner<'a, T> {
fn drop(&mut self) {
// Safety: `waiter.queued` is atomic.
// Acquire ordering is required to synchronize with
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@
cfg_sync! {
/// Named future types.
pub mod futures {
pub use super::notify::Notified;
pub use super::{notify::Notified, broadcast::future::Recv};
}

mod barrier;
Expand Down
Loading