From 43be386132f3b6c71eb8cdb783cd2e28475ab0ce Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sun, 22 Sep 2024 09:31:03 -0500 Subject: [PATCH] Check tail.is_closed for return --- tokio/src/sync/broadcast.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 5cf916183b5..c9469c0d3a0 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -116,7 +116,6 @@ //! } //! ``` -use crate::future::poll_fn; use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; @@ -846,7 +845,19 @@ impl Sender { /// } /// ``` pub async fn closed(&self) { - self.shared.notify_rx_drop.notified().await; + loop { + let notified = self.shared.notify_rx_drop.notified(); + + { + // Ensure the lock drops if the channel isn't closed + let tail = self.shared.tail.lock(); + if tail.closed { + return; + } + } + + notified.await; + } } fn close_channel(&self) { @@ -1398,12 +1409,13 @@ impl Drop for Receiver { let until = tail.pos; let remaining_rx = tail.rx_cnt; - drop(tail); - if remaining_rx == 0 { self.shared.notify_rx_drop.notify_waiters(); + tail.closed = true; } + drop(tail); + while self.next < until { match self.recv_ref(None) { Ok(_) => {}