From 81eb75d96087d69c8dfee0f8af093800cd02dcd9 Mon Sep 17 00:00:00 2001 From: Alexey Gerasev Date: Thu, 17 Aug 2023 20:33:35 +0700 Subject: [PATCH] Remove some async traits, use hold info from base rb --- async/src/rb.rs | 69 ++++++------- async/src/traits/consumer.rs | 111 ++++++++++----------- async/src/traits/mod.rs | 4 - async/src/traits/observer.rs | 6 -- async/src/traits/producer.rs | 133 +++++++++---------------- async/src/traits/ring_buffer.rs | 8 -- async/src/wrap.rs | 168 -------------------------------- async/src/wrap/cons.rs | 87 +++++++++++++++++ async/src/wrap/mod.rs | 65 ++++++++++++ async/src/wrap/prod.rs | 89 +++++++++++++++++ 10 files changed, 369 insertions(+), 371 deletions(-) delete mode 100644 async/src/traits/observer.rs delete mode 100644 async/src/traits/ring_buffer.rs delete mode 100644 async/src/wrap.rs create mode 100644 async/src/wrap/cons.rs create mode 100644 async/src/wrap/mod.rs create mode 100644 async/src/wrap/prod.rs diff --git a/async/src/rb.rs b/async/src/rb.rs index f7e48c1..85bc94d 100644 --- a/async/src/rb.rs +++ b/async/src/rb.rs @@ -1,29 +1,28 @@ -use crate::{ - traits::{AsyncConsumer, AsyncObserver, AsyncProducer, AsyncRingBuffer}, - wrap::{AsyncCons, AsyncProd}, -}; +use crate::wrap::{AsyncCons, AsyncProd}; #[cfg(feature = "alloc")] use alloc::sync::Arc; -use core::{ - mem::MaybeUninit, - num::NonZeroUsize, - sync::atomic::{AtomicBool, Ordering}, - task::Waker, -}; +use core::{mem::MaybeUninit, num::NonZeroUsize}; use futures::task::AtomicWaker; #[cfg(feature = "alloc")] use ringbuf::traits::Split; use ringbuf::{ + rb::traits::RbRef, storage::Storage, traits::{Consumer, Observer, Producer, RingBuffer, SplitRef}, SharedRb, }; +pub trait AsyncRbRef: RbRef> { + type Storage: Storage; +} +impl>> AsyncRbRef for R { + type Storage = S; +} + pub struct AsyncRb { base: SharedRb, - read: AtomicWaker, - write: AtomicWaker, - closed: AtomicBool, + pub(crate) read: AtomicWaker, + pub(crate) write: AtomicWaker, } impl AsyncRb { @@ -32,7 +31,6 @@ impl AsyncRb { base, read: AtomicWaker::default(), write: AtomicWaker::default(), - closed: AtomicBool::new(false), } } } @@ -59,6 +57,15 @@ impl Observer for AsyncRb { unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { self.base.unsafe_slices(start, end) } + + #[inline] + fn read_is_held(&self) -> bool { + self.base.read_is_held() + } + #[inline] + fn write_is_held(&self) -> bool { + self.base.write_is_held() + } } impl Producer for AsyncRb { @@ -66,40 +73,26 @@ impl Producer for AsyncRb { self.base.set_write_index(value); self.write.wake(); } + fn close(&mut self) {} } impl Consumer for AsyncRb { unsafe fn set_read_index(&self, value: usize) { self.base.set_read_index(value); self.read.wake(); } + fn close(&mut self) {} } -impl RingBuffer for AsyncRb {} - -impl AsyncObserver for AsyncRb { - fn is_closed(&self) -> bool { - self.closed.load(Ordering::Relaxed) - } - fn close(&self) { - self.closed.store(true, Ordering::Relaxed); - } -} -impl AsyncProducer for AsyncRb { - fn register_read_waker(&self, waker: &Waker) { - self.read.register(waker); - } -} -impl AsyncConsumer for AsyncRb { - fn register_write_waker(&self, waker: &Waker) { - self.write.register(waker); +impl RingBuffer for AsyncRb { + #[inline] + fn hold_read(&self, flag: bool) { + self.base.hold_read(flag); + self.read.wake() } -} -impl AsyncRingBuffer for AsyncRb { - fn wake_consumer(&self) { + #[inline] + fn hold_write(&self, flag: bool) { + self.base.hold_write(flag); self.write.wake() } - fn wake_producer(&self) { - self.read.wake() - } } impl SplitRef for AsyncRb { diff --git a/async/src/traits/consumer.rs b/async/src/traits/consumer.rs index be8d402..00c5150 100644 --- a/async/src/traits/consumer.rs +++ b/async/src/traits/consumer.rs @@ -1,22 +1,15 @@ -use super::{AsyncObserver, AsyncRingBuffer}; -use crate::wrap::AsyncCons; use core::{ future::Future, pin::Pin, task::{Context, Poll, Waker}, }; -#[cfg(feature = "std")] -use futures::io::AsyncRead; -use futures::{future::FusedFuture, Stream}; -use ringbuf::{ - rb::traits::RbRef, - traits::{Consumer, Observer}, -}; +use futures::future::FusedFuture; +use ringbuf::traits::Consumer; #[cfg(feature = "std")] use std::io; -pub trait AsyncConsumer: AsyncObserver + Consumer { - fn register_write_waker(&self, waker: &Waker); +pub trait AsyncConsumer: Consumer { + fn register_waker(&self, waker: &Waker); /// Pop item from the ring buffer waiting asynchronously if the buffer is empty. /// @@ -29,7 +22,7 @@ pub trait AsyncConsumer: AsyncObserver + Consumer { /// Wait for the buffer to contain at least `count` items or to close. /// - /// Panics if `count` is greater than buffer capacity. + /// In debug mode panics if `count` is greater than buffer capacity. fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> { debug_assert!(count <= self.capacity().get()); WaitOccupiedFuture { @@ -54,6 +47,47 @@ pub trait AsyncConsumer: AsyncObserver + Consumer { count: 0, } } + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> + where + Self: Unpin, + { + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + if let Some(item) = self.try_pop() { + break Poll::Ready(Some(item)); + } + if closed { + break Poll::Ready(None); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } + + #[cfg(feature = "std")] + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> + where + Self: AsyncConsumer + Unpin, + { + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + let len = self.pop_slice(buf); + if len != 0 || closed { + break Poll::Ready(Ok(len)); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } } pub struct PopFuture<'a, A: AsyncConsumer> { @@ -84,7 +118,7 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> { if waker_registered { break Poll::Pending; } - self.owner.register_write_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } @@ -131,7 +165,7 @@ where if waker_registered { break Poll::Pending; } - self.owner.register_write_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } @@ -162,54 +196,7 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> { if waker_registered { break Poll::Pending; } - self.owner.register_write_waker(cx.waker()); - waker_registered = true; - } - } -} - -impl Stream for AsyncCons -where - R::Target: AsyncRingBuffer, -{ - type Item = ::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut waker_registered = false; - loop { - let closed = self.is_closed(); - if let Some(item) = self.try_pop() { - break Poll::Ready(Some(item)); - } - if closed { - break Poll::Ready(None); - } - if waker_registered { - break Poll::Pending; - } - self.register_write_waker(cx.waker()); - waker_registered = true; - } - } -} - -#[cfg(feature = "std")] -impl AsyncRead for AsyncCons -where - R::Target: AsyncRingBuffer, -{ - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let mut waker_registered = false; - loop { - let closed = self.is_closed(); - let len = self.pop_slice(buf); - if len != 0 || closed { - break Poll::Ready(Ok(len)); - } - if waker_registered { - break Poll::Pending; - } - self.register_write_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } diff --git a/async/src/traits/mod.rs b/async/src/traits/mod.rs index aab30cd..618fe75 100644 --- a/async/src/traits/mod.rs +++ b/async/src/traits/mod.rs @@ -1,11 +1,7 @@ pub mod consumer; -pub mod observer; pub mod producer; -pub mod ring_buffer; pub use consumer::AsyncConsumer; -pub use observer::AsyncObserver; pub use producer::AsyncProducer; -pub use ring_buffer::AsyncRingBuffer; pub use ringbuf::traits::*; diff --git a/async/src/traits/observer.rs b/async/src/traits/observer.rs deleted file mode 100644 index 4ef9929..0000000 --- a/async/src/traits/observer.rs +++ /dev/null @@ -1,6 +0,0 @@ -use ringbuf::traits::Observer; - -pub trait AsyncObserver: Observer { - fn is_closed(&self) -> bool; - fn close(&self); -} diff --git a/async/src/traits/producer.rs b/async/src/traits/producer.rs index 5fcffb4..46abda6 100644 --- a/async/src/traits/producer.rs +++ b/async/src/traits/producer.rs @@ -1,24 +1,16 @@ -use crate::wrap::AsyncProd; - -use super::{AsyncObserver, AsyncRingBuffer}; use core::{ future::Future, iter::Peekable, pin::Pin, task::{Context, Poll, Waker}, }; -#[cfg(feature = "std")] -use futures::io::AsyncWrite; -use futures::{future::FusedFuture, Sink}; -use ringbuf::{ - rb::traits::RbRef, - traits::{Observer, Producer}, -}; +use futures::future::FusedFuture; +use ringbuf::traits::Producer; #[cfg(feature = "std")] use std::io; -pub trait AsyncProducer: AsyncObserver + Producer { - fn register_read_waker(&self, waker: &Waker); +pub trait AsyncProducer: Producer { + fn register_waker(&self, waker: &Waker); /// Push item to the ring buffer waiting asynchronously if the buffer is full. /// @@ -46,7 +38,7 @@ pub trait AsyncProducer: AsyncObserver + Producer { /// Wait for the buffer to have at least `count` free places for items or to close. /// - /// Panics if `count` is greater than buffer capacity. + /// In debug mode panics if `count` is greater than buffer capacity. fn wait_vacant(&self, count: usize) -> WaitVacantFuture<'_, Self> { debug_assert!(count <= self.capacity().get()); WaitVacantFuture { @@ -71,6 +63,45 @@ pub trait AsyncProducer: AsyncObserver + Producer { count: 0, } } + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut waker_registered = false; + loop { + if self.is_closed() { + break Poll::Ready(false); + } + if !self.is_full() { + break Poll::Ready(true); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } + + #[cfg(feature = "std")] + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> + where + Self: AsyncProducer + Unpin, + { + let mut waker_registered = false; + loop { + if self.is_closed() { + break Poll::Ready(Ok(0)); + } + let count = self.push_slice(buf); + if count > 0 { + break Poll::Ready(Ok(count)); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } } pub struct PushFuture<'a, A: AsyncProducer> { @@ -101,7 +132,7 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> { if waker_registered { break Poll::Pending; } - self.owner.register_read_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } @@ -147,7 +178,7 @@ where if waker_registered { break Poll::Pending; } - self.owner.register_read_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } @@ -181,7 +212,7 @@ impl<'a, A: AsyncProducer, I: Iterator> Future for PushIterFutur if waker_registered { break Poll::Pending; } - self.owner.register_read_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } @@ -212,76 +243,8 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> { if waker_registered { break Poll::Pending; } - self.owner.register_read_waker(cx.waker()); + self.owner.register_waker(cx.waker()); waker_registered = true; } } } - -impl Sink<::Item> for AsyncProd -where - R::Target: AsyncRingBuffer, -{ - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut waker_registered = false; - loop { - if self.is_closed() { - break Poll::Ready(Err(())); - } - if !self.is_full() { - break Poll::Ready(Ok(())); - } - if waker_registered { - break Poll::Pending; - } - self.register_read_waker(cx.waker()); - waker_registered = true; - } - } - fn start_send(mut self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - assert!(self.try_push(item).is_ok()); - Ok(()) - } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // Don't need to be flushed. - Poll::Ready(Ok(())) - } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - self.close(); - Poll::Ready(Ok(())) - } -} - -#[cfg(feature = "std")] -impl AsyncWrite for AsyncProd -where - R::Target: AsyncRingBuffer, -{ - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let mut waker_registered = false; - loop { - if self.is_closed() { - break Poll::Ready(Ok(0)); - } - let count = self.push_slice(buf); - if count > 0 { - break Poll::Ready(Ok(count)); - } - if waker_registered { - break Poll::Pending; - } - self.register_read_waker(cx.waker()); - waker_registered = true; - } - } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // Don't need to be flushed. - Poll::Ready(Ok(())) - } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - self.close(); - Poll::Ready(Ok(())) - } -} diff --git a/async/src/traits/ring_buffer.rs b/async/src/traits/ring_buffer.rs deleted file mode 100644 index 8718a4d..0000000 --- a/async/src/traits/ring_buffer.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::consumer::AsyncConsumer; -use crate::producer::AsyncProducer; -use ringbuf::traits::RingBuffer; - -pub trait AsyncRingBuffer: RingBuffer + AsyncProducer + AsyncConsumer { - fn wake_producer(&self); - fn wake_consumer(&self); -} diff --git a/async/src/wrap.rs b/async/src/wrap.rs deleted file mode 100644 index f510c6e..0000000 --- a/async/src/wrap.rs +++ /dev/null @@ -1,168 +0,0 @@ -use crate::traits::{AsyncConsumer, AsyncObserver, AsyncProducer, AsyncRingBuffer}; -use core::{ - mem::{ManuallyDrop, MaybeUninit}, - num::NonZeroUsize, - ptr, -}; -use ringbuf::{ - rb::traits::{RbRef, ToRbRef}, - traits::{Consumer, Observer, Producer}, - wrap::caching::Caching, - Obs, -}; - -pub struct AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - base: Caching, -} - -pub type AsyncProd = AsyncWrap; -pub type AsyncCons = AsyncWrap; - -impl AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - pub unsafe fn new(rb: R) -> Self { - Self { base: Caching::new(rb) } - } - - pub fn observe(&self) -> Obs { - self.base.observe() - } -} - -impl ToRbRef for AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - type RbRef = R; - fn rb_ref(&self) -> &R { - self.base.rb_ref() - } - fn into_rb_ref(self) -> R { - let this = ManuallyDrop::new(self); - this.close(); - unsafe { ptr::read(&this.base) }.into_rb_ref() - } -} - -impl Unpin for AsyncWrap where R::Target: AsyncRingBuffer {} - -impl Observer for AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - type Item = ::Item; - - #[inline] - fn capacity(&self) -> NonZeroUsize { - self.base.capacity() - } - #[inline] - fn read_index(&self) -> usize { - self.base.read_index() - } - #[inline] - fn write_index(&self) -> usize { - self.base.write_index() - } - #[inline] - unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { - self.base.unsafe_slices(start, end) - } -} - -impl AsyncObserver for AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - fn is_closed(&self) -> bool { - self.base.rb().is_closed() - } - fn close(&self) { - self.base.rb().close(); - if P { - self.base.rb().wake_consumer(); - } - if C { - self.base.rb().wake_producer(); - } - } -} - -impl Producer for AsyncProd -where - R::Target: AsyncRingBuffer, -{ - #[inline] - unsafe fn set_write_index(&self, value: usize) { - self.base.set_write_index(value) - } - #[inline] - fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> { - self.base.try_push(elem) - } - #[inline] - fn push_iter>(&mut self, iter: I) -> usize { - self.base.push_iter(iter) - } - #[inline] - fn push_slice(&mut self, elems: &[Self::Item]) -> usize - where - Self::Item: Copy, - { - self.base.push_slice(elems) - } -} - -impl AsyncProducer for AsyncProd -where - R::Target: AsyncRingBuffer, -{ - fn register_read_waker(&self, waker: &core::task::Waker) { - self.base.rb().register_read_waker(waker) - } -} -impl Consumer for AsyncCons -where - R::Target: AsyncRingBuffer, -{ - #[inline] - unsafe fn set_read_index(&self, value: usize) { - self.base.set_read_index(value) - } - #[inline] - fn try_pop(&mut self) -> Option { - self.base.try_pop() - } - #[inline] - fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize - where - Self::Item: Copy, - { - self.base.pop_slice(elems) - } -} - -impl AsyncConsumer for AsyncCons -where - R::Target: AsyncRingBuffer, -{ - fn register_write_waker(&self, waker: &core::task::Waker) { - self.base.rb().register_write_waker(waker) - } -} - -impl Drop for AsyncWrap -where - R::Target: AsyncRingBuffer, -{ - fn drop(&mut self) { - if P || C { - self.close() - } - } -} diff --git a/async/src/wrap/cons.rs b/async/src/wrap/cons.rs new file mode 100644 index 0000000..bd5c1b2 --- /dev/null +++ b/async/src/wrap/cons.rs @@ -0,0 +1,87 @@ +use crate::{consumer::AsyncConsumer, rb::AsyncRbRef, wrap::AsyncCons}; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +#[cfg(feature = "std")] +use futures::io::AsyncRead; +use futures::Stream; +use ringbuf::{ + rb::traits::ToRbRef, + traits::{Consumer, Observer, RingBuffer}, +}; +#[cfg(feature = "std")] +use std::io; + +impl Consumer for AsyncCons { + #[inline] + unsafe fn set_read_index(&self, value: usize) { + self.base.set_read_index(value) + } + #[inline] + fn close(&mut self) { + self.base.close(); + } + + #[inline] + fn try_pop(&mut self) -> Option { + self.base.try_pop() + } + #[inline] + fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize + where + Self::Item: Copy, + { + self.base.pop_slice(elems) + } +} + +impl AsyncConsumer for AsyncCons { + fn register_waker(&self, waker: &core::task::Waker) { + self.rb().write.register(waker) + } +} + +impl Stream for AsyncCons { + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + if let Some(item) = self.try_pop() { + break Poll::Ready(Some(item)); + } + if closed { + break Poll::Ready(None); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } +} + +#[cfg(feature = "std")] +impl AsyncRead for AsyncCons +where + R::Target: RingBuffer, +{ + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + let len = self.pop_slice(buf); + if len != 0 || closed { + break Poll::Ready(Ok(len)); + } + if waker_registered { + break Poll::Pending; + } + self.register_waker(cx.waker()); + waker_registered = true; + } + } +} diff --git a/async/src/wrap/mod.rs b/async/src/wrap/mod.rs new file mode 100644 index 0000000..6098fcb --- /dev/null +++ b/async/src/wrap/mod.rs @@ -0,0 +1,65 @@ +mod cons; +mod prod; + +use crate::rb::AsyncRbRef; +use core::{mem::MaybeUninit, num::NonZeroUsize}; +use ringbuf::{rb::traits::ToRbRef, traits::Observer, wrap::caching::Caching, Obs}; + +pub struct AsyncWrap { + base: Caching, +} + +pub type AsyncProd = AsyncWrap; +pub type AsyncCons = AsyncWrap; + +impl AsyncWrap { + pub unsafe fn new(rb: R) -> Self { + Self { base: Caching::new(rb) } + } + + pub fn observe(&self) -> Obs { + self.base.observe() + } +} + +impl ToRbRef for AsyncWrap { + type RbRef = R; + fn rb_ref(&self) -> &R { + self.base.rb_ref() + } + fn into_rb_ref(self) -> R { + self.base.into_rb_ref() + } +} + +impl Unpin for AsyncWrap {} + +impl Observer for AsyncWrap { + type Item = ::Item; + + #[inline] + fn capacity(&self) -> NonZeroUsize { + self.base.capacity() + } + #[inline] + fn read_index(&self) -> usize { + self.base.read_index() + } + #[inline] + fn write_index(&self) -> usize { + self.base.write_index() + } + #[inline] + unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { + self.base.unsafe_slices(start, end) + } + + #[inline] + fn read_is_held(&self) -> bool { + self.base.read_is_held() + } + #[inline] + fn write_is_held(&self) -> bool { + self.base.write_is_held() + } +} diff --git a/async/src/wrap/prod.rs b/async/src/wrap/prod.rs new file mode 100644 index 0000000..ae74bc8 --- /dev/null +++ b/async/src/wrap/prod.rs @@ -0,0 +1,89 @@ +use crate::{producer::AsyncProducer, rb::AsyncRbRef, wrap::AsyncProd}; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +#[cfg(feature = "std")] +use futures::io::AsyncWrite; +use futures::{ready, Sink}; +use ringbuf::{ + rb::traits::ToRbRef, + traits::{Observer, Producer, RingBuffer}, +}; +#[cfg(feature = "std")] +use std::io; + +impl Producer for AsyncProd { + #[inline] + unsafe fn set_write_index(&self, value: usize) { + self.base.set_write_index(value) + } + #[inline] + fn close(&mut self) { + self.base.close(); + } + + #[inline] + fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> { + self.base.try_push(elem) + } + #[inline] + fn push_iter>(&mut self, iter: I) -> usize { + self.base.push_iter(iter) + } + #[inline] + fn push_slice(&mut self, elems: &[Self::Item]) -> usize + where + Self::Item: Copy, + { + self.base.push_slice(elems) + } +} + +impl AsyncProducer for AsyncProd { + fn register_waker(&self, waker: &core::task::Waker) { + self.rb().read.register(waker) + } +} + +impl Sink<::Item> for AsyncProd { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(if ready!(::poll_ready(self, cx)) { + Ok(()) + } else { + Err(()) + }) + } + fn start_send(mut self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + assert!(self.try_push(item).is_ok()); + Ok(()) + } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // Don't need to be flushed. + Poll::Ready(Ok(())) + } + fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + self.close(); + Poll::Ready(Ok(())) + } +} + +#[cfg(feature = "std")] +impl AsyncWrite for AsyncProd +where + R::Target: RingBuffer, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + ::poll_write(self, cx, buf) + } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // Don't need to be flushed. + Poll::Ready(Ok(())) + } + fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + self.close(); + Poll::Ready(Ok(())) + } +}