Skip to content

Commit

Permalink
im: Add ObservableVectorTransaction
Browse files Browse the repository at this point in the history
Also, remove impl Stream for VectorSubscriber in favor of two separate
stream types, one mirroring the previous implementation and one that's
outputs batches, making it much easier to ensure that multiple diffs of
one transaction are handled as one unit.
  • Loading branch information
jplatte committed Sep 6, 2023
1 parent 647e860 commit 8f64983
Show file tree
Hide file tree
Showing 7 changed files with 570 additions and 44 deletions.
8 changes: 8 additions & 0 deletions eyeball-im/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# unreleased

- Add `ObservableVectorTransaction` for making multiple updates as one atomic
unit (created via `observable_vector.transaction()`)
- Remove `Stream` implementation from `VectorSubscriber` in favor of
`.into_stream()` and `.into_batched_stream()` methods that return different
stream types

# 0.2.6

This release only updates metadata for crates.io.
Expand Down
4 changes: 3 additions & 1 deletion eyeball-im/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
mod vector;

pub use vector::{
ObservableVector, ObservableVectorEntries, ObservableVectorEntry, VectorDiff, VectorSubscriber,
ObservableVector, ObservableVectorEntries, ObservableVectorEntry, ObservableVectorTransaction,
ObservableVectorTransactionEntries, ObservableVectorTransactionEntry, VectorDiff,
VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream,
};

#[doc(no_inline)]
Expand Down
34 changes: 31 additions & 3 deletions eyeball-im/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ use tokio::sync::broadcast::{self, Sender};

mod entry;
mod subscriber;
mod transaction;

pub use self::{
entry::{ObservableVectorEntries, ObservableVectorEntry},
subscriber::VectorSubscriber,
subscriber::{VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream},
transaction::{
ObservableVectorTransaction, ObservableVectorTransactionEntries,
ObservableVectorTransactionEntry,
},
};

/// An ordered list of elements that broadcasts any changes made to it.
Expand Down Expand Up @@ -242,9 +247,17 @@ impl<T: Clone + Send + Sync + 'static> ObservableVector<T> {
ObservableVectorEntries::new(self)
}

/// Start a new transaction to make multiple updates as one unit.
///
/// See [`ObservableVectorTransaction`]s documentation for more details.
pub fn transaction(&mut self) -> ObservableVectorTransaction<'_, T> {
ObservableVectorTransaction::new(self)
}

fn broadcast_diff(&self, diff: VectorDiff<T>) {
if self.sender.receiver_count() != 0 {
let msg = BroadcastMessage { diff, state: self.values.clone() };
let msg =
BroadcastMessage { diffs: OneOrManyDiffs::One(diff), state: self.values.clone() };
let _num_receivers = self.sender.send(msg).unwrap_or(0);
#[cfg(feature = "tracing")]
tracing::debug!(
Expand Down Expand Up @@ -290,10 +303,25 @@ impl<T: Clone + Send + Sync + 'static> From<Vector<T>> for ObservableVector<T> {

#[derive(Clone)]
struct BroadcastMessage<T> {
diff: VectorDiff<T>,
diffs: OneOrManyDiffs<T>,
state: Vector<T>,
}

#[derive(Clone)]
enum OneOrManyDiffs<T> {
One(VectorDiff<T>),
Many(Vec<VectorDiff<T>>),
}

impl<T> OneOrManyDiffs<T> {
fn into_vec(self) -> Vec<VectorDiff<T>> {
match self {
OneOrManyDiffs::One(diff) => vec![diff],
OneOrManyDiffs::Many(diffs) => diffs,
}
}
}

/// A change to an [`ObservableVector`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum VectorDiff<T> {
Expand Down
6 changes: 3 additions & 3 deletions eyeball-im/src/vector/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ impl<T> Drop for ObservableVectorEntry<'_, T> {
}
}

enum EntryIndex<'a> {
pub(super) enum EntryIndex<'a> {
Borrowed(&'a mut usize),
Owned(usize),
}

impl<'a> EntryIndex<'a> {
fn value(&self) -> usize {
pub(super) fn value(&self) -> usize {
match self {
EntryIndex::Borrowed(idx) => **idx,
EntryIndex::Owned(idx) => *idx,
Expand All @@ -92,7 +92,7 @@ impl<'a> EntryIndex<'a> {
/// Remove the association with the externally-stored index, if any.
///
/// Returns the index value for convenience.
fn make_owned(&mut self) -> usize {
pub(super) fn make_owned(&mut self) -> usize {
match self {
EntryIndex::Borrowed(idx) => {
let idx = **idx;
Expand Down
194 changes: 161 additions & 33 deletions eyeball-im/src/vector/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{
hint::unreachable_unchecked,
mem,
pin::Pin,
task::{ready, Context, Poll},
vec,
};

use futures_core::Stream;
use imbl::Vector;
use tokio::sync::broadcast::{
error::{RecvError, TryRecvError},
Receiver,
Expand All @@ -12,71 +16,195 @@ use tokio_util::sync::ReusableBoxFuture;
#[cfg(feature = "tracing")]
use tracing::info;

use super::{BroadcastMessage, VectorDiff};
use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff};

/// A subscriber for updates of a [`Vector`].
#[derive(Debug)]
pub struct VectorSubscriber<T> {
rx: Receiver<BroadcastMessage<T>>,
}

impl<T: Clone + Send + Sync + 'static> VectorSubscriber<T> {
pub(super) fn new(rx: Receiver<BroadcastMessage<T>>) -> Self {
Self { rx }
}

/// Turn this `VectorSubcriber` into a stream of `VectorDiff`s.
pub fn into_stream(self) -> VectorSubscriberStream<T> {
VectorSubscriberStream {
inner: ReusableBoxFuture::new(make_future(self.rx)),
state: VectorSubscriberStreamState::Recv,
}
}

/// Turn this `VectorSubcriber` into a stream of `Vec<VectorDiff>`s.
pub fn into_batched_stream(self) -> VectorSubscriberBatchedStream<T> {
VectorSubscriberBatchedStream { inner: ReusableBoxFuture::new(make_future(self.rx)) }
}
}

/// A stream of `VectorDiff`s created from a [`VectorSubscriber`].
///
/// Use its [`Stream`] implementation to interact with it (futures-util and
/// other futures-related crates have extension traits with convenience
/// methods).
#[derive(Debug)]
pub struct VectorSubscriber<T> {
pub struct VectorSubscriberStream<T> {
inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
state: VectorSubscriberStreamState<T>,
}

impl<T: Clone + Send + Sync + 'static> VectorSubscriber<T> {
pub(super) fn new(rx: Receiver<BroadcastMessage<T>>) -> Self {
Self { inner: ReusableBoxFuture::new(make_future(rx)) }
}
#[derive(Debug)]
enum VectorSubscriberStreamState<T> {
Recv,
YieldBatch { iter: vec::IntoIter<VectorDiff<T>>, rx: Receiver<BroadcastMessage<T>> },
}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriber<T> {
// Not clear why this explicit impl is needed, but it's not unsafe so it is fine
impl<T> Unpin for VectorSubscriberStreamState<T> {}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberStream<T> {
type Item = VectorDiff<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut self.state {
VectorSubscriberStreamState::Recv => {
let (result, mut rx) = ready!(self.inner.poll(cx));

let poll = match result {
Ok(msg) => match msg.diffs {
OneOrManyDiffs::One(diff) => Poll::Ready(Some(diff)),
OneOrManyDiffs::Many(diffs) if diffs.is_empty() => {
unreachable!("ObservableVectorTransaction never sends empty diffs")
}
OneOrManyDiffs::Many(mut diffs) if diffs.len() == 1 => {
Poll::Ready(Some(diffs.pop().unwrap()))
}
OneOrManyDiffs::Many(diffs) => {
let mut iter = diffs.into_iter();
let fst = iter.next().unwrap();
self.state = VectorSubscriberStreamState::YieldBatch { iter, rx };
cx.waker().wake_by_ref();
return Poll::Ready(Some(fst));
}
},
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(_)) => {
Poll::Ready(handle_lag(&mut rx).map(|values| VectorDiff::Reset { values }))
}
};

self.inner.set(make_future(rx));
poll
}
VectorSubscriberStreamState::YieldBatch { iter, .. } => {
let diff =
iter.next().expect("YieldBatch is never left empty when exiting poll_next");

if iter.len() == 0 {
let old_state =
mem::replace(&mut self.state, VectorSubscriberStreamState::Recv);
let rx = match old_state {
VectorSubscriberStreamState::YieldBatch { rx, .. } => rx,
// Safety: We would not be in the outer branch otherwise
_ => unsafe { unreachable_unchecked() },
};

self.inner.set(make_future(rx));
} else {
cx.waker().wake_by_ref();
}

Poll::Ready(Some(diff))
}
}
}
}

/// A batched stream of `VectorDiff`s created from a [`VectorSubscriber`].
///
/// Use its [`Stream`] implementation to interact with it (futures-util and
/// other futures-related crates have extension traits with convenience
/// methods).
#[derive(Debug)]
pub struct VectorSubscriberBatchedStream<T> {
inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberBatchedStream<T> {
type Item = Vec<VectorDiff<T>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn append<T>(target: &mut Vec<VectorDiff<T>>, source: OneOrManyDiffs<T>) {
match source {
OneOrManyDiffs::One(diff) => target.push(diff),
OneOrManyDiffs::Many(mut diffs) => target.append(&mut diffs),
}
}

let (result, mut rx) = ready!(self.inner.poll(cx));

let poll = match result {
Ok(msg) => Poll::Ready(Some(msg.diff)),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(_)) => {
let mut msg = None;
Ok(msg) => {
let mut batch = msg.diffs.into_vec();
loop {
match rx.try_recv() {
// There's a newer message in the receiver's buffer, use that for reset.
Ok(m) => {
msg = Some(m);
Ok(msg) => append(&mut batch, msg.diffs),
Err(TryRecvError::Empty | TryRecvError::Closed) => {
break Poll::Ready(Some(batch));
}
// Ideally we'd return a `VecDiff::Reset` with the last state before the
// channel was closed here, but we have no way of obtaining the last state.
Err(TryRecvError::Closed) => {
#[cfg(feature = "tracing")]
info!("Channel closed after lag, can't return last state");
break Poll::Ready(None);
Err(TryRecvError::Lagged(_)) => {
break Poll::Ready(
handle_lag(&mut rx)
.map(|values| vec![VectorDiff::Reset { values }]),
);
}
// Lagged twice in a row, is this possible? If it is, it's fine to just
// loop again and look at the next try_recv result.
Err(TryRecvError::Lagged(_)) => {}
Err(TryRecvError::Empty) => match msg {
// We exhausted the internal buffer using try_recv, msg contains the
// last message from it, which we use for the reset.
Some(msg) => {
break Poll::Ready(Some(VectorDiff::Reset { values: msg.state }));
}
// We exhausted the internal buffer using try_recv but there was no
// message in it, even though we got TryRecvError::Lagged(_) before.
None => unreachable!("got no new message via try_recv after lag"),
},
}
}
}
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(_)) => {
Poll::Ready(handle_lag(&mut rx).map(|values| vec![VectorDiff::Reset { values }]))
}
};

self.inner.set(make_future(rx));
poll
}
}

fn handle_lag<T: Clone + Send + Sync + 'static>(
rx: &mut Receiver<BroadcastMessage<T>>,
) -> Option<Vector<T>> {
let mut msg = None;
loop {
match rx.try_recv() {
// There's a newer message in the receiver's buffer, use that for reset.
Ok(m) => {
msg = Some(m);
}
// Ideally we'd return a `VecDiff::Reset` with the last state before the
// channel was closed here, but we have no way of obtaining the last state.
Err(TryRecvError::Closed) => {
#[cfg(feature = "tracing")]
info!("Channel closed after lag, can't return last state");
return None;
}
// Lagged twice in a row, is this possible? If it is, it's fine to just
// loop again and look at the next try_recv result.
Err(TryRecvError::Lagged(_)) => {}
Err(TryRecvError::Empty) => match msg {
// We exhausted the internal buffer using try_recv, msg contains the
// last message from it, which we use for the reset.
Some(msg) => return Some(msg.state),
// We exhausted the internal buffer using try_recv but there was no
// message in it, even though we got TryRecvError::Lagged(_) before.
None => unreachable!("got no new message via try_recv after lag"),
},
}
}
}

type SubscriberFutureReturn<T> = (Result<T, RecvError>, Receiver<T>);

async fn make_future<T: Clone>(mut rx: Receiver<T>) -> SubscriberFutureReturn<T> {
Expand Down
Loading

0 comments on commit 8f64983

Please sign in to comment.