From 1f598ca7dfcf2d3071d2c32d3c311360977a91b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 22 Oct 2024 20:43:43 +0200 Subject: [PATCH] iterator: typed API for new deserialization framework This commit finishes the work related to adjusting the iterators module to the new deserialization framework. The previous commit brought RawIterator, which can deserialize ColumnIterators. This commit introduces new TypedRowIterator, which type-checks once and then deserializes from ColumnIterators into rows. RawIterator can be converted to TypedRowIterator by calling the `into_typed()` method. Unfortunately, due to the limitations of the Stream trait (no support for lending streams, analogous to lending iterators in case of RawRowsLendingIterator), a Stream cannot be used to deserialize borrowed types (i.e. those that borrow from the frame serialized contents). In order to give users both capabilities: 1) deserializing borrowed types (for efficiency), 2) deserializing using Stream (for convienience), two distinct types are used: TypedRowIterator and TypedRowStream. The first supports borrowed types and the second implements Stream. To sum up, instead of `RowIterator` (returning `Row`s) and `TypedRowIterator` (returning instances of the target type) both implementing `Stream`, now we have the following: - `RawIterator` - cannot implement `Stream`, because returns `ColumnIterator`s that borrow from it, - provide `type_check()` and `next()` methods that can be used for low-level, manual deserialization (not recommended for ordinary users) - supports deserializing manually borrowed types (such as `&str`). - `TypedRowIterator` - created by calling `into_typed::()` on `RawIterator`, - type checks upon creation, - supports deserializing borrowed types (such as `&str`), - does not implement `Stream` in order to support borrowed types, - provides basic Stream-like methods (`next()`, `try_next()`), - `TypedRowStream` - created by calling `into_stream()` on `TypedRowIterator`, - implements `Stream` and hence does not support borrowed types. Co-authored-by: Piotr Dulikowski --- scylla/src/transport/iterator.rs | 133 ++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 37625ac03..fa440e4ba 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -13,6 +13,7 @@ use scylla_cql::frame::response::result::RawMetadataAndRawRows; use scylla_cql::frame::response::NonErrorResponse; use scylla_cql::types::deserialize::result::RawRowsLendingIterator; use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow}; +use scylla_cql::types::deserialize::TypeCheckError; use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; use thiserror::Error; @@ -540,7 +541,10 @@ where /// An intermediate object that allows to construct an iterator over a query /// that is asynchronously paged in the background. /// -/// TODO: implement and describe the new API +/// Before the results can be processed in a convenient way, the RawIterator +/// needs to be cast into a typed iterator. This is done by use of `into_typed()` method. +/// As the method is generic over the target type, the turbofish syntax +/// can come in handy there, e.g. `raw_iter.into_typed::<(i32, &str, Uuid)>()`. /// /// A pre-0.15.0 interface is also available, although deprecated: /// `into_legacy()` method converts RawIterator to LegacyRowIterator, @@ -652,6 +656,30 @@ impl RawIterator { Poll::Ready(Some(Ok(()))) } + /// Type-checks the iterator against given type. + /// + /// This is automatically called upon transforming [RawIterator] into [TypedRowIterator]. + /// Can be used with `next()` for manual deserialization. See `next()` for an example. + #[inline] + pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>( + &self, + ) -> Result<(), TypeCheckError> { + RowT::type_check(self.column_specs().inner()) + } + + /// Casts the iterator to a given row type, enabling Stream'ed operations + /// on rows, which deserialize them in-fly to that given type. + /// Begins with performing type check. + #[inline] + pub fn into_typed<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>( + self, + ) -> Result, TypeCheckError> + where + 'frame: 'metadata, + { + TypedRowIterator::::new(self) + } + /// Converts this iterator into an iterator over rows parsed as given type, /// using the legacy deserialization framework. #[inline] @@ -961,6 +989,109 @@ impl RawIterator { } } +/// Returned by [RawIterator::into_typed]. +/// +/// Does not implement [Stream], but permits deserialization of borrowed types. +/// To use [Stream] API (only accessible for owned types), use [TypedRowIterator::into_stream]. +pub struct TypedRowIterator { + raw_iterator: RawIterator, + _phantom: std::marker::PhantomData, +} + +impl Unpin for TypedRowIterator {} + +impl<'frame, 'metadata, RowT> TypedRowIterator +where + 'frame: 'metadata, + RowT: DeserializeRow<'frame, 'metadata>, +{ + fn new(raw_iterator: RawIterator) -> Result { + raw_iterator.type_check::()?; + + Ok(Self { + raw_iterator, + _phantom: Default::default(), + }) + } + + /// If tracing was enabled, returns tracing ids of all finished page queries. + #[inline] + pub fn tracing_ids(&self) -> &[Uuid] { + self.raw_iterator.tracing_ids() + } + + /// Returns specification of row columns + #[inline] + pub fn column_specs(&self) -> ColumnSpecs { + self.raw_iterator.column_specs() + } + + /// Stream-like next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn next(&'frame mut self) -> Option> { + self.raw_iterator.next().await.map(|res| { + res.and_then(|column_iterator| { + ::deserialize(column_iterator) + .map_err(|err| RowsParseError::from(err).into()) + }) + }) + } + + /// Stream-like try_next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn try_next(&'frame mut self) -> Result, QueryError> { + self.next().await.transpose() + } +} + +impl TypedRowIterator { + /// Transforms [TypedRowIterator] into [TypedRowStream]. + /// + /// If you deserialize to owned types only, use this method to unleash power of the `Stream` API. + /// This operation involves no runtime cost, but it limits the iterator to owned types only. + /// Therefore, if you want to work with borrowed types (e.g., to avoid heap allocations), + /// you can't use the `Stream` trait. + pub fn into_stream(self) -> TypedRowStream { + TypedRowStream { + typed_row_iterator: self, + } + } +} + +/// Returned by [TypedRowIterator::into_stream]. +/// +/// Implements [Stream], but only permits deserialization of owned types. +pub struct TypedRowStream { + typed_row_iterator: TypedRowIterator, +} + +impl Unpin for TypedRowStream {} + +/// Stream implementation for TypedRowStream. +/// +/// It only works with owned types! For example, &str is not supported. +impl Stream for TypedRowStream +where + RowT: for<'r> DeserializeRow<'r, 'r>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut s = self.as_mut(); + + let next_fut = s.typed_row_iterator.next(); + futures::pin_mut!(next_fut); + let value = ready_some_ok!(next_fut.poll(cx)); + Poll::Ready(Some(Ok(value))) + } +} + /// Iterator over rows returned by paged queries. /// /// Allows to easily access rows without worrying about handling multiple pages.