-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync: add Sender<T>::closed future #6685
base: master
Are you sure you want to change the base?
sync: add Sender<T>::closed future #6685
Conversation
tokio/src/sync/broadcast.rs
Outdated
pub async fn closed(&self) { | ||
std::future::poll_fn(|_| { | ||
let tail = self.shared.tail.lock(); | ||
|
||
if tail.closed || tail.rx_cnt == 0 { | ||
Poll::Ready(()) | ||
} else { | ||
Poll::Pending | ||
} | ||
}) | ||
.await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be wrong here butI think this future will only get polled once because you are not waking up the task again.
Consider this example:
#[tokio::main]
async fn main() {
let (tx, mut rx1) = tokio::sync::broadcast::channel(16);
tokio::spawn(async move {
let r = rx1.recv().await.unwrap();
println!("Rx1 received: {}", r);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
});
tx.send(20).unwrap();
println!("Waiting for subscribers to drop");
tx.closed().await;
println!("Subscribers dropped");
}
Here the call to closed
will block because it will only be polled once right when you call it. But the subscriber is not dropped at that point, so the poll_fn
will return Pending
. But because the future is never be polled again, this will not resolve after that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, this is not correct. When returning Pending
, you must register for wakeups so that you are polled again when progress can be made.
Compare with the mpsc channel:
tokio/tokio/src/sync/mpsc/chan.rs
Lines 58 to 59 in 4825c44
/// Notifies all tasks listening for the receiver being dropped. | |
notify_rx_closed: Notify, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, and I guess poll_fn
is flagged as unstable in CI anyway. I'll find another way around this - thanks both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tokio has a poll_fn
implementation you can use if you find that you need it.
What's the status on this? |
Work has been nuts - haven't had much time for anything else. This is on my backlog though |
No rush from my side. Just wanted to check in. |
1a01e50
to
d73005b
Compare
Pushed up what should be closer to the proper implementation. I'd like to add a I'll dig more into that when I have a bit more time, as well as fix the various CI issues and clean up the code (for example the tail lock will drop when it goes out of scope). |
tokio/src/sync/broadcast.rs
Outdated
self.shared.notify_rx_drop.notified().await; | ||
|
||
poll_fn(|_| { | ||
let tail = self.shared.tail.lock(); | ||
|
||
if tail.closed || tail.rx_cnt == 0 { | ||
return Poll::Ready(()); | ||
} | ||
|
||
drop(tail); | ||
return Poll::Pending; | ||
}) | ||
.await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The poll fn needs to interact with the notified()
future for this to work. Check out this for inspiration:
tokio/tokio-util/src/sync/cancellation_token.rs
Lines 303 to 319 in 5b9a290
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | |
let mut this = self.project(); | |
loop { | |
if this.cancellation_token.is_cancelled() { | |
return Poll::Ready(()); | |
} | |
// No wakeups can be lost here because there is always a call to | |
// `is_cancelled` between the creation of the future and the call to | |
// `poll`, and the code that sets the cancelled flag does so before | |
// waking the `Notified`. | |
if this.future.as_mut().poll(cx).is_pending() { | |
return Poll::Pending; | |
} | |
this.future.set(this.cancellation_token.inner.notified()); | |
} |
You should be able to do something similar without creating a whole future struct if you use tokio::pin!
on the notified()
future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we now notify only once, when the last receiver drops, can we just await
the Notified directly? I originally had this block because I notified on every Rx
drop, so had to include an additional check to ensure that we had just dropped the last Rx
.
I'm also trying to get a better understanding of the internals here. For my own understanding - this doesn't work because, after the first call, the internal Future
state machine is progressed to the point where we're already in the poll_fn
and have a Pending
from a non-terminal Rx
dropping. That means that we'd never await the notified()
again. Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Notified
doesn't have spurious wakeups in the scenario we are using it, so it should work. Well, except for the fact that we probably want it to return immediately if it's already closed when we call it. We can do it like this:
loop {
let notified = self.shared.notify_rx_drop.notified();
if self.is_closed() { return; }
notified.await;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done. I used tail.closed
instead of self.is_closed()
as I couldn't find an is_closed
on Sender
, which means an extra lock.
For my own curiosity, what's the advantage of the loop above vs. a simple self.shared.notify_rx_drop.notified().await
, if Notified
doesn't suffer from spurious wakeups? Now that we only notify once when the last receiver drops, it may be a nice way to avoid that extra lock I mentioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't use a loop, then it changes the behavior of the test I requested in #6685 (comment). Which behavior we want is up for discussion.
d73005b
to
a215e6f
Compare
eb0d891
to
43be386
Compare
43be386
to
12d46d0
Compare
drop(rx); | ||
assert!(!task.is_woken()); | ||
assert_pending!(task.poll()); | ||
|
||
drop(rx2); | ||
assert!(task.is_woken()); | ||
assert_ready!(task.poll()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add another test that calls subscribe
between dropping the last receiver and polling closed()
, to see what happens in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, will do - sorry for the delay!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done - I simply re-open the channel if the rx_cnt
is 0
when calling new_receiver
. This allows us to re-create the closed()
future, which I try to demonstrate in the new broadcast_sender_closed_with_extra_subscribe
test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, it seems that this change made another test failing.
Now even after dropping all senders, the channel doesn't seem to be marked as closed when resubscribe is called afterwards.
618f51d
to
2ebc20b
Compare
We have to re-open the channel if a new receiver has subscribed after closing the channel.
2ebc20b
to
e170736
Compare
Motivation
Adds a
closed
Future tobroadcast::Sender<T>
, similar to theoneshot
ormpsc
variants, which completes when all subscribed receivers have been dropped.Solution
This is a simple
poll_fn
which wraps a check aroundshared.tail.rx_cnt
, returningReady
if the remaining count is 0.Closes: #6649