Skip to content

Commit

Permalink
Check tail.is_closed for return
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Sep 22, 2024
1 parent b6491c0 commit 43be386
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
//! }
//! ```

use crate::future::poll_fn;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
Expand Down Expand Up @@ -846,7 +845,19 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
self.shared.notify_rx_drop.notified().await;
loop {
let notified = self.shared.notify_rx_drop.notified();

{
// Ensure the lock drops if the channel isn't closed
let tail = self.shared.tail.lock();
if tail.closed {
return;
}
}

notified.await;
}
}

fn close_channel(&self) {
Expand Down Expand Up @@ -1398,12 +1409,13 @@ impl<T> Drop for Receiver<T> {
let until = tail.pos;
let remaining_rx = tail.rx_cnt;

drop(tail);

if remaining_rx == 0 {
self.shared.notify_rx_drop.notify_waiters();
tail.closed = true;
}

drop(tail);

while self.next < until {
match self.recv_ref(None) {
Ok(_) => {}
Expand Down

0 comments on commit 43be386

Please sign in to comment.