Skip to content

Commit

Permalink
iterator: typed API for new deserialization framework
Browse files Browse the repository at this point in the history
This commit finishes the work related to adjusting the iterators module
to the new deserialization framework.

The previous commit brought RawIterator, which can deserialize
ColumnIterators. This commit introduces new TypedRowIterator, which
type-checks once and then deserializes from ColumnIterators into rows.
RawIterator can be converted to TypedRowIterator by calling the
`into_typed()` method.

Unfortunately, due to the limitations of the Stream trait (no support
for lending streams, analogous to lending iterators in case of
RawRowsLendingIterator), a Stream cannot be used to deserialize borrowed
types (i.e. those that borrow from the frame serialized contents).

In order to give users both capabilities:
1) deserializing borrowed types (for efficiency),
2) deserializing using Stream (for convienience),
two distinct types are used: TypedRowIterator and TypedRowStream.
The first supports borrowed types and the second implements Stream.

To sum up, instead of `RowIterator` (returning `Row`s) and
`TypedRowIterator` (returning instances of the target type) both
implementing `Stream`, now we have the following:
- `RawIterator`
  - cannot implement `Stream`, because returns `ColumnIterator`s that
    borrow from it,
  - provide `type_check()` and `next()` methods that can be used for
    low-level, manual deserialization (not recommended for ordinary
    users)
  - supports deserializing manually borrowed types (such as `&str`).
- `TypedRowIterator`
  - created by calling `into_typed::<TargetType>()` on `RawIterator`,
  - type checks upon creation,
  - supports deserializing borrowed types (such as `&str`),
  - does not implement `Stream` in order to support borrowed types,
  - provides basic Stream-like methods (`next()`, `try_next()`),
- `TypedRowStream`
  - created by calling `into_stream()` on `TypedRowIterator`,
  - implements `Stream` and hence does not support borrowed types.

Co-authored-by: Piotr Dulikowski <[email protected]>
  • Loading branch information
wprzytula and piodul committed Nov 4, 2024
1 parent 6be175b commit 1f598ca
Showing 1 changed file with 132 additions and 1 deletion.
133 changes: 132 additions & 1 deletion scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use scylla_cql::frame::response::result::RawMetadataAndRawRows;
use scylla_cql::frame::response::NonErrorResponse;
use scylla_cql::types::deserialize::result::RawRowsLendingIterator;
use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow};
use scylla_cql::types::deserialize::TypeCheckError;
use scylla_cql::types::serialize::row::SerializedValues;
use std::result::Result;
use thiserror::Error;
Expand Down Expand Up @@ -540,7 +541,10 @@ where
/// An intermediate object that allows to construct an iterator over a query
/// that is asynchronously paged in the background.
///
/// TODO: implement and describe the new API
/// Before the results can be processed in a convenient way, the RawIterator
/// needs to be cast into a typed iterator. This is done by use of `into_typed()` method.
/// As the method is generic over the target type, the turbofish syntax
/// can come in handy there, e.g. `raw_iter.into_typed::<(i32, &str, Uuid)>()`.
///
/// A pre-0.15.0 interface is also available, although deprecated:
/// `into_legacy()` method converts RawIterator to LegacyRowIterator,
Expand Down Expand Up @@ -652,6 +656,30 @@ impl RawIterator {
Poll::Ready(Some(Ok(())))
}

/// Type-checks the iterator against given type.
///
/// This is automatically called upon transforming [RawIterator] into [TypedRowIterator].
/// Can be used with `next()` for manual deserialization. See `next()` for an example.
#[inline]
pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
&self,
) -> Result<(), TypeCheckError> {
RowT::type_check(self.column_specs().inner())
}

/// Casts the iterator to a given row type, enabling Stream'ed operations
/// on rows, which deserialize them in-fly to that given type.
/// Begins with performing type check.
#[inline]
pub fn into_typed<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
self,
) -> Result<TypedRowIterator<RowT>, TypeCheckError>
where
'frame: 'metadata,
{
TypedRowIterator::<RowT>::new(self)
}

/// Converts this iterator into an iterator over rows parsed as given type,
/// using the legacy deserialization framework.
#[inline]
Expand Down Expand Up @@ -961,6 +989,109 @@ impl RawIterator {
}
}

/// Returned by [RawIterator::into_typed].
///
/// Does not implement [Stream], but permits deserialization of borrowed types.
/// To use [Stream] API (only accessible for owned types), use [TypedRowIterator::into_stream].
pub struct TypedRowIterator<RowT> {
raw_iterator: RawIterator,
_phantom: std::marker::PhantomData<RowT>,
}

impl<RowT> Unpin for TypedRowIterator<RowT> {}

impl<'frame, 'metadata, RowT> TypedRowIterator<RowT>
where
'frame: 'metadata,
RowT: DeserializeRow<'frame, 'metadata>,
{
fn new(raw_iterator: RawIterator) -> Result<Self, TypeCheckError> {
raw_iterator.type_check::<RowT>()?;

Ok(Self {
raw_iterator,
_phantom: Default::default(),
})
}

/// If tracing was enabled, returns tracing ids of all finished page queries.
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
self.raw_iterator.tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn column_specs(&self) -> ColumnSpecs {
self.raw_iterator.column_specs()
}

/// Stream-like next() implementation for TypedRowIterator.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn next(&'frame mut self) -> Option<Result<RowT, QueryError>> {
self.raw_iterator.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| RowsParseError::from(err).into())
})
})
}

/// Stream-like try_next() implementation for TypedRowIterator.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn try_next(&'frame mut self) -> Result<Option<RowT>, QueryError> {
self.next().await.transpose()
}
}

impl<RowT: 'static> TypedRowIterator<RowT> {
/// Transforms [TypedRowIterator] into [TypedRowStream].
///
/// If you deserialize to owned types only, use this method to unleash power of the `Stream` API.
/// This operation involves no runtime cost, but it limits the iterator to owned types only.
/// Therefore, if you want to work with borrowed types (e.g., to avoid heap allocations),
/// you can't use the `Stream` trait.
pub fn into_stream(self) -> TypedRowStream<RowT> {
TypedRowStream {
typed_row_iterator: self,
}
}
}

/// Returned by [TypedRowIterator::into_stream].
///
/// Implements [Stream], but only permits deserialization of owned types.
pub struct TypedRowStream<RowT: 'static> {
typed_row_iterator: TypedRowIterator<RowT>,
}

impl<RowT> Unpin for TypedRowStream<RowT> {}

/// Stream implementation for TypedRowStream.
///
/// It only works with owned types! For example, &str is not supported.
impl<RowT> Stream for TypedRowStream<RowT>
where
RowT: for<'r> DeserializeRow<'r, 'r>,
{
type Item = Result<RowT, QueryError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();

let next_fut = s.typed_row_iterator.next();
futures::pin_mut!(next_fut);
let value = ready_some_ok!(next_fut.poll(cx));
Poll::Ready(Some(Ok(value)))
}
}

/// Iterator over rows returned by paged queries.
///
/// Allows to easily access rows without worrying about handling multiple pages.
Expand Down

0 comments on commit 1f598ca

Please sign in to comment.