Skip to content

Commit

Permalink
feat: add sorted_set_fetch_by_index (#148)
Browse files Browse the repository at this point in the history
* feat: add sorted_set_fetch_by_index

Adds a function to the simple cache client to fetch elements of a
sorted set selecting them by index (rank).
  • Loading branch information
brayniac authored Jul 6, 2023
1 parent 2ee5c45 commit 9a77988
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ pub type MomentoResult<T> = Result<T, MomentoError>;

pub mod sorted_set {
pub use momento_protos::cache_client::sorted_set_fetch_request::{Order, Range};
pub use momento_protos::cache_client::sorted_set_fetch_response::found::Elements;
pub use momento_protos::cache_client::sorted_set_fetch_response::SortedSet;
pub use momento_protos::cache_client::SortedSetElement;
}
57 changes: 56 additions & 1 deletion src/response/cache_sorted_set_fetch_response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,62 @@
use momento_protos::cache_client::SortedSetElement;
use crate::sorted_set::SortedSetElement;
use crate::MomentoError;
use core::convert::TryFrom;

#[derive(Debug)]
#[non_exhaustive]
pub struct MomentoSortedSetFetchResponse {
pub value: Option<Vec<SortedSetElement>>,
}

pub enum SortedSetFetch {
Hit { elements: Vec<SortedSetElement> },
Miss,
}

impl TryFrom<SortedSetFetch> for Vec<(Vec<u8>, f64)> {
type Error = MomentoError;

fn try_from(value: SortedSetFetch) -> Result<Self, Self::Error> {
match value {
SortedSetFetch::Hit { mut elements } => {
let result: Vec<(Vec<u8>, f64)> =
elements.drain(..).map(|e| (e.value, e.score)).collect();
Ok(result)
}
SortedSetFetch::Miss => Err(MomentoError::Miss {
description: std::borrow::Cow::Borrowed("sorted set was not found"),
}),
}
}
}

impl TryFrom<SortedSetFetch> for Vec<(String, f64)> {
type Error = MomentoError;

fn try_from(value: SortedSetFetch) -> Result<Self, Self::Error> {
match value {
SortedSetFetch::Hit { mut elements } => {
let mut result = Vec::with_capacity(elements.len());
for element in elements.drain(..) {
match String::from_utf8(element.value) {
Ok(value) => {
result.push((value, element.score));
}
Err(e) => {
return Err::<Self, Self::Error>(MomentoError::TypeError {
description: std::borrow::Cow::Borrowed(
"element value was not a valid utf-8 string",
),
source: Box::new(e),
})
}
}
}
Ok(result)
}
SortedSetFetch::Miss => Err(MomentoError::Miss {
description: std::borrow::Cow::Borrowed("sorted set was not found"),
}),
}
}
}
179 changes: 178 additions & 1 deletion src/simple_cache_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::response::{
MomentoDictionarySetResponse, MomentoError, MomentoFlushCacheResponse,
MomentoGenerateApiTokenResponse, MomentoListCacheResponse, MomentoListFetchResponse,
MomentoListSigningKeyResult, MomentoSetDifferenceResponse, MomentoSetFetchResponse,
MomentoSetResponse, MomentoSigningKey, MomentoSortedSetFetchResponse,
MomentoSetResponse, MomentoSigningKey, MomentoSortedSetFetchResponse, SortedSetFetch,
};
use crate::sorted_set;
use crate::utils::{self, base64_encode};
Expand Down Expand Up @@ -1873,6 +1873,183 @@ impl SimpleCacheClient {
todo!("This api was reworked and is pending implementation in the rust sdk: https://github.com/momentohq/client-sdk-rust/issues/135");
}

/// Fetches a range of elements from a sorted set from a Momento Cache
/// selecting by index (rank).
///
/// *NOTE*: This is preview functionality and requires that you contact
/// Momento Support to enable these APIs for your cache.
///
/// # Arguments
///
/// * `_cache_name` - name of cache.
/// * `_set_name` - name of the set.
/// * `_order` - specify ascending or descending order
/// * `_range` - constrain to a range of elements by index or by score
///
/// # Example
/// ```
/// # fn main() -> momento_test_util::DoctestResult {
/// # momento_test_util::doctest(|cache_name, credential_provider| async move {
/// use std::convert::TryInto;
/// use std::time::Duration;
/// use momento::{CollectionTtl, SimpleCacheClientBuilder};
/// use momento::sorted_set::{Elements, Order, SortedSetElement};
///
/// let ttl = CollectionTtl::default();
/// let mut momento = SimpleCacheClientBuilder::new(credential_provider, Duration::from_secs(30))?
/// .build();
///
/// // some elements that we'll add to the sorted set
/// let test_elements = vec![
/// SortedSetElement { value: "a".into(), score: 2.0 },
/// SortedSetElement { value: "b".into(), score: 3.0 },
/// SortedSetElement { value: "c".into(), score: 5.0 },
/// SortedSetElement { value: "d".into(), score: 7.0 },
/// SortedSetElement { value: "e".into(), score: 11.0 },
/// SortedSetElement { value: "f".into(), score: 13.0 },
/// ];
///
/// // add some elements to a sorted set
/// let result = momento.sorted_set_put(
/// &cache_name,
/// "test sorted set",
/// test_elements.clone(),
/// ttl,
/// ).await?;
///
/// // this will fetch the all the elements (with their scores) in the
/// // sorted set, sorted by index (rank) in ascending order. Any valid
/// // `core::ops::Range` can be used to select and limit the returned
/// // elements. Here we use `0..` to select all the elements.
/// let result = momento.sorted_set_fetch_by_index(
/// &cache_name,
/// "test sorted set",
/// Order::Ascending,
/// 0..,
/// ).await?;
///
/// if let Ok(elements) = TryInto::<Vec<(Vec<u8>, f64)>>::try_into(result) {
/// // we only set 6 elements, check that we got exactly what we set
/// assert_eq!(elements.len(), test_elements.len());
///
/// // we can iterate and print the elements
/// for (idx, (value, score)) in elements.iter().enumerate() {
/// println!("value: {:?} score: {score}", value);
///
/// // check that the value is correct
/// assert_eq!(*value, test_elements[idx].value);
/// }
/// } else {
/// panic!("sorted set was missing or the response was invalid");
/// }
///
/// // by changing the range, we can get a subset of the sorted set. This
/// // will select just first 3 elements, again in ascending order.
/// let result = momento.sorted_set_fetch_by_index(
/// &cache_name,
/// "test sorted set",
/// Order::Ascending,
/// 0..3,
/// ).await?;
///
/// if let Ok(elements) = TryInto::<Vec<(Vec<u8>, f64)>>::try_into(result) {
/// // we only wanted 3 elements, check that we got 3
/// assert_eq!(elements.len(), 3);
///
/// // check that the values are correct
/// for (idx, (value, _score)) in elements.iter().enumerate() {
/// assert_eq!(*value, test_elements[idx].value);
/// }
/// } else {
/// panic!("sorted set was missing or the response was invalid");
/// }
/// # Ok(())
/// # })
/// # }
/// ```
pub async fn sorted_set_fetch_by_index(
&mut self,
cache_name: &str,
set_name: impl IntoBytes,
order: sorted_set::Order,
range: impl RangeBounds<i32>,
) -> MomentoResult<SortedSetFetch> {
use core::ops::Bound;
use sorted_set_fetch_request::by_index::{End, Start};
use sorted_set_fetch_request::{ByIndex, Range};
use sorted_set_fetch_response::found::Elements;

// transforms the Rust `Range` start into a Momento start index. Because
// the Momento start index is always Inclusive (or Unbounded) we need to
// map the value of a Rust `Excluded` start bound to an
// `InclusiveStartIndex` by adding one to the value.
let start = match range.start_bound() {
Bound::Included(v) => Start::InclusiveStartIndex(*v),
Bound::Excluded(v) => Start::InclusiveStartIndex(*v + 1),
Bound::Unbounded => Start::UnboundedStart(Unbounded {}),
};

// transforms the Rust `Range` end into a Momento end index. Because the
// Momento end index is always Exclusive (or Unbounded) we need to map
// the value of a Rust `Included` end bound to an `ExclusiveEndIndex` by
// adding one to the value.
let end = match range.end_bound() {
Bound::Included(v) => End::ExclusiveEndIndex(*v + 1),
Bound::Excluded(v) => End::ExclusiveEndIndex(*v),
Bound::Unbounded => End::UnboundedEnd(Unbounded {}),
};

let request = self.prep_request(
cache_name,
SortedSetFetchRequest {
set_name: set_name.into_bytes(),
order: order.into(),
with_scores: true,
range: Some(Range::ByIndex(ByIndex {
start: Some(start),
end: Some(end),
})),
},
)?;

let response = self
.data_client
.sorted_set_fetch(request)
.await?
.into_inner();

// this flattens the response returning a None for both a missing sorted
// set and for a request that returns Found with elements being None and
// converting the SortedSet enum into the interior collection of
// elements.
match response.sorted_set {
Some(crate::sorted_set::SortedSet::Found(elements)) => match elements.elements {
Some(elements) => match elements {
Elements::ValuesWithScores(elements) => Ok(SortedSetFetch::Hit {
elements: elements.elements,
}),
Elements::Values(_) => {
return Err(MomentoError::ClientSdkError {
description: std::borrow::Cow::Borrowed(
"sorted_set_fetch_by_index response included elements without values"
),
source: crate::response::ErrorSource::Unknown(
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"unexpected response"
).into()),
});
}
},
None => Ok(SortedSetFetch::Hit {
elements: Vec::new(),
}),
},
Some(sorted_set_fetch_response::SortedSet::Missing(_)) => Ok(SortedSetFetch::Miss),
None => Ok(SortedSetFetch::Miss),
}
}

/// Gets the rank of an element in a sorted set in a Momento Cache.
///
/// The return result is a `MomentoResult` which on success contains an
Expand Down

0 comments on commit 9a77988

Please sign in to comment.