Skip to content

Commit

Permalink
Change behavior of recv_many(buf, 0) to return 0 immediately.
Browse files Browse the repository at this point in the history
Returning 0 immediately when limit=0 is consistent with
the current take(0) behavior.  Tests were updated.
  • Loading branch information
Aaron Schweiger authored and Aaron Schweiger committed Oct 8, 2023
1 parent 5d2a8a7 commit bf44fcb
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
20 changes: 11 additions & 9 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,21 @@ impl<T> Receiver<T> {
///
/// This method extends `buffer` by no more than a fixed number
/// of values as specified by `limit`. If `limit` is zero,
/// then a default is used. The return value is the number
/// of values added to `buffer`.
/// the function returns immediately with `0`.
/// The return value is the number of values added to `buffer`.
///
/// If there are no messages in the channel's queue, but the channel has
/// not yet been closed, this method will sleep until a message is sent or
/// the channel is closed. Note that if [`close`] is called, but there are
/// For `limit > 0`, if there are no messages in the
/// channel's queue, but the channel has not yet been closed, this
/// method will sleep until a message is sent or the channel is closed.
/// Note that if [`close`] is called, but there are
/// still outstanding [`Permits`] from before it was closed, the channel is
/// not considered closed by `recv_many` until the permits are released.
///
/// This method will never return 0 unless the channel has been closed and
/// there are no remaining messages in the channel's queue. This indicates
/// that no further values can ever be received from this `Receiver`. The
/// channel is closed when all senders have been dropped, or when [`close`]
/// For non-zero values `limit`, this method will never return `0` unless
/// the channel has been closed and there are no remaining messages in the
/// channel's queue. This indicates that no further values can ever
/// be received from this `Receiver`. The channel is closed when all senders
/// have been dropped, or when [`close`]
/// is called.
///
/// The capacity of `buffer` is increased as needed.
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<T, S: Semaphore> Rx<T, S> {
/// Receives values into `buffer` up to its capacity
///
/// For `limit` > 0, receives up to limit values into `buffer`.
/// For `limit` = 0, receives at most `super::BLOCK_CAP` values.
/// For `limit` = 0, immediately returns Ready(0).
pub(crate) fn recv_many(
&mut self,
cx: &mut Context<'_>,
Expand All @@ -307,7 +307,11 @@ impl<T, S: Semaphore> Rx<T, S> {
) -> Poll<usize> {
use super::block::Read;

let mut remaining = if limit > 0 { limit } else { super::BLOCK_CAP };
if limit == 0 {
return Ready(0usize);
}

let mut remaining = limit;
let initial_length = buffer.len();

ready!(crate::trace::trace_leaf(cx));
Expand Down
14 changes: 7 additions & 7 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,18 @@ impl<T> UnboundedReceiver<T> {
///
/// This method extends `buffer` by no more than a fixed number
/// of values as specified by `limit`. If `limit` is zero,
/// then a default is used. The return value is the number
/// of values added to `buffer`.
/// the function returns immediately with `0`.
/// The return value is the number of values added to `buffer`.
///
/// If there are no messages in the channel's queue, but the channel has
/// not yet been closed, this method will sleep until a message is sent or
/// the channel is closed.
///
/// This method will never return 0 unless the channel has been closed and
/// there are no remaining messages in the channel's queue. This indicates
/// that no further values can ever be received from this `Receiver`. The
/// channel is closed when all senders have been dropped, or when [`close`]
/// is called.
/// For `limit > 0`, this method will never return `0` unless
/// the channel has been closed and there are no remaining messages
/// in the channel's queue. This indicates that no further values can ever
/// be received from this `Receiver`. The channel is closed when all senders
/// have been dropped, or when [`close`] is called.
///
/// The capacity of `buffer` is increased as needed.
///
Expand Down
21 changes: 16 additions & 5 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ async fn async_send_recv_with_buffer() {
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
let (tx, mut rx) = mpsc::channel(2);
let mut buffer = Vec::<i32>::with_capacity(3);

// With `limit=0` does not sleep, returns immediately
assert_eq!(0, rx.recv_many(&mut buffer, 0).await);

tokio::spawn(async move {
assert_ok!(tx.send(1).await);
Expand All @@ -132,7 +136,6 @@ async fn async_send_recv_many_with_buffer() {
});

let limit = 3;
let mut buffer = Vec::<i32>::with_capacity(3);
let mut recv_count = 0usize;
while recv_count < 3 {
let n = rx.recv_many(&mut buffer, limit).await;
Expand Down Expand Up @@ -206,15 +209,23 @@ async fn send_recv_unbounded() {
async fn send_recv_many_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

let mut buffer: Vec<i32> = Vec::new();

// With `limit=0` does not sleep, returns immediately
rx.recv_many(&mut buffer, 0).await;
assert_eq!(0, buffer.len());

assert_ok!(tx.send(7));
assert_ok!(tx.send(13));
assert_ok!(tx.send(100));
assert_ok!(tx.send(1002));

let mut buffer: Vec<i32> = Vec::new();
rx.recv_many(&mut buffer, 0).await;
assert_eq!(0, buffer.len());

let mut count = 0;
while count < 4 {
count += rx.recv_many(&mut buffer, 0).await;
count += rx.recv_many(&mut buffer, 1).await;
}
assert_eq!(count, 4);
assert_eq!(vec![7, 13, 100, 1002], buffer);
Expand Down Expand Up @@ -258,7 +269,7 @@ async fn send_recv_many_bounded_capacity() {
}
tx.send("one more".to_string()).await.unwrap();

// Here `recv_may` receives all but the last value;
// Here `recv_many` receives all but the last value;
// the initial capacity is adequate, so the buffer does
// not increase in side.
assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await);
Expand Down Expand Up @@ -300,7 +311,7 @@ async fn send_recv_many_unbounded_capacity() {
}
tx.send("one more".to_string()).unwrap();

// Here `recv_may` receives all but the last value;
// Here `recv_many` receives all but the last value;
// the initial capacity is adequate, so the buffer does
// not increase in side.
assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await);
Expand Down

0 comments on commit bf44fcb

Please sign in to comment.