Skip to content

Commit

Permalink
sync: add Sender<T>::closed future
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Jul 14, 2024
1 parent c8f3539 commit 1a01e50
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tokio/src/loom/std/mutex.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{self, MutexGuard, TryLockError};

/// Adapter for `std::Mutex` that removes the poisoning aspects
/// from its api.
/// from its API.
#[derive(Debug)]
pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);

Expand Down
41 changes: 41 additions & 0 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,47 @@ impl<T> Sender<T> {
Arc::ptr_eq(&self.shared, &other.shared)
}

/// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
/// zero.
///
/// # Examples
///
/// ```
/// use futures::FutureExt;
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel::<u32>(16);
/// let mut rx2 = tx.subscribe();
///
/// tokio::spawn(async move {
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// });
///
/// let _ = tx.send(10);
/// assert!(tx.closed().now_or_never().is_none());
///
/// let _ = tokio::spawn(async move {
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// }).await;
///
/// assert!(tx.closed().now_or_never().is_some());
/// }
/// ```
pub async fn closed(&self) {
std::future::poll_fn(|_| {
let tail = self.shared.tail.lock();

if tail.closed || tail.rx_cnt == 0 {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
}

fn close_channel(&self) {
let mut tail = self.shared.tail.lock();
tail.closed = true;
Expand Down

0 comments on commit 1a01e50

Please sign in to comment.