Skip to content

Commit

Permalink
Merge pull request #426 from cunarist/wake-previous-receiver-on-clone
Browse files Browse the repository at this point in the history
Wake previous receiver on clone
  • Loading branch information
temeddix authored Sep 12, 2024
2 parents 3709130 + 4404f8c commit f6ac782
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
1 change: 1 addition & 0 deletions flutter_package/lib/src/load_os.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class RustLibraryNew extends RustLibrary {

/// Class for local native library symbols loaded with `RTLD_LOCAL`.
/// This is relatively inefficient because `malloc.allocate` is required.
/// It involves extra memory copy before sending the data to Rust.
class RustLibraryOld extends RustLibrary {
late DynamicLibrary lib;
late void Function() startRustLogicExtern;
Expand Down
52 changes: 42 additions & 10 deletions rust_crate/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,37 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// The `MessageSender` is used to send messages into a shared message queue.
/// It is clonable, and multiple senders can be created to send messages into
/// the same queue. Each message is sent to a receiver, but only the currently
/// active receiver can receive messages.
#[derive(Clone)]
pub struct MessageSender<T> {
inner: Arc<Mutex<MessageChannel<T>>>,
}

/// The `MessageReceiver` is used to asynchronously receive messages from the
/// shared message queue. Only one receiver can be active at a time; new
/// receivers are created by cloning the original. When a receiver is cloned,
/// it becomes the active receiver, and the previous receiver will no longer
/// receive messages.
pub struct MessageReceiver<T> {
inner: Arc<Mutex<MessageChannel<T>>>,
id: usize, // Each receiver has a unique ID
}

/// A channel holding a message queue and managing the current active receiver.
/// Only the active receiver can receive messages.
struct MessageChannel<T> {
queue: VecDeque<T>,
waker: Option<Waker>,
active_receiver_id: usize, // Track the active receiver by ID
}

impl<T> MessageSender<T> {
/// Sends a message to the shared queue. If a receiver is waiting for a
/// message, it will be woken up. This method does not fail if the mutex
/// is poisoned but simply ignores the failure.
pub fn send(&self, msg: T) {
let mut inner = match self.inner.lock() {
Ok(inner) => inner,
Expand All @@ -29,13 +43,18 @@ impl<T> MessageSender<T> {

// Enqueue the message
inner.queue.push_back(msg);
// Wake up the previous receiver making it receive `None`, if any
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
}

impl<T> MessageReceiver<T> {
/// Asynchronously receives the next message from the queue. Only the active
/// receiver is allowed to receive messages. If there are no messages in the
/// queue, the receiver will wait until a new message is sent. If this receiver
/// is not active, it will return `None`.
pub async fn recv(&self) -> Option<T> {
RecvFuture {
inner: self.inner.clone(),
Expand All @@ -47,17 +66,26 @@ impl<T> MessageReceiver<T> {

// Automatically make the cloned receiver the active one
impl<T> Clone for MessageReceiver<T> {
/// Clones the receiver and makes the new receiver the active one. The
/// original receiver will no longer receive messages after this clone.
/// This ensures only the most recent receiver can access the message queue.
fn clone(&self) -> Self {
let mut inner = self.inner.lock().unwrap();
let new_receiver = MessageReceiver {
inner: self.inner.clone(),
id: inner.active_receiver_id + 1, // Increment ID for new receiver
};
inner.active_receiver_id = new_receiver.id; // Update active receiver
inner.active_receiver_id = new_receiver.id;
if let Some(waker) = inner.waker.take() {
waker.wake();
}
new_receiver
}
}

/// A future that represents the attempt of a `MessageReceiver` to receive a
/// message. This future is only completed when the active receiver receives
/// a message from the queue.
struct RecvFuture<T> {
inner: Arc<Mutex<MessageChannel<T>>>,
receiver_id: usize, // Track which receiver is polling
Expand All @@ -66,6 +94,10 @@ struct RecvFuture<T> {
impl<T> Future for RecvFuture<T> {
type Output = Option<T>;

/// Polls the future to check if the active receiver has a message in the
/// queue. If no message is available, the task will be put to sleep until
/// a message is sent. If this receiver is not the active receiver, it will
/// return `None`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = match self.inner.lock() {
Ok(inner) => inner,
Expand All @@ -86,23 +118,23 @@ impl<T> Future for RecvFuture<T> {
}
}

// Create the message channel with a message queue
/// Creates a message channel with a sender and a receiver. The sender can be
/// used to send messages, and the receiver can be used to receive them
/// asynchronously. Only one receiver is active at a time, and new receivers
/// are created by cloning the original receiver.
pub fn message_channel<T>() -> (MessageSender<T>, MessageReceiver<T>) {
let channel = Arc::new(Mutex::new(MessageChannel {
queue: VecDeque::new(),
waker: None,
active_receiver_id: 0, // Start with receiver ID 0
}));

let receiver = MessageReceiver {
let sender = MessageSender {
inner: channel.clone(),
};
let receiver = MessageReceiver {
inner: channel,
id: 0,
};

(
MessageSender {
inner: channel.clone(),
},
receiver,
)
(sender, receiver)
}

0 comments on commit f6ac782

Please sign in to comment.