Skip to content

Commit

Permalink
im: Add truncate method, VectorDiff variant
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Sep 11, 2023
1 parent e2bca57 commit c19706c
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 0 deletions.
18 changes: 18 additions & 0 deletions eyeball-im-util/src/vector/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,14 @@ pin_project! {
pub(super) struct FilterImpl<S> {
#[pin]
inner: S,
// Original indices of the elements the filter was applied to.
//
// For example, if the first element of this list is 1, that means the
// first original element got filtered out so a set for index = 1 should
// translate to a set for index = 0 on the filtered elements (if the
// filter still matches after the set operation).
filtered_indices: VecDeque<usize>,
// Length of the original vector (before filter).
original_len: usize,
}
}
Expand Down Expand Up @@ -355,6 +362,15 @@ where
result
}

fn handle_truncate<U>(&mut self, len: usize) -> Option<VectorDiff<U>> {
*self.original_len = len;
let new_filtered_len = self.filtered_indices.iter().take_while(|&&idx| idx < len).count();
(new_filtered_len < self.filtered_indices.len()).then(|| {
self.filtered_indices.truncate(new_filtered_len);
VectorDiff::Truncate { length: new_filtered_len }
})
}

fn handle_reset_filter<F>(
&mut self,
values: Vector<VectorDiffContainerStreamElement<S>>,
Expand Down Expand Up @@ -405,6 +421,7 @@ where
VectorDiff::Insert { index, value } => self.handle_insert(index, value, &f2),
VectorDiff::Set { index, value } => self.handle_set(index, value, &f2),
VectorDiff::Remove { index } => self.handle_remove(index),
VectorDiff::Truncate { length } => self.handle_truncate(length),
VectorDiff::Reset { values } => self.handle_reset_filter(values, f),
});

Expand Down Expand Up @@ -438,6 +455,7 @@ where
VectorDiff::Insert { index, value } => self.handle_insert(index, value, f),
VectorDiff::Set { index, value } => self.handle_set(index, value, f),
VectorDiff::Remove { index } => self.handle_remove(index),
VectorDiff::Truncate { length } => self.handle_truncate(length),
VectorDiff::Reset { values } => self.handle_reset_filter_map(values, f),
});

Expand Down
49 changes: 49 additions & 0 deletions eyeball-im-util/tests/it/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,55 @@ fn remove() {
assert_next_eq!(sub, VectorDiff::Remove { index: 0 });
}

#[test]
fn truncate_matching_prefix() {
let mut ob: ObservableVector<i32> = ObservableVector::from(vector![5, 1, 10, -1, -2, -10]);
let (_, mut sub) = Filter::new(ob.clone(), ob.subscribe().into_stream(), |&i| i > 0);

ob.truncate(4); // remove some non-matching elements
assert_pending!(sub);

ob.truncate(3); // remove remaining non-matching element
assert_pending!(sub);

ob.truncate(1); // remove some matching elements
assert_next_eq!(sub, VectorDiff::Truncate { length: 1 });
}

#[test]
fn truncate_matching_suffix() {
let mut ob: ObservableVector<i32> = ObservableVector::from(vector![-1, -2, -10, 5, 1, 10]);
let (_, mut sub) = Filter::new(ob.clone(), ob.subscribe().into_stream(), |&i| i > 0);

ob.truncate(5); // remove one matching elements
assert_next_eq!(sub, VectorDiff::Truncate { length: 2 });

ob.truncate(3); // remove remaining matching elements
assert_next_eq!(sub, VectorDiff::Truncate { length: 0 });

ob.truncate(1); // remove some non-matching elements
assert_pending!(sub);
}

#[test]
fn truncate_complex() {
let mut ob: ObservableVector<i32> =
ObservableVector::from(vector![-17, 5, 1, -5, 10, -1, -2, 10]);
let (_, mut sub) = Filter::new(ob.clone(), ob.subscribe().into_stream(), |&i| i > 0);

ob.truncate(6); // remove non-matching, matching
assert_next_eq!(sub, VectorDiff::Truncate { length: 3 });

ob.truncate(4); // remove matching, non-matching
assert_next_eq!(sub, VectorDiff::Truncate { length: 2 });

ob.truncate(1); // remove 2 x matching, 1 x non-matching
assert_next_eq!(sub, VectorDiff::Truncate { length: 0 });

ob.truncate(0); // remove last non-matching
assert_pending!(sub);
}

#[test]
fn reset() {
let mut ob: ObservableVector<i32> = ObservableVector::with_capacity(1);
Expand Down
5 changes: 5 additions & 0 deletions eyeball-im/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# unreleased

- Add `truncate` method to `ObservableVector`, `ObservableVectorTransaction`
- Add `VectorDiff::Truncate { length: usize }`

# 0.3.2

- Fix transaction commit not working when there are no subscribers
Expand Down
20 changes: 20 additions & 0 deletions eyeball-im/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,20 @@ impl<T: Clone + Send + Sync + 'static> ObservableVector<T> {
}
}

/// Truncate the vector to `len` elements and notify subscribers.
///
/// Does nothing if `len` is greater or equal to the vector's current
/// length.
pub fn truncate(&mut self, len: usize) {
if len < self.len() {
#[cfg(feature = "tracing")]
tracing::debug!(target: "eyeball_im::vector::update", "truncate(len = {len})");

self.values.truncate(len);
self.broadcast_diff(VectorDiff::Truncate { length: len });
}
}

/// Gets an entry for the given index, through which only the element at
/// that index alone can be updated or removed.
///
Expand Down Expand Up @@ -368,6 +382,11 @@ pub enum VectorDiff<T> {
/// The index that the removed element had.
index: usize,
},
/// Truncation of the vector.
Truncate {
/// The number of elements that remain.
length: usize,
},
/// The subscriber lagged too far behind, and the next update that should
/// have been received has already been discarded from the internal buffer.
Reset {
Expand All @@ -390,6 +409,7 @@ impl<T: Clone> VectorDiff<T> {
VectorDiff::Insert { index, value } => VectorDiff::Insert { index, value: f(value) },
VectorDiff::Set { index, value } => VectorDiff::Set { index, value: f(value) },
VectorDiff::Remove { index } => VectorDiff::Remove { index },
VectorDiff::Truncate { length } => VectorDiff::Truncate { length },
VectorDiff::Reset { values } => VectorDiff::Reset { values: vector_map(values, f) },
}
}
Expand Down
14 changes: 14 additions & 0 deletions eyeball-im/src/vector/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ impl<'o, T: Clone + Send + Sync + 'static> ObservableVectorTransaction<'o, T> {
}
}

/// Truncate the vector to `len` elements and notify subscribers.
///
/// Does nothing if `len` is greater or equal to the vector's current
/// length.
pub fn truncate(&mut self, len: usize) {
if len < self.len() {
#[cfg(feature = "tracing")]
tracing::debug!(target: "eyeball_im::vector::update", "truncate(len = {len})");

self.values.truncate(len);
self.add_to_batch(VectorDiff::Truncate { length: len });
}
}

/// Gets an entry for the given index through which only the element at that
/// index alone can be updated or removed.
///
Expand Down
19 changes: 19 additions & 0 deletions eyeball-im/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ fn lag2() {
assert_pending!(sub);
}

#[test]
fn truncate() {
let mut ob: ObservableVector<i32> = ObservableVector::from(vector![1, 2]);
let mut sub = ob.subscribe().into_stream();

ob.truncate(3);
ob.truncate(2);
assert_pending!(sub);
assert_eq!(*ob, vector![1, 2]);

ob.truncate(1);
assert_next_eq!(sub, VectorDiff::Truncate { length: 1 });
assert_eq!(*ob, vector![1]);

ob.truncate(0);
assert_next_eq!(sub, VectorDiff::Truncate { length: 0 });
assert!(ob.is_empty());
}

#[test]
fn for_each() {
let mut ob: ObservableVector<i32> = ObservableVector::from(vector![0, 10, 1, 2, 4, 33, 5]);
Expand Down

0 comments on commit c19706c

Please sign in to comment.