Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

im-util: Rename DynamicLimit => Limit and add more constructors #36

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions eyeball-im-util/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Unreleased

- Add `Limit` adapter for presenting a limited view of an underlying observable
vector

# 0.3.1

- Fix a bug with `Filter` and `FilterMap` that was corrupting their internal
Expand Down
2 changes: 1 addition & 1 deletion eyeball-im-util/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_core::Stream;

pub use self::{
filter::{Filter, FilterMap},
limit::DynamicLimit,
limit::{EmptyLimitStream, Limit},
};

/// Abstraction over stream items that the adapters in this module can deal
Expand Down
106 changes: 84 additions & 22 deletions eyeball-im-util/src/vector/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@ pin_project! {
/// A [`VectorDiff`] stream adapter that presents a limited view of the
/// underlying [`ObservableVector`]s items.
///
/// For example, let `S` be a `Stream<Item = VectorDiff>`. The `Vector` represented
/// by `S` can have any length, but one may want to virtually _limit_ this `Vector`
/// to a certain size. Then this `DynamicLimit` adapter is well appropriate.
/// The limit is dynamic, i.e. it changes over time based on values that are polled
/// from another `Stream` (ref. [`Self::limit_stream`]).
/// For example, let `S` be a `Stream<Item = VectorDiff>`. The `Vector`
/// represented by `S` can have any length, but one may want to virtually
/// _limit_ this `Vector` to a certain size. Then this `Limit` adapter is
/// appropriate.
///
/// Because the limit is dynamic, an internal buffered vector is kept, so that
/// the adapter knows which values can be added when the limit is increased, or
/// when values are removed and new values must be inserted. This fact is important
/// if the items of the `Vector` have a non-negligible size.
/// An internal buffered vector is kept so that the adapter knows which
/// values can be added when the limit is increased, or when values are
/// removed and new values must be inserted. This fact is important if the
/// items of the `Vector` have a non-negligible size.
///
/// It's OK to have a limit larger than the length of the observed `Vector`.
#[project = DynamicLimitProj]
pub struct DynamicLimit<S, L>
#[project = LimitProj]
pub struct Limit<S, L>
where
S: Stream,
S::Item: VectorDiffContainer,
Expand All @@ -45,8 +44,8 @@ pin_project! {
#[pin]
limit_stream: L,

// The buffered vector that is updated with the main stream's items. It's
// used to provide missing items, e.g. when the limit increases.
// The buffered vector that is updated with the main stream's items.
// It's used to provide missing items, e.g. when the limit increases.
buffered_vector: Vector<VectorDiffContainerStreamElement<S>>,

// The current limit.
Expand All @@ -61,22 +60,47 @@ pin_project! {
}
}

impl<S, L> DynamicLimit<S, L>
impl<S> Limit<S, EmptyLimitStream>
where
S: Stream,
S::Item: VectorDiffContainer,
VectorDiffContainerStreamElement<S>: Clone + Send + Sync + 'static,
VectorDiffContainerStreamFamily<S>:
VectorDiffContainerFamily<Member<VectorDiffContainerStreamElement<S>> = S::Item>,
L: Stream<Item = usize>,
{
/// Create a new [`DynamicLimit`] with the given (unlimited) initial values,
/// stream of `VectorDiff` updates for those values, and a stream of
/// limits.
/// Create a new [`Limit`] with the given (unlimited) initial values,
/// stream of `VectorDiff` updates for those values, and a fixed limit.
///
/// Note that this adapter won't produce anything until the first limit is
/// polled.
/// Returns the truncated initial values as well as a stream of updates that
/// ensure that the resulting vector never exceeds the given limit.
pub fn new(
initial_values: Vector<VectorDiffContainerStreamElement<S>>,
inner_stream: S,
limit: usize,
) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
Self::dynamic_with_initial_limit(initial_values, inner_stream, limit, EmptyLimitStream)
}
}

impl<S, L> Limit<S, L>
where
S: Stream,
S::Item: VectorDiffContainer,
VectorDiffContainerStreamElement<S>: Clone + Send + Sync + 'static,
VectorDiffContainerStreamFamily<S>:
VectorDiffContainerFamily<Member<VectorDiffContainerStreamElement<S>> = S::Item>,
L: Stream<Item = usize>,
{
/// Create a new [`Limit`] with the given (unlimited) initial values, stream
/// of `VectorDiff` updates for those values, and a stream of limits.
///
/// This is equivalent to `dynamic_with_initial_limit` where the
/// `initial_limit` is 0, except that it doesn't return the limited
/// vector as it would be empty anyways.
///
/// Note that the returned `Limit` won't produce anything until the first
/// limit is produced by the limit stream.
pub fn dynamic(
initial_values: Vector<VectorDiffContainerStreamElement<S>>,
inner_stream: S,
limit_stream: L,
Expand All @@ -89,9 +113,34 @@ where
ready_values: VecDeque::new(),
}
}

/// Create a new [`Limit`] with the given (unlimited) initial values, stream
/// of `VectorDiff` updates for those values, and an initial limit as well
/// as a stream of new limits.
pub fn dynamic_with_initial_limit(
mut initial_values: Vector<VectorDiffContainerStreamElement<S>>,
inner_stream: S,
initial_limit: usize,
limit_stream: L,
) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
let buffered_vector = initial_values.clone();
if initial_limit < initial_values.len() {
initial_values.truncate(initial_limit);
}

let stream = Self {
inner_stream,
limit_stream,
buffered_vector,
limit: initial_limit,
ready_values: VecDeque::new(),
};

(initial_values, stream)
}
}

impl<S, L> Stream for DynamicLimit<S, L>
impl<S, L> Stream for Limit<S, L>
where
S: Stream,
S::Item: VectorDiffContainer,
Expand All @@ -105,7 +154,7 @@ where
}
}

impl<S, L> DynamicLimitProj<'_, S, L>
impl<S, L> LimitProj<'_, S, L>
where
S: Stream,
S::Item: VectorDiffContainer,
Expand Down Expand Up @@ -347,3 +396,16 @@ where
}
}
}

/// An empty stream with an item type of `usize`.
#[derive(Debug)]
#[non_exhaustive]
pub struct EmptyLimitStream;

impl Stream for EmptyLimitStream {
type Item = usize;

fn poll_next(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
58 changes: 39 additions & 19 deletions eyeball-im-util/tests/it/limit.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
use eyeball::Observable;
use eyeball_im::{ObservableVector, VectorDiff};
use eyeball_im_util::vector::DynamicLimit;
use eyeball_im_util::vector::Limit;
use imbl::vector;
use stream_assert::{assert_closed, assert_next_eq, assert_pending};

#[test]
fn static_limit() {
let mut ob: ObservableVector<usize> = ObservableVector::from(vector![1, 20, 300]);
let (limited, mut sub) = Limit::new(ob.clone(), ob.subscribe().into_stream(), 2);
assert_eq!(limited, vector![1, 20]);
assert_pending!(sub);

ob.pop_back();
assert_pending!(sub);

ob.pop_back();
assert_next_eq!(sub, VectorDiff::PopBack);
}

#[test]
fn pending_until_limit_emits_a_value() {
let mut ob: ObservableVector<usize> = ObservableVector::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Append new values…
ob.append(vector![10, 11, 12]);
Expand All @@ -31,9 +45,15 @@ fn pending_until_limit_emits_a_value() {
#[test]
fn increase_and_decrease_the_limit_on_an_empty_stream() {
let ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
let mut limit = Observable::new(1);
let (limited, mut sub) = Limit::dynamic_with_initial_limit(
ob.clone(),
ob.subscribe().into_stream(),
1,
Observable::subscribe(&limit),
);

assert!(limited.is_empty());

// `ob` is empty!

Expand All @@ -58,7 +78,7 @@ fn increase_and_decrease_the_limit_only() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Append 4 values.
ob.append(vector![10, 11, 12, 13]);
Expand Down Expand Up @@ -114,7 +134,7 @@ fn limit_is_zero() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Append 4 values.
ob.append(vector![10, 11, 12, 13]);
Expand Down Expand Up @@ -158,7 +178,7 @@ fn limit_is_polled_first() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Append 4 values.
ob.append(vector![10, 11, 12, 13]);
Expand Down Expand Up @@ -192,7 +212,7 @@ fn append() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Set limit to 4.
Observable::set(&mut limit, 4);
Expand Down Expand Up @@ -248,7 +268,7 @@ fn clear() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Set limit to 4.
Observable::set(&mut limit, 4);
Expand Down Expand Up @@ -278,7 +298,7 @@ fn push_front() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Set limit to 2.
Observable::set(&mut limit, 2);
Expand Down Expand Up @@ -331,7 +351,7 @@ fn push_back() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Set limit to 2.
Observable::set(&mut limit, 2);
Expand Down Expand Up @@ -381,7 +401,7 @@ fn pop_front() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 4 values.
ob.append(vector![10, 11, 12, 13]);
Expand Down Expand Up @@ -434,7 +454,7 @@ fn pop_back() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 4 values.
ob.append(vector![10, 11, 12, 13]);
Expand Down Expand Up @@ -484,7 +504,7 @@ fn insert() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 2 values.
ob.append(vector![10, 11]);
Expand Down Expand Up @@ -542,7 +562,7 @@ fn set() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 3 values.
ob.append(vector![10, 11, 12]);
Expand Down Expand Up @@ -592,7 +612,7 @@ fn remove() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 5 values.
ob.append(vector![10, 11, 12, 13, 14]);
Expand Down Expand Up @@ -645,7 +665,7 @@ fn truncate() {
let mut ob = ObservableVector::<usize>::new();
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 5 values.
ob.append(vector![10, 11, 12, 13, 14]);
Expand Down Expand Up @@ -695,7 +715,7 @@ fn reset() {
let mut ob = ObservableVector::<usize>::with_capacity(2);
let mut limit = Observable::new(0);
let mut sub =
DynamicLimit::new(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));
Limit::dynamic(ob.clone(), ob.subscribe().into_stream(), Observable::subscribe(&limit));

// Add 1 value.
ob.append(vector![10]);
Expand Down