Skip to content

Commit

Permalink
im-util: Add constructors for Filter and FilterMap
Browse files Browse the repository at this point in the history
… to allow them to be nested.
  • Loading branch information
jplatte committed Aug 15, 2023
1 parent 71e82a7 commit 784a535
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 82 deletions.
52 changes: 6 additions & 46 deletions eyeball-im-util/src/vector.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! Utilities around [`ObservableVector`].

use std::collections::VecDeque;

use eyeball_im::{ObservableVector, Vector, VectorSubscriber};

mod filter;

use self::filter::FilterImpl;
pub use self::filter::{Filter, FilterMap};

/// Extension trait for [`ObservableVector`].
Expand All @@ -18,7 +15,7 @@ where
///
/// Returns a filtered version of the current vector, and a subscriber to
/// get updates through.
fn subscribe_filter<F>(&self, filter: F) -> (Vector<T>, Filter<VectorSubscriber<T>, F>)
fn subscribe_filter<F>(&self, f: F) -> (Vector<T>, Filter<VectorSubscriber<T>, F>)
where
F: Fn(&T) -> bool;

Expand All @@ -27,10 +24,7 @@ where
///
/// Returns a filtered + mapped version of the current vector, and a
/// subscriber to get updates through.
fn subscribe_filter_map<U, F>(
&self,
filter: F,
) -> (Vector<U>, FilterMap<VectorSubscriber<T>, F>)
fn subscribe_filter_map<U, F>(&self, f: F) -> (Vector<U>, FilterMap<VectorSubscriber<T>, F>)
where
U: Clone,
F: Fn(T) -> Option<U>;
Expand All @@ -40,52 +34,18 @@ impl<T> VectorExt<T> for ObservableVector<T>
where
T: Clone + Send + Sync + 'static,
{
fn subscribe_filter<F>(&self, filter: F) -> (Vector<T>, Filter<VectorSubscriber<T>, F>)
fn subscribe_filter<F>(&self, f: F) -> (Vector<T>, Filter<VectorSubscriber<T>, F>)
where
F: Fn(&T) -> bool,
{
let mut filtered_indices = VecDeque::new();
let mut v = (*self).clone();

let mut original_idx = 0;
v.retain(|val| {
let keep = filter(val);
if keep {
filtered_indices.push_back(original_idx);
}
original_idx += 1;
keep
});

let inner = self.subscribe();
let original_len = self.len();
let inner = FilterImpl { inner, filtered_indices, original_len };
let sub = Filter { inner, filter };

(v, sub)
Filter::new((*self).clone(), self.subscribe(), f)
}

fn subscribe_filter_map<U, F>(
&self,
filter: F,
) -> (Vector<U>, FilterMap<VectorSubscriber<T>, F>)
fn subscribe_filter_map<U, F>(&self, f: F) -> (Vector<U>, FilterMap<VectorSubscriber<T>, F>)
where
U: Clone,
F: Fn(T) -> Option<U>,
{
let (v, filtered_indices) = self
.iter()
.enumerate()
.filter_map(|(original_idx, val)| {
filter(val.clone()).map(|mapped| (mapped, original_idx))
})
.unzip();

let inner = self.subscribe();
let original_len = self.len();
let inner = FilterImpl { inner, filtered_indices, original_len };
let sub = FilterMap { inner, filter };

(v, sub)
FilterMap::new((*self).clone(), self.subscribe(), f)
}
}
123 changes: 87 additions & 36 deletions eyeball-im-util/src/vector/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,49 @@ pin_project! {
/// Created through [`VectorExt::subscribe_filtered`].
pub struct Filter<S, F> {
#[pin]
pub(super) inner: FilterImpl<S>,
pub(super) filter: F,
inner: FilterImpl<S>,
filter: F,
}
}

impl<S, T, F> Filter<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
F: Fn(&T) -> bool,
{
/// Create a new `Filter` with the given (unfiltered) initial values, stream
/// of `VectorDiff` updates for those values, and filter.
pub fn new(mut values: Vector<T>, inner: S, filter: F) -> (Vector<T>, Self) {
let original_len = values.len();
let mut filtered_indices = VecDeque::new();

let mut original_idx = 0;
values.retain(|val| {
let keep = filter(val);
if keep {
filtered_indices.push_back(original_idx);
}
original_idx += 1;
keep
});

let inner = FilterImpl { inner, filtered_indices, original_len };
(values, Self { inner, filter })
}
}

impl<S, T, F> Stream for Filter<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
F: Fn(&T) -> bool,
{
type Item = VectorDiff<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let projected = self.project();
projected.inner.project().handle_diff_filter(&*projected.filter, cx)
}
}

Expand All @@ -28,18 +69,57 @@ pin_project! {
/// Created through [`VectorExt::subscribe_filter_mapped`].
pub struct FilterMap<S, F> {
#[pin]
pub(super) inner: FilterImpl<S>,
pub(super) filter: F,
inner: FilterImpl<S>,
filter: F,
}
}

impl<S, T, U, F> FilterMap<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
U: Clone,
F: Fn(T) -> Option<U>,
{
/// Create a new `Filter` with the given (un-filter+mapped) initial values,
/// stream of `VectorDiff` updates for those values, and filter.
pub fn new(values: Vector<T>, inner: S, filter: F) -> (Vector<U>, Self) {
let original_len = values.len();
let (values, filtered_indices) = values
.iter()
.enumerate()
.filter_map(|(original_idx, val)| {
filter(val.clone()).map(|mapped| (mapped, original_idx))
})
.unzip();

let inner = FilterImpl { inner, filtered_indices, original_len };
(values, Self { inner, filter })
}
}

impl<S, T, U, F> Stream for FilterMap<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
U: Clone,
F: Fn(T) -> Option<U>,
{
type Item = VectorDiff<U>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let projected = self.project();
projected.inner.project().handle_diff_filter_map(&*projected.filter, cx)
}
}

pin_project! {
#[project = FilterImplProj]
pub(super) struct FilterImpl<S> {
#[pin]
pub(super) inner: S,
pub(super) filtered_indices: VecDeque<usize>,
pub(super) original_len: usize,
inner: S,
filtered_indices: VecDeque<usize>,
original_len: usize,
}
}

Expand Down Expand Up @@ -307,32 +387,3 @@ where
}
}
}

impl<S, T, F> Stream for Filter<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
F: Fn(&T) -> bool,
{
type Item = VectorDiff<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let projected = self.project();
projected.inner.project().handle_diff_filter(&*projected.filter, cx)
}
}

impl<S, T, U, F> Stream for FilterMap<S, F>
where
S: Stream<Item = VectorDiff<T>>,
T: Clone + Send + Sync + 'static,
U: Clone,
F: Fn(T) -> Option<U>,
{
type Item = VectorDiff<U>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let projected = self.project();
projected.inner.project().handle_diff_filter_map(&*projected.filter, cx)
}
}

0 comments on commit 784a535

Please sign in to comment.