Skip to content

Commit

Permalink
Put IO methods into Consumer and Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Aug 17, 2023
1 parent 392c203 commit cc1fb6e
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 90 deletions.
37 changes: 33 additions & 4 deletions src/halves/cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use super::{
frozen::{FrozenCons, FrozenProd},
};
use crate::{
impl_consumer_traits, impl_producer_traits,
rb::traits::{RbRef, ToRbRef},
traits::{Consumer, Observe, Observer, Producer},
};
use core::{mem::MaybeUninit, num::NonZeroUsize};
use core::{fmt, mem::MaybeUninit, num::NonZeroUsize};
#[cfg(feature = "std")]
use std::io;

/// Caching producer of ring buffer.
pub struct CachingProd<R: RbRef> {
Expand Down Expand Up @@ -144,8 +145,36 @@ impl<R: RbRef> Consumer for CachingCons<R> {
}
}

impl_producer_traits!(CachingProd<R: RbRef>);
impl_consumer_traits!(CachingCons<R: RbRef>);
#[cfg(feature = "std")]
impl<R: RbRef> io::Write for CachingProd<R>
where
Self: Producer<Item = u8>,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<Self as Producer>::write(self, buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<R: RbRef> fmt::Write for CachingProd<R>
where
Self: Producer<Item = u8>,
{
fn write_str(&mut self, s: &str) -> fmt::Result {
<Self as Producer>::write_str(self, s)
}
}

#[cfg(feature = "std")]
impl<R: RbRef> io::Read for CachingCons<R>
where
Self: Consumer<Item = u8>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<Self as Consumer>::read(self, buf)
}
}

impl<R: RbRef> Observe for CachingProd<R> {
type Obs = Obs<R>;
Expand Down
36 changes: 33 additions & 3 deletions src/halves/direct.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::{
impl_consumer_traits, impl_producer_traits,
rb::traits::{RbRef, ToRbRef},
traits::{
delegate::{self, Delegate},
Consumer, Observe, Producer,
},
};
use core::fmt;
#[cfg(feature = "std")]
use std::io;

/// Observer of ring buffer.
#[derive(Clone)]
Expand Down Expand Up @@ -108,8 +110,36 @@ impl<R: RbRef> Consumer for Cons<R> {
}
}

impl_producer_traits!(Prod<R: RbRef>);
impl_consumer_traits!(Cons<R: RbRef>);
#[cfg(feature = "std")]
impl<R: RbRef> io::Write for Prod<R>
where
Self: Producer<Item = u8>,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<Self as Producer>::write(self, buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<R: RbRef> fmt::Write for Prod<R>
where
Self: Producer<Item = u8>,
{
fn write_str(&mut self, s: &str) -> fmt::Result {
<Self as Producer>::write_str(self, s)
}
}

#[cfg(feature = "std")]
impl<R: RbRef> io::Read for Cons<R>
where
Self: Consumer<Item = u8>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<Self as Consumer>::read(self, buf)
}
}

impl<R: RbRef> Observe for Obs<R> {
type Obs = Self;
Expand Down
35 changes: 35 additions & 0 deletions src/halves/frozen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use crate::{
};
use core::{
cell::Cell,
fmt,
mem::{ManuallyDrop, MaybeUninit},
num::NonZeroUsize,
ptr,
};
#[cfg(feature = "std")]
use std::io;

/// Frozen read end of some ring buffer.
///
Expand Down Expand Up @@ -202,6 +205,38 @@ impl<R: RbRef> ToRbRef for FrozenProd<R> {
}
}

#[cfg(feature = "std")]
impl<R: RbRef> io::Write for FrozenProd<R>
where
Self: Producer<Item = u8>,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<Self as Producer>::write(self, buf)
}
fn flush(&mut self) -> io::Result<()> {
self.commit();
Ok(())
}
}
impl<R: RbRef> fmt::Write for FrozenProd<R>
where
Self: Producer<Item = u8>,
{
fn write_str(&mut self, s: &str) -> fmt::Result {
<Self as Producer>::write_str(self, s)
}
}

#[cfg(feature = "std")]
impl<R: RbRef> io::Read for FrozenCons<R>
where
Self: Consumer<Item = u8>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<Self as Consumer>::read(self, buf)
}
}

impl<R: RbRef> Observe for FrozenCons<R> {
type Obs = Obs<R>;
fn observe(&self) -> Self::Obs {
Expand Down
27 changes: 24 additions & 3 deletions src/rb/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ use super::{macros::rb_impl_init, utils::ranges};
use crate::traits::Split;
use crate::{
halves::{Cons, Prod},
impl_consumer_traits, impl_producer_traits,
storage::{Shared, Static, Storage},
traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
};
#[cfg(feature = "alloc")]
use alloc::rc::Rc;
use core::{
cell::Cell,
fmt,
mem::{ManuallyDrop, MaybeUninit},
num::NonZeroUsize,
ptr,
};
#[cfg(feature = "std")]
use std::io;

/// Ring buffer for single-threaded use only.
pub struct LocalRb<S: Storage> {
Expand Down Expand Up @@ -114,5 +116,24 @@ impl<S: Storage> SplitRef for LocalRb<S> {

rb_impl_init!(LocalRb);

impl_producer_traits!(LocalRb<S: Storage>);
impl_consumer_traits!(LocalRb<S: Storage>);
#[cfg(feature = "std")]
impl<S: Storage<Item = u8>> io::Write for LocalRb<S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<Self as Producer>::write(self, buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<S: Storage<Item = u8>> fmt::Write for LocalRb<S> {
fn write_str(&mut self, s: &str) -> fmt::Result {
<Self as Producer>::write_str(self, s)
}
}

#[cfg(feature = "std")]
impl<S: Storage<Item = u8>> io::Read for LocalRb<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<Self as Consumer>::read(self, buf)
}
}
27 changes: 24 additions & 3 deletions src/rb/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ use super::{macros::rb_impl_init, utils::ranges};
use crate::traits::Split;
use crate::{
halves::{CachingCons, CachingProd},
impl_consumer_traits, impl_producer_traits,
storage::{Shared, Static, Storage},
traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{
fmt,
mem::{ManuallyDrop, MaybeUninit},
num::NonZeroUsize,
ptr,
sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;
#[cfg(feature = "std")]
use std::io;

/// Ring buffer that can be shared between threads.
///
Expand Down Expand Up @@ -142,5 +144,24 @@ impl<S: Storage> SplitRef for SharedRb<S> {

rb_impl_init!(SharedRb);

impl_producer_traits!(SharedRb<S: Storage>);
impl_consumer_traits!(SharedRb<S: Storage>);
#[cfg(feature = "std")]
impl<S: Storage<Item = u8>> io::Write for SharedRb<S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<Self as Producer>::write(self, buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<S: Storage<Item = u8>> fmt::Write for SharedRb<S> {
fn write_str(&mut self, s: &str) -> fmt::Result {
<Self as Producer>::write_str(self, s)
}
}

#[cfg(feature = "std")]
impl<S: Storage<Item = u8>> io::Read for SharedRb<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<Self as Consumer>::read(self, buf)
}
}
44 changes: 13 additions & 31 deletions src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ use core::{iter::Chain, mem::MaybeUninit, ptr, slice};
use std::io::{self, Write};

/// Consumer part of ring buffer.
///
/// # Mode
///
/// It can operate in immediate (by default) or postponed mode.
/// Mode could be switched using [`Self::postponed`]/[`Self::into_postponed`] and [`Self::into_immediate`] methods.
///
/// + In immediate mode removed and inserted items are automatically synchronized with the other end.
/// + In postponed mode synchronization occurs only when [`Self::sync`] or [`Self::into_immediate`] is called or when `Self` is dropped.
/// The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.
pub trait Consumer: Observer {
unsafe fn set_read_index(&self, value: usize);

Expand Down Expand Up @@ -212,6 +203,19 @@ pub trait Consumer: Observer {
unsafe { self.advance_read_index(write_count) };
Ok(write_count)
}

#[cfg(feature = "std")]
fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize>
where
Self: Consumer<Item = u8>,
{
let n = self.pop_slice(buffer);
if n == 0 && !buffer.is_empty() {
Err(std::io::ErrorKind::WouldBlock.into())
} else {
Ok(n)
}
}
}

pub struct IntoIter<C: Consumer>(C);
Expand Down Expand Up @@ -297,28 +301,6 @@ pub type Iter<'a, C: Consumer> = Chain<slice::Iter<'a, C::Item>, slice::Iter<'a,
#[allow(type_alias_bounds)]
pub type IterMut<'a, C: Consumer> = Chain<slice::IterMut<'a, C::Item>, slice::IterMut<'a, C::Item>>;

#[macro_export]
macro_rules! impl_consumer_traits {
($type:ident $(< $( $param:tt $( : $first_bound:tt $(+ $next_bound:tt )* )? ),+ >)?) => {

#[cfg(feature = "std")]
impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? std::io::Read for $type $(< $( $param ),+ >)?
where
Self: $crate::traits::Consumer<Item = u8>,
{
fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
use $crate::consumer::Consumer;
let n = self.pop_slice(buffer);
if n == 0 && !buffer.is_empty() {
Err(std::io::ErrorKind::WouldBlock.into())
} else {
Ok(n)
}
}
}
};
}

pub trait DelegateConsumer: DelegateObserver + DelegateMut
where
Self::Base: Consumer,
Expand Down
Loading

0 comments on commit cc1fb6e

Please sign in to comment.