diff --git a/src/lib.rs b/src/lib.rs index f56acf5..801b418 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,7 +113,7 @@ use std::fmt; use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use event_listener::{Event, EventListener}; @@ -150,7 +150,7 @@ use pin_project_lite::pin_project; pub fn broadcast(cap: usize) -> (Sender, Receiver) { assert!(cap > 0, "capacity cannot be zero"); - let inner = Arc::new(RwLock::new(Inner { + let inner = Arc::new(Mutex::new(Inner { queue: VecDeque::with_capacity(cap), capacity: cap, overflow: false, @@ -302,7 +302,7 @@ impl Inner { /// The channel can also be closed manually by calling [`Sender::close()`]. #[derive(Debug)] pub struct Sender { - inner: Arc>>, + inner: Arc>>, } impl Sender { @@ -317,7 +317,7 @@ impl Sender { /// assert_eq!(s.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.lock().unwrap().capacity } /// Set the channel capacity. @@ -351,7 +351,7 @@ impl Sender { /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2))); /// ``` pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.lock().unwrap().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. @@ -365,7 +365,7 @@ impl Sender { /// assert!(!s.overflow()); /// ``` pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.lock().unwrap().overflow } /// Set overflow mode on the channel. @@ -392,7 +392,7 @@ impl Sender { /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); /// ``` pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.lock().unwrap().overflow = overflow; } /// If sender will wait for active receivers. @@ -409,7 +409,7 @@ impl Sender { /// assert!(s.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.lock().unwrap().await_active } /// Specify if sender will wait for active receivers. @@ -432,7 +432,7 @@ impl Sender { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.lock().unwrap().await_active = await_active; } /// Closes the channel. @@ -456,7 +456,7 @@ impl Sender { /// # }); /// ``` pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.lock().unwrap().close() } /// Returns `true` if the channel is closed. @@ -475,7 +475,7 @@ impl Sender { /// # }); /// ``` pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.lock().unwrap().is_closed } /// Returns `true` if the channel is empty. @@ -494,7 +494,7 @@ impl Sender { /// # }); /// ``` pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.lock().unwrap().queue.is_empty() } /// Returns `true` if the channel is full. @@ -513,7 +513,7 @@ impl Sender { /// # }); /// ``` pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.lock().unwrap(); inner.queue.len() == inner.capacity } @@ -535,7 +535,7 @@ impl Sender { /// # }); /// ``` pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.lock().unwrap().queue.len() } /// Returns the number of receivers for the channel. @@ -558,7 +558,7 @@ impl Sender { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.lock().unwrap().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -578,7 +578,7 @@ impl Sender { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.lock().unwrap().inactive_receiver_count } /// Returns the number of senders for the channel. @@ -597,7 +597,7 @@ impl Sender { /// # }); /// ``` pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.lock().unwrap().sender_count } /// Produce a new Receiver for this channel. @@ -629,7 +629,7 @@ impl Sender { /// # }); /// ``` pub fn new_receiver(&self) -> Receiver { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.receiver_count += 1; Receiver { inner: self.inner.clone(), @@ -725,7 +725,7 @@ impl Sender { /// ``` pub fn try_broadcast(&self, msg: T) -> Result, TrySendError> { let mut ret = None; - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); if inner.is_closed { return Err(TrySendError::Closed(msg)); @@ -756,7 +756,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.sender_count -= 1; @@ -768,7 +768,7 @@ impl Drop for Sender { impl Clone for Sender { fn clone(&self) -> Self { - self.inner.write().unwrap().sender_count += 1; + self.inner.lock().unwrap().sender_count += 1; Sender { inner: self.inner.clone(), @@ -784,7 +784,7 @@ impl Clone for Sender { /// receivers around. #[derive(Debug)] pub struct Receiver { - inner: Arc>>, + inner: Arc>>, pos: u64, /// Listens for a send or close event to unblock this stream. @@ -803,7 +803,7 @@ impl Receiver { /// assert_eq!(r.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.lock().unwrap().capacity } /// Set the channel capacity. @@ -837,7 +837,7 @@ impl Receiver { /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2))); /// ``` pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.lock().unwrap().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. @@ -851,7 +851,7 @@ impl Receiver { /// assert!(!r.overflow()); /// ``` pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.lock().unwrap().overflow } /// Set overflow mode on the channel. @@ -878,7 +878,7 @@ impl Receiver { /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); /// ``` pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.lock().unwrap().overflow = overflow; } /// If sender will wait for active receivers. @@ -895,7 +895,7 @@ impl Receiver { /// assert!(r.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.lock().unwrap().await_active } /// Specify if sender will wait for active receivers. @@ -918,7 +918,7 @@ impl Receiver { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.lock().unwrap().await_active = await_active; } /// Closes the channel. @@ -942,7 +942,7 @@ impl Receiver { /// # }); /// ``` pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.lock().unwrap().close() } /// Returns `true` if the channel is closed. @@ -961,7 +961,7 @@ impl Receiver { /// # }); /// ``` pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.lock().unwrap().is_closed } /// Returns `true` if the channel is empty. @@ -980,7 +980,7 @@ impl Receiver { /// # }); /// ``` pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.lock().unwrap().queue.is_empty() } /// Returns `true` if the channel is full. @@ -999,7 +999,7 @@ impl Receiver { /// # }); /// ``` pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.lock().unwrap(); inner.queue.len() == inner.capacity } @@ -1021,7 +1021,7 @@ impl Receiver { /// # }); /// ``` pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.lock().unwrap().queue.len() } /// Returns the number of receivers for the channel. @@ -1044,7 +1044,7 @@ impl Receiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.lock().unwrap().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -1064,7 +1064,7 @@ impl Receiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.lock().unwrap().inactive_receiver_count } /// Returns the number of senders for the channel. @@ -1083,7 +1083,7 @@ impl Receiver { /// # }); /// ``` pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.lock().unwrap().sender_count } /// Downgrade to a [`InactiveReceiver`]. @@ -1114,7 +1114,7 @@ impl Receiver { /// ``` pub fn deactivate(self) -> InactiveReceiver { // Drop::drop impl of Receiver will take care of `receiver_count`. - self.inner.write().unwrap().inactive_receiver_count += 1; + self.inner.lock().unwrap().inactive_receiver_count += 1; InactiveReceiver { inner: self.inner.clone(), @@ -1222,7 +1222,7 @@ impl Receiver { /// ``` pub fn try_recv(&mut self) -> Result { self.inner - .write() + .lock() .unwrap() .try_recv_at(&mut self.pos) .map(|cow| cow.unwrap_or_else(T::clone)) @@ -1254,7 +1254,7 @@ impl Receiver { /// # }); /// ``` pub fn new_sender(&self) -> Sender { - self.inner.write().unwrap().sender_count += 1; + self.inner.lock().unwrap().sender_count += 1; Sender { inner: self.inner.clone(), @@ -1290,7 +1290,7 @@ impl Receiver { /// # }); /// ``` pub fn new_receiver(&self) -> Self { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.receiver_count += 1; Receiver { inner: self.inner.clone(), @@ -1369,7 +1369,7 @@ impl Receiver { None => { // Start listening and then try receiving again. self.listener = { - let inner = self.inner.write().unwrap(); + let inner = self.inner.lock().unwrap(); Some(inner.recv_ops.listen()) }; } @@ -1385,7 +1385,7 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); // Remove ourself from each item's counter loop { @@ -1426,7 +1426,7 @@ impl Clone for Receiver { /// # }); /// ``` fn clone(&self) -> Self { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.receiver_count += 1; // increment the waiter count on all items not yet received by this object let n = self.pos.saturating_sub(inner.head_pos) as usize; @@ -1459,7 +1459,7 @@ impl Stream for Receiver { impl futures_core::stream::FusedStream for Receiver { fn is_terminated(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.lock().unwrap(); inner.is_closed && inner.queue.is_empty() } @@ -1683,7 +1683,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { // Attempt to send a message. match this.sender.try_broadcast(msg) { Ok(msg) => { - let inner = inner.write().unwrap(); + let inner = inner.lock().unwrap(); if inner.queue.len() < inner.capacity { // Not full still, so notify the next awaiting sender. @@ -1694,7 +1694,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { } Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), Err(TrySendError::Full(m)) => *this.msg = Some(m), - Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => { + Err(TrySendError::Inactive(m)) if inner.lock().unwrap().await_active => { *this.msg = Some(m) } Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))), @@ -1704,7 +1704,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { match &this.listener { None => { // Start listening and then try sending again. - let inner = inner.write().unwrap(); + let inner = inner.lock().unwrap(); *this.listener = Some(inner.send_ops.listen()); } Some(_) => { @@ -1764,7 +1764,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { None => { // Start listening and then try receiving again. *this.listener = { - let inner = this.receiver.inner.write().unwrap(); + let inner = this.receiver.inner.lock().unwrap(); Some(inner.recv_ops.listen()) }; } @@ -1784,7 +1784,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { /// keeping a channel open even when no associated active receivers exist. #[derive(Debug)] pub struct InactiveReceiver { - inner: Arc>>, + inner: Arc>>, } impl InactiveReceiver { @@ -1825,7 +1825,7 @@ impl InactiveReceiver { /// assert_eq!(r.try_recv(), Ok(10)); /// ``` pub fn activate_cloned(&self) -> Receiver { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.receiver_count += 1; if inner.receiver_count == 1 { @@ -1845,7 +1845,7 @@ impl InactiveReceiver { /// /// See [`Receiver::capacity`] documentation for examples. pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.lock().unwrap().capacity } /// Set the channel capacity. @@ -1856,14 +1856,14 @@ impl InactiveReceiver { /// /// See [`Receiver::set_capacity`] documentation for examples. pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.lock().unwrap().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. /// /// See [`Receiver::overflow`] documentation for examples. pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.lock().unwrap().overflow } /// Set overflow mode on the channel. @@ -1873,7 +1873,7 @@ impl InactiveReceiver { /// /// See [`Receiver::set_overflow`] documentation for examples. pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.lock().unwrap().overflow = overflow; } /// If sender will wait for active receivers. @@ -1891,7 +1891,7 @@ impl InactiveReceiver { /// assert!(r.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.lock().unwrap().await_active } /// Specify if sender will wait for active receivers. @@ -1914,7 +1914,7 @@ impl InactiveReceiver { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.lock().unwrap().await_active = await_active; } /// Closes the channel. @@ -1925,28 +1925,28 @@ impl InactiveReceiver { /// /// See [`Receiver::close`] documentation for examples. pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.lock().unwrap().close() } /// Returns `true` if the channel is closed. /// /// See [`Receiver::is_closed`] documentation for examples. pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.lock().unwrap().is_closed } /// Returns `true` if the channel is empty. /// /// See [`Receiver::is_empty`] documentation for examples. pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.lock().unwrap().queue.is_empty() } /// Returns `true` if the channel is full. /// /// See [`Receiver::is_full`] documentation for examples. pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.lock().unwrap(); inner.queue.len() == inner.capacity } @@ -1955,7 +1955,7 @@ impl InactiveReceiver { /// /// See [`Receiver::len`] documentation for examples. pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.lock().unwrap().queue.len() } /// Returns the number of receivers for the channel. @@ -1978,7 +1978,7 @@ impl InactiveReceiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.lock().unwrap().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -1998,20 +1998,20 @@ impl InactiveReceiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.lock().unwrap().inactive_receiver_count } /// Returns the number of senders for the channel. /// /// See [`Receiver::sender_count`] documentation for examples. pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.lock().unwrap().sender_count } } impl Clone for InactiveReceiver { fn clone(&self) -> Self { - self.inner.write().unwrap().inactive_receiver_count += 1; + self.inner.lock().unwrap().inactive_receiver_count += 1; InactiveReceiver { inner: self.inner.clone(), @@ -2021,7 +2021,7 @@ impl Clone for InactiveReceiver { impl Drop for InactiveReceiver { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.inactive_receiver_count -= 1; inner.close_channel();