Skip to content

Commit

Permalink
Merge pull request #1057 from wprzytula/new-deserialization-api
Browse files Browse the repository at this point in the history
Introduce new deserialization API
  • Loading branch information
wprzytula authored Nov 12, 2024
2 parents dce9e9f + d4a222c commit e99d697
Show file tree
Hide file tree
Showing 39 changed files with 1,677 additions and 1,027 deletions.
3 changes: 2 additions & 1 deletion examples/allocations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use scylla::{statement::prepared_statement::PreparedStatement, Session, SessionBuilder};
use scylla::transport::session::Session;
use scylla::{statement::prepared_statement::PreparedStatement, SessionBuilder};
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down
29 changes: 15 additions & 14 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromRow;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::DeserializeRow;
use scylla::SessionBuilder;
use std::env;

Expand Down Expand Up @@ -53,39 +55,38 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<(i32, i32, String)>();
.rows_stream::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

// Or as custom structs that derive FromRow
#[derive(Debug, FromRow)]
// Or as custom structs that derive DeserializeRow
#[allow(unused)]
#[derive(Debug, DeserializeRow)]
struct RowData {
_a: i32,
_b: Option<i32>,
_c: String,
a: i32,
b: Option<i32>,
c: String,
}

let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<RowData>();
.rows_stream::<RowData>()?;
while let Some(row_data) = iter.try_next().await? {
println!("row_data: {:?}", row_data);
}

// Or simply as untyped rows
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
while let Some(row) = iter.try_next().await? {
.await?
.rows_stream::<Row>()?;
while let Some(row) = iter.next().await.transpose()? {
let a = row.columns[0].as_ref().unwrap().as_int().unwrap();
let b = row.columns[1].as_ref().unwrap().as_int().unwrap();
let c = row.columns[2].as_ref().unwrap().as_text().unwrap();
println!("a, b, c: {}, {}, {}", a, b, c);

// Alternatively each row can be parsed individually
// let (a2, b2, c2) = row.into_typed::<(i32, i32, String)>() ?;
}

let metrics = session.get_metrics();
Expand Down
4 changes: 3 additions & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ async fn main() -> Result<()> {
(pk,),
)
.await?
.single_row_typed::<(i64,)>()?;
.into_rows_result()?
.expect("Got not Rows result")
.single_row()?;
assert_eq!(t, qt);
println!("token for {}: {}", pk, t);
}
Expand Down
20 changes: 10 additions & 10 deletions examples/cql-time-types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use anyhow::Result;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use futures::{StreamExt, TryStreamExt};
use futures::{StreamExt as _, TryStreamExt as _};
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp};
use scylla::transport::session::Session;
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(NaiveDate,)>();
.rows_stream::<(NaiveDate,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (NaiveDate,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -66,7 +66,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(time::Date,)>();
.rows_stream::<(time::Date,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (time::Date,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(CqlValue,)>();
.rows_stream::<(CqlValue,)>()?;
while let Some(row_result) = iter.next().await {
let read_days: u32 = match row_result {
Ok((CqlValue::Date(CqlDate(days)),)) => days,
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(NaiveTime,)>();
.rows_stream::<(NaiveTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into chrono::NaiveTime: {:?}", read_time);
}
Expand All @@ -139,7 +139,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(time::Time,)>();
.rows_stream::<(time::Time,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into time::Time: {:?}", read_time);
}
Expand All @@ -154,7 +154,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(CqlTime,)>();
.rows_stream::<(CqlTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a time as raw nanos: {:?}", read_time);
}
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(DateTime<Utc>,)>();
.rows_stream::<(DateTime<Utc>,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into chrono::DateTime<chrono::Utc>: {:?}",
Expand All @@ -206,7 +206,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(time::OffsetDateTime,)>();
.rows_stream::<(time::OffsetDateTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into time::OffsetDateTime: {:?}",
Expand All @@ -227,7 +227,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(CqlTimestamp,)>();
.rows_stream::<(CqlTimestamp,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a timestamp as raw millis: {:?}", read_time);
}
Expand Down
43 changes: 25 additions & 18 deletions examples/cqlsh-rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use rustyline::completion::{Completer, Pair};
use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, Context, Editor};
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::transport::Compression;
use scylla::{LegacyQueryResult, Session, SessionBuilder};
use scylla::QueryRowsResult;
use scylla::SessionBuilder;
use std::env;

#[derive(Helper, Highlighter, Validator, Hinter)]
Expand Down Expand Up @@ -173,23 +176,24 @@ impl Completer for CqlHelper {
}
}

fn print_result(result: &LegacyQueryResult) {
if result.rows.is_none() {
println!("OK");
return;
}
for row in result.rows.as_ref().unwrap() {
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
fn print_result(result: Option<&QueryRowsResult>) {
if let Some(rows_result) = result {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
}
println!("|")
}
println!("|")
} else {
println!("OK");
}
}

Expand Down Expand Up @@ -222,7 +226,10 @@ async fn main() -> Result<()> {
let maybe_res = session.query_unpaged(line, &[]).await;
match maybe_res {
Err(err) => println!("Error: {}", err),
Ok(res) => print_result(&res),
Ok(res) => {
let rows_res = res.into_rows_result()?;
print_result(rows_res.as_ref())
}
}
}
Err(ReadlineError::Interrupted) => continue,
Expand Down
63 changes: 24 additions & 39 deletions examples/custom_deserialization.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Result;
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
use scylla::frame::response::result::CqlValue;
use scylla::macros::impl_from_cql_value_from_method;
use scylla::{Session, SessionBuilder};
use anyhow::{Context, Result};
use scylla::deserialize::DeserializeValue;
use scylla::frame::response::result::ColumnType;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;

#[tokio::main]
Expand All @@ -28,53 +28,38 @@ async fn main() -> Result<()> {
)
.await?;

// You can implement FromCqlVal for your own types
// You can implement DeserializeValue for your own types
#[derive(PartialEq, Eq, Debug)]
struct MyType(String);
struct MyType<'a>(&'a str);

impl FromCqlVal<CqlValue> for MyType {
fn from_cql(cql_val: CqlValue) -> Result<Self, FromCqlValError> {
Ok(Self(
cql_val.into_string().ok_or(FromCqlValError::BadCqlType)?,
))
impl<'frame, 'metadata> DeserializeValue<'frame, 'metadata> for MyType<'frame> {
fn type_check(
typ: &scylla::frame::response::result::ColumnType,
) -> std::result::Result<(), scylla::deserialize::TypeCheckError> {
<&str as DeserializeValue<'frame, 'metadata>>::type_check(typ)
}
}

let (v,) = session
.query_unpaged(
"SELECT v FROM examples_ks.custom_deserialization WHERE pk = 1",
(),
)
.await?
.single_row_typed::<(MyType,)>()?;
assert_eq!(v, MyType("asdf".to_owned()));

// If you defined an extension trait for CqlValue then you can use
// the `impl_from_cql_value_from_method` macro to turn it into
// a FromCqlValue impl
#[derive(PartialEq, Eq, Debug)]
struct MyOtherType(String);

trait CqlValueExt {
fn into_my_other_type(self) -> Option<MyOtherType>;
}
fn deserialize(
typ: &'metadata ColumnType<'metadata>,
v: Option<scylla::deserialize::FrameSlice<'frame>>,
) -> std::result::Result<Self, scylla::deserialize::DeserializationError> {
let s = <&str as DeserializeValue<'frame, 'metadata>>::deserialize(typ, v)?;

impl CqlValueExt for CqlValue {
fn into_my_other_type(self) -> Option<MyOtherType> {
Some(MyOtherType(self.into_string()?))
Ok(Self(s))
}
}

impl_from_cql_value_from_method!(MyOtherType, into_my_other_type);

let (v,) = session
let rows_result = session
.query_unpaged(
"SELECT v FROM examples_ks.custom_deserialization WHERE pk = 1",
(),
)
.await?
.single_row_typed::<(MyOtherType,)>()?;
assert_eq!(v, MyOtherType("asdf".to_owned()));
.into_rows_result()?
.context("Expected Result:Rows response, got a different Result response.")?;

let (v,) = rows_result.single_row::<(MyType,)>()?;
assert_eq!(v, MyType("asdf"));

println!("Ok.");

Expand Down
25 changes: 17 additions & 8 deletions examples/get_by_name.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;
Expand Down Expand Up @@ -35,18 +36,26 @@ async fn main() -> Result<()> {
)
.await?;

let query_result = session
let rows_result = session
.query_unpaged("SELECT pk, ck, value FROM examples_ks.get_by_name", &[])
.await?;
let (ck_idx, _) = query_result
.get_column_spec("ck")
.await?
.into_rows_result()?
.context("Response is not of Rows type")?;
let col_specs = rows_result.column_specs();
let (ck_idx, _) = col_specs
.get_by_name("ck")
.ok_or_else(|| anyhow!("No ck column found"))?;
let (value_idx, _) = query_result
.get_column_spec("value")
let (value_idx, _) = col_specs
.get_by_name("value")
.ok_or_else(|| anyhow!("No value column found"))?;
let rows = rows_result
.rows::<Row>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
println!("ck | value");
println!("---------------------");
for row in query_result.rows.ok_or_else(|| anyhow!("no rows found"))? {
for row in rows {
println!("{:?} | {:?}", row.columns[ck_idx], row.columns[value_idx]);
}

Expand Down
3 changes: 1 addition & 2 deletions examples/logging_log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla::{Session, SessionBuilder};
use std::env;
use tracing::info;

Expand Down
8 changes: 6 additions & 2 deletions examples/query_history.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! This example shows how to collect history of query execution.
use anyhow::Result;
use futures::StreamExt;
use futures::StreamExt as _;
use scylla::frame::response::result::Row;
use scylla::history::{HistoryCollector, StructuredHistory};
use scylla::query::Query;
use scylla::transport::session::Session;
Expand Down Expand Up @@ -59,7 +60,10 @@ async fn main() -> Result<()> {
let iter_history_listener = Arc::new(HistoryCollector::new());
iter_query.set_history_listener(iter_history_listener.clone());

let mut rows_iterator = session.query_iter(iter_query, ()).await?;
let mut rows_iterator = session
.query_iter(iter_query, ())
.await?
.rows_stream::<Row>()?;
while let Some(_row) = rows_iterator.next().await {
// Receive rows...
}
Expand Down
Loading

0 comments on commit e99d697

Please sign in to comment.