Skip to content

Commit

Permalink
feat: Add a force send function
Browse files Browse the repository at this point in the history
Closes #44 by adding a "force_send" method. This method can replace an
existing element in the list, in which case that element is returned.
This can be used to make "limited capacity" channels.

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed May 6, 2024
1 parent 790456a commit 3fc1130
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
44 changes: 42 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ use core::marker::PhantomPinned;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use core::usize;

use alloc::sync::Arc;

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
Expand Down Expand Up @@ -286,6 +285,47 @@ impl<T> Sender<T> {
self.send(msg).wait()
}

/// Forcefully push a message into this channel.
///
/// If the channel is full, this method will replace an existing message in the
/// channel and return it as `Ok(Some(value))`. If the channel is closed, this
/// method will return an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{bounded, SendError};
///
/// let (s, r) = bounded(3);
///
/// assert_eq!(s.send(1).await, Ok(()));
/// assert_eq!(s.send(2).await, Ok(()));
/// assert_eq!(s.force_send(3), Ok(None));
/// assert_eq!(s.force_send(4), Ok(Some(1)));
///
/// assert_eq!(r.recv().await, Ok(2));
/// assert_eq!(r.recv().await, Ok(3));
/// assert_eq!(r.recv().await, Ok(4));
/// # });
/// ```
pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
match self.channel.queue.force_push(msg) {
Ok(backlog) => {
// Notify a blocked receive operation. If the notified operation gets canceled,
// it will notify another blocked receive operation.
self.channel.recv_ops.notify_additional(1);

// Notify all blocked streams.
self.channel.stream_ops.notify(usize::MAX);

Ok(backlog)
}

Err(ForcePushError(reject)) => Err(SendError(reject)),
}
}

/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
Expand Down
24 changes: 24 additions & 0 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,30 @@ fn send() {
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn force_send() {
let (s, r) = bounded(1);

Parallel::new()
.add(|| {
s.force_send(7).unwrap();
sleep(ms(1000));
s.force_send(8).unwrap();
sleep(ms(1000));
s.force_send(9).unwrap();
sleep(ms(1000));
s.force_send(10).unwrap();
})
.add(|| {
sleep(ms(1500));
assert_eq!(future::block_on(r.recv()), Ok(8));
assert_eq!(future::block_on(r.recv()), Ok(9));
assert_eq!(future::block_on(r.recv()), Ok(10));
})
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn send_after_close() {
Expand Down

0 comments on commit 3fc1130

Please sign in to comment.