Add recv_many for mpsc channels #6010
Conversation
Why not use the watch channel for this?

Agree that the watch channel is appropriate if one target is to be updated.
I'd like to solicit feedback on adding another parameter to `recv_many`.

Are there meaningful cases when memory is at a premium but an unbounded queue is required? Or, thinking about an actor: for liveness reasons, an actor might want to limit the number of messages received in one chunk. While actors could […]
Usually this kind of API never increases the capacity of the vector unless it's equal to the length. This defines a limit.
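A rough illustration (not from the thread) of the convention being described, using a hypothetical `fill_up_to_capacity` helper: spare capacity bounds the batch, and the vector grows only when it is completely full:

```rust
/// Sketch of the convention: extend `buffer` without reserving
/// unless it is already full (capacity == length).
fn fill_up_to_capacity<T>(buffer: &mut Vec<T>, mut next: impl FnMut() -> Option<T>) -> usize {
    if buffer.capacity() == buffer.len() {
        // Grow only when completely full; `reserve` may allocate
        // more than requested due to amortized growth.
        buffer.reserve(8);
    }
    let mut added = 0;
    while buffer.len() < buffer.capacity() {
        match next() {
            Some(value) => {
                buffer.push(value);
                added += 1;
            }
            None => break,
        }
    }
    added
}
```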
I think I am a bit confused on your meaning -- can you point me in the direction of some examples? Couldn't this again result in unbounded capacity growth if the caller fails to clear the buffer between calls? In the proposed chan.rs […]

Maybe the cleanest way to bound the buffer is to allow […]
I mean that if […]. But an argument with a limit is also fine.
> I think I am a bit confused on your meaning -- can you point me in the direction of some examples? Couldn't this again result in unbounded capacity growth if the caller fails to clear the buffer between calls? In the proposed chan.rs […]
>
> Maybe the cleanest way to bound the buffer is to allow […]
I am going to rework my implementation to accept a capacity argument. That will simplify the interaction with the Semaphore. I will also simplify the logic in chan.rs.
So, I decided to let the input buffer set the capacity. In the event a zero-capacity vector is passed, it reserves `super::BLOCK_CAP` elements.
At this point, I think the main thing remaining is adjusting documentation.
I have a few more documentation nits. I know we've gone through many review iterations for this. Please let me know if it's been too many. I would be happy to merge it as it is now if so.
tokio/src/sync/mpsc/bounded.rs
```rust
/// If at the time of the call `buffer` has unused capacity,
/// `recv_many` extends the buffer with no more elements than
/// its unused capacity. Should `buffer` have no initial unused
/// capacity, additional elements are first reserved.
```
I was looking at this paragraph, and I think it could be improved like this:
```diff
-/// If at the time of the call `buffer` has unused capacity,
-/// `recv_many` extends the buffer with no more elements than
-/// its unused capacity. Should `buffer` have no initial unused
-/// capacity, additional elements are first reserved.
+/// If `buffer` has unused capacity, then this call will not reserve
+/// additional space in `buffer`. This means that the maximum number of
+/// received messages is `buffer.capacity() - buffer.len()`. However, if
+/// the capacity is equal to the length, then this call will increase the
+/// capacity to make space for additional elements.
```
This actually raises a question: Perhaps it makes sense to only reserve space when we return a message? This way, we don't consume extra memory until a message arrives, and if the channel gets closed, we don't reserve any space.
What do you think?
I like your language here better.

I also like the idea of not allocating memory unless needed. If the channels are very lightweight, perhaps they are often created, a finite number of messages sent, and then closed. Why force the caller to always allocate one more message than is actually sent?
In chan.rs, `recv_many`:

```rust
let mut insufficient_capacity = buffer.capacity() == buffer.len();
// ...
() => {
    // arm of the `macro_rules!` helper used inside `recv_many`
    while buffer.len() < buffer.capacity() || insufficient_capacity {
        match rx_fields.list.pop(&self.inner.tx) {
            Some(Read::Value(value)) => {
                // Reserve only once a value actually arrives, so an
                // empty or closed channel never allocates.
                if insufficient_capacity {
                    buffer.reserve(super::BLOCK_CAP);
                    insufficient_capacity = false;
                }
                buffer.push(value);
            }
            // remaining match arms elided
        }
    }
}
```
Does it make sense to urge users to manage the buffer's capacity outside `recv_many`? E.g.,

> If `buffer` has unused capacity, then this call will not reserve additional space in `buffer`. This means that the maximum number of received messages is `buffer.capacity() - buffer.len()`.
>
> Efficient client code should ensure `buffer` has unused capacity, but if the capacity is equal to the length and there is at least one message in the channel's queue, then this call will conservatively increase the capacity to make space for additional elements.
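For reference, a minimal sketch of the buffer-reuse pattern this wording encourages, written against the signature that was eventually merged (`recv_many(&mut buffer, limit)`); the capacity-driven variant under discussion at this point in the review was later replaced by the explicit `limit` argument:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    for i in 0..10 {
        tx.send(i).await.unwrap();
    }
    drop(tx);

    // Reuse one buffer across calls; `recv_many` appends to it,
    // so clearing it between batches keeps memory bounded.
    let mut buffer = Vec::with_capacity(4);
    loop {
        buffer.clear();
        let n = rx.recv_many(&mut buffer, 4).await;
        if n == 0 {
            break; // channel closed and fully drained
        }
        println!("batch of {n}: {buffer:?}");
    }
}
```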
tokio/src/sync/mpsc/bounded.rs
```rust
/// This method extends `buffer` by no more than a fixed number
/// of values as specified by `limit`. If `limit` is zero,
/// then a default is used. The return value is the number
/// of values added to `buffer`.
```
If the limit is zero, then I think it would be okay to just return 0 immediately.
If the caller wants a no-op, they can simply not call the method, instead of calling it with a zero limit.

Furthermore, immediately returning 0 means that this behavior becomes unique to a single value of `limit`; all other values of `limit` only return 0 if the channel is closed and no values are pending. So it breaks the API's guarantee for a case of no practical value to someone seeking to receive messages on the channel.
I considered the following possible ways to handle a 0 value for `limit`:

- The fail-fast approach -- `assert!(limit > 0);`.
- Have it so that `limit=0` acts the same way as `limit=usize::max_value()`.
- Have it so that `limit=0` results in some reasonable default behavior, e.g., retrieve 2 or 8 messages, or `BLOCK_CAP`. I liked `BLOCK_CAP` because it corresponded to the internal chunking used.
A tangentially related discussion: #2742
If `limit` is computed at run-time and is 0, then my sense is that both `assert!(...)` and immediately returning 0 violate the principle of least surprise, as in both cases the function doesn't do what it is named for. But the `assert` approach also pops up within the same file, so it is unsurprising from an implementation standpoint.
Hmm. I think immediately returning 0 is the least surprising behavior. After all, you asked to receive zero messages, and you got zero messages.
Recall, you wrote:

> Perhaps we can swap these paragraphs, and add something to the paragraph about 0 along the lines of "this method will never return 0 if the channel is not closed" or "if the channel is not closed, then this method will never return 0".
Nonetheless, I see that the same no-op convention is used in unistd.h's `read` function... and evidence of users running into the types of bugs I anticipate: https://stackoverflow.com/a/3074217
If this is acceptable, I'll update the handling and the documentation.
If what is acceptable?
I mainly see people get confused about length-zero reads when they do this:

```rust
let mut vec = Vec::with_capacity(1024);
io.read(&mut vec);
```

and are surprised because `&mut vec` becomes a slice of length zero. But that doesn't happen in our case.
Understood regarding `io.read(&mut vec);` -- here the issue is resolved as we take a `Vec`. The behavior of `recv_many(buf, 0)` is similar to calling `rd.take(0)`. I am unclear when, in an async context, `take(0)` is ever useful -- but it is allowed -- so at least `recv_many(buf, 0)` is no worse.
Contrived example adapted from the "split a file into chunks" example:
```rust
#[tokio::test]
async fn take_zero() {
    use std::path::PathBuf;
    use tokio::fs::File;
    use tokio::io::AsyncReadExt; // provides `.take`

    for chunk_size in [0, 1, 1024] {
        let mut input_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        input_path.push("tests/io_take.rs");
        let input_file = File::open(input_path).await.unwrap();

        let mut output_path = PathBuf::from(env!("CARGO_TARGET_TMPDIR"));
        output_path.push("takezero");
        if output_path.exists() {
            tokio::fs::remove_file(&output_path).await.unwrap();
        }
        assert!(!output_path.exists());

        // the `.take` method comes from the AsyncReadExt trait imported above
        let mut reader = input_file.take(chunk_size);
        let mut output = File::create(&output_path).await.unwrap();
        loop {
            let bytes_copied = tokio::io::copy(&mut reader, &mut output).await.unwrap();
            if bytes_copied == 0 {
                break;
            }
            // our reader is now exhausted, but that doesn't mean the underlying
            // reader is. So we recover it and create the next chunked reader.
            reader = reader.into_inner().take(chunk_size);
        }
        let file_size = tokio::fs::metadata(output_path).await.unwrap().len();
        assert!((chunk_size > 0 && file_size > 0) || (chunk_size == 0 && file_size == 0));
    }
}
```
I think this is now resolved (for me).

edit: removed comment; I have pushed code to return 0 immediately when `limit=0`.
I think allowing a zero length is useful for cases where the length is computed dynamically. I've seen code along these lines:

```rust
let len = io.read_u64().await? as usize;
let mut buffer = Vec::with_capacity(len);
io.take(len as u64).read_to_end(&mut buffer).await?;
```

If there are length-zero messages, then the above makes use of the `take(0)` behavior sometimes.
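By the same logic, a sketch (not from the thread) of `recv_many` with a dynamically computed limit, assuming the merged API and a hypothetical `batch_size_for` helper; a zero limit simply returns 0 rather than panicking, so no special casing is needed:

```rust
use tokio::sync::mpsc;

// Hypothetical helper: batch size computed at run time, possibly zero.
fn batch_size_for(load: usize) -> usize {
    load.min(32)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);
    tx.send(1).await.unwrap();
    drop(tx);

    let mut buffer = Vec::new();

    // limit == 0: returns 0 immediately, nothing is received.
    let n = rx.recv_many(&mut buffer, batch_size_for(0)).await;
    assert_eq!(n, 0);

    // limit > 0: receives the queued message.
    let n = rx.recv_many(&mut buffer, batch_size_for(1)).await;
    assert_eq!(n, 1);
}
```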
Circumstances may exist where multiple messages are queued for the receiver by the time the receiving task runs, and the receiving task itself may benefit from batch rather than item-by-item processing, e.g., updating a row based on a primary key where only the most recent update is needed; `recv_many` enables the receiver to process messages last-first and skip earlier updates for rows already updated.

The change involved introducing a `peek` function to `sync/mpsc/list.rs`, based on `fn pop`. `fn recv_many` returns `Poll<Vec<T>>`, contrasting with `recv`'s `Poll<Option<T>>`; the new function returns `Ready(vec![])` where `recv` returned `Ready(None)`. The `mpsc.rs` trait `Semaphore` was extended with two functions, `num_acquired()` and `add_permits(n)`, which are used to size the vector and update the underlying semaphores in a single call. Simple tests were added to `tests/sync_mpsc.rs`.
The implementation uses a passed-in buffer to return results. The buffer is always cleared, and its capacity may be increased to match the number of queued messages; this avoids excessive memory allocation.

This commit also adds new benchmarks in `benches/sync_mpsc_recv_many.rs`, borrowing substantially from `benches/sync_mpsc.rs`. The benchmarks are framed around the idea that only the last value is needed, e.g., what might be typical in a dashboard. The following benchmarks compare extracting the last value from 5_000 `recv` calls vs. a variable number of `recv_many` calls:

- contention_bounded_updater_recv vs. contention_bounded_updater_recv_many
- contention_bounded_full_updater_recv vs. contention_bounded_full_updater_recv_many
- contention_unbounded_updater_recv vs. contention_unbounded_updater_recv_many
- uncontented_bounded_updater_recv vs. uncontented_bounded_updater_recv_many
- uncontented_unbounded_updater_recv vs. uncontented_unbounded_updater_recv_many

Initial tests suggest a ~30% performance improvement for the uncontented tests.

The benchmarks contention_bounded_updater_publish_recv and contention_bounded_updater_publish_recv_many are different: first, only 1_000 messages are received. In these tests, we contemplate an actor who needs to receive a message and forward it over an API that takes 1ns to complete. An actor accepting a single message at a time hits the 1ns lag repeatedly; the actor that can pick only the last message out of the buffer from `recv_many` has much less work.
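A minimal sketch (assuming the merged `recv_many(&mut buffer, limit)` signature) of the last-value-wins pattern the benchmarks model:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        for i in 0..5_000u64 {
            if tx.send(i).await.is_err() {
                break;
            }
        }
    });

    let mut buffer = Vec::new();
    let mut latest = None;
    // Drain bursts in batches; only the newest value matters, so
    // everything before `buffer.last()` is skipped without processing.
    while rx.recv_many(&mut buffer, 64).await > 0 {
        latest = buffer.last().copied();
        buffer.clear();
    }
    println!("final value: {latest:?}");
}
```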
Previous benchmarks in sync_mpsc_recv_many.rs included a contrived example that has been removed.
Removed methods num_acquired and peek; allow caller to control the number of retrieved elements by setting the capacity of the result buffer. In the event the result buffer has no capacity, the capacity is set to BLOCK_CAP. Added examples to the documentation.
Also, update test async_send_recv_many_with_buffer to test that buffer capacity is not increased by recv_many.
Co-authored-by: Alice Ryhl <[email protected]>
The documentation is changed to better highlight the limited circumstances in which `recv_many` may return 0: '..will never return 0 unless...'. Two new tests, somewhat analogous to the doctests, were added:

- send_recv_many_bounded_capacity()
- send_recv_many_unbounded_capacity()

These ensure that, for buffers that have not reached their capacity, a call to `recv_many` only fills up to the capacity, leaving other messages still queued. A subsequent call on a filled buffer would reserve additional elements. The test also looks at pending messages holding the channel open -- compared to the doctest example, which showcases non-dropped senders.
Co-authored-by: Alice Ryhl <[email protected]>
Co-authored-by: Alice Ryhl <[email protected]>
Positive values of `limit` set the maximum number of elements that can be added to `buffer` in a single call. A zero value of `limit` sets that maximum to a default value, currently `super::BLOCK_CAP`.
Returning 0 immediately when limit=0 is consistent with the current take(0) behavior. Tests were updated.
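A short sketch of the resulting semantics (as merged): `limit == 0` returns 0 immediately, while a positive limit returns 0 only once the channel is closed and drained:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    tx.send("hello").await.unwrap();
    tx.send("world").await.unwrap();

    let mut buffer = Vec::new();

    // limit == 0: immediate return, nothing received.
    assert_eq!(rx.recv_many(&mut buffer, 0).await, 0);
    assert!(buffer.is_empty());

    // limit > 0: receives up to `limit` queued messages.
    assert_eq!(rx.recv_many(&mut buffer, 2).await, 2);
    assert_eq!(buffer, ["hello", "world"]);

    // Once all senders are dropped and the queue is empty,
    // a positive-limit call also returns 0.
    drop(tx);
    assert_eq!(rx.recv_many(&mut buffer, 2).await, 0);
}
```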
Thank you for being patient with my reviews. I'm sorry it took so many rounds. I think we're reaching the end.
Co-authored-by: Alice Ryhl <[email protected]>
Co-authored-by: Alice Ryhl <[email protected]>
Thanks! Just a single grammar fix, and I'll merge this.
Co-authored-by: Alice Ryhl <[email protected]>
Thanks for your help on this. I note that there have been changes to master since approval and that GitHub offers the "Update branch" button. Should I rebase this PR or leave it as-is?

Sorry for the delay. I've merged it now.
(Dependabot follow-up PR: bumps tokio from 1.33.0 to 1.34.0, whose release notes include "sync: add mpsc::Receiver::recv_many (#6010)".)
Introduces `recv_many`, which allows mpsc receivers to get all available messages on a queue in bulk, as opposed to one at a time. In the use case where message arrivals are bursty and only the last sent message matters, e.g., when updating a dashboard, this provides an easy way to avoid unnecessary work, yielding a substantial speedup. The approach can also provide a modest raw speedup over `recv`, as shown in the `uncontented_bounded_updater_*` and `uncontented_unbounded_updater_*` benchmarks included in this PR.

Sample usage comes from a proposed test in `sync_mpsc.rs` (a sketch appears below). Calls to `recv_many` will clear the passed-in buffer and, if needed, reserve additional capacity.

Pull request for feature enhancement discussed here:
#5844
edit: clarified origin of code sample.
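The sample test referenced above is not reproduced here; a minimal sketch of comparable usage, assuming the merged `recv_many(&mut buffer, limit)` API:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);
    tx.send("first").await.unwrap();
    tx.send("second").await.unwrap();
    tx.send("third").await.unwrap();
    drop(tx);

    // One call drains up to `limit` queued messages into `buffer`.
    let mut buffer = Vec::new();
    let received = rx.recv_many(&mut buffer, 10).await;
    assert_eq!(received, 3);
    assert_eq!(buffer, ["first", "second", "third"]);
}
```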