From 1a01e50760a8f822059737f2bd1fe39bf05e8f3b Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sat, 13 Jul 2024 18:41:29 -0500 Subject: [PATCH] sync: add Sender::closed future --- tokio/src/loom/std/mutex.rs | 2 +- tokio/src/sync/broadcast.rs | 41 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/tokio/src/loom/std/mutex.rs b/tokio/src/loom/std/mutex.rs index 7b8f9ba1e24..95f6d73ba60 100644 --- a/tokio/src/loom/std/mutex.rs +++ b/tokio/src/loom/std/mutex.rs @@ -1,7 +1,7 @@ use std::sync::{self, MutexGuard, TryLockError}; /// Adapter for `std::Mutex` that removes the poisoning aspects -/// from its api. +/// from its API. #[derive(Debug)] pub(crate) struct Mutex(sync::Mutex); diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ba0a44fb8b9..d3c6c587169 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -804,6 +804,47 @@ impl Sender { Arc::ptr_eq(&self.shared, &other.shared) } + /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches + /// zero. + /// + /// # Examples + /// + /// ``` + /// use futures::FutureExt; + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel::(16); + /// let mut rx2 = tx.subscribe(); + /// + /// tokio::spawn(async move { + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// }); + /// + /// let _ = tx.send(10); + /// assert!(tx.closed().now_or_never().is_none()); + /// + /// let _ = tokio::spawn(async move { + /// assert_eq!(rx2.recv().await.unwrap(), 10); + /// }).await; + /// + /// assert!(tx.closed().now_or_never().is_some()); + /// } + /// ``` + pub async fn closed(&self) { + std::future::poll_fn(|_| { + let tail = self.shared.tail.lock(); + + if tail.closed || tail.rx_cnt == 0 { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + } + fn close_channel(&self) { let mut tail = self.shared.tail.lock(); tail.closed = true;