Skip to content

Commit

Permalink
Merge pull request #473 from splitgraph/convert-to-delta-stmt
Browse files Browse the repository at this point in the history
Convert to Delta table support
  • Loading branch information
gruuya authored Dec 1, 2023
2 parents 40b1158 + 2f7fb93 commit a6e4bec
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 97 deletions.
2 changes: 1 addition & 1 deletion src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::FromStr;
use std::{collections::HashMap, fmt::Debug, sync::Arc};

use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;
Expand All @@ -27,7 +28,6 @@ use crate::{
AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError,
Repository, TableVersionsResult,
},
schema::Schema,
};

pub const DEFAULT_DB: &str = "default";
Expand Down
96 changes: 64 additions & 32 deletions src/context/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ use crate::context::SeafowlContext;
#[cfg(test)]
use crate::frontend::http::tests::deterministic_uuid;
use crate::object_store::wrapped::InternalObjectStore;
use crate::schema::Schema as SeafowlSchema;

use bytes::BytesMut;
use datafusion::error::{DataFusionError as Error, Result};
use datafusion::execution::context::SessionState;
use datafusion::parquet::basic::{Compression, ZstdLevel};
use datafusion::{
arrow::datatypes::{Schema, SchemaRef},
datasource::TableProvider,
error::DataFusionError,
execution::context::TaskContext,
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
physical_plan::ExecutionPlan,
sql::TableReference,
};
use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::{
convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder, transaction::commit,
};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::create_add;
use deltalake::DeltaTable;
Expand Down Expand Up @@ -272,20 +273,22 @@ pub async fn plan_to_object_store(
.collect()
}

pub(super) enum CreateDeltaTableDetails {
EmptyTable(Schema),
FromPath(Path),
}

impl SeafowlContext {
pub(super) async fn create_delta_table<'a>(
&self,
name: impl Into<TableReference<'a>>,
schema: &Schema,
details: CreateDeltaTableDetails,
) -> Result<Arc<DeltaTable>> {
let table_ref: TableReference = name.into();
let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let schema_name = resolved_ref.schema.clone();
let table_name = resolved_ref.table.clone();

let sf_schema = SeafowlSchema {
arrow_schema: Arc::new(schema.clone()),
};
let collection_id = self
.table_catalog
.get_collection_id_by_name(&self.database, &schema_name)
Expand All @@ -294,40 +297,69 @@ impl SeafowlContext {
Error::Plan(format!("Schema {schema_name:?} does not exist!"))
})?;

let delta_schema = DeltaSchema::try_from(schema)?;

// TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()`
// in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until
// sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has
// the `uuid()` function).
// Then we could create the table in our catalog first and try to create the delta table itself
// with the returned uuid (and delete the catalog entry if the object store creation fails).
// On the other hand that would complicate etag testing logic.
let table_uuid = get_uuid();
let table_log_store = self.internal_object_store.get_log_store(table_uuid);

// NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would
// be nice if those two could match
let table = Arc::new(
CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl version {}",
env!("CARGO_PKG_VERSION")
))
.await?,
);
// be nice if those two could match somehow
let (table_uuid, table) = match details {
CreateDeltaTableDetails::EmptyTable(schema) => {
// TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()`
// in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until
// sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has
// the `uuid()` function).
// Then we could create the table in our catalog first and try to create the delta table itself
// with the returned uuid (and delete the catalog entry if the object store creation fails).
// On the other hand that would complicate etag testing logic.
let table_uuid = get_uuid();
let table_log_store =
self.internal_object_store.get_log_store(table_uuid);
let delta_schema = DeltaSchema::try_from(&schema)?;

let table = CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?;
(table_uuid, table)
}
CreateDeltaTableDetails::FromPath(path) => {
// For now interpret the path as containing only the final UUID table prefix,
// in accordance with Seafowl convention
let table_uuid = Uuid::try_parse(path.as_ref()).map_err(|e| {
DataFusionError::Execution(format!(
"Unable to parse the UUID path of the table: {e}"
))
})?;
let table_log_store =
self.internal_object_store.get_log_store(table_uuid);
let table = ConvertToDeltaBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_comment(format!(
"Converted by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?;
(table_uuid, table)
}
};

// We still persist the table into our own catalog, one reason is us being able to load all
// tables and their schemas in bulk to satisfy information_schema queries.
// Another is to keep track of table uuid's, which are used to construct the table uri.
// We may look into doing this via delta-rs somehow eventually.
self.table_catalog
.create_table(collection_id, &table_name, &sf_schema, table_uuid)
.create_table(
collection_id,
&table_name,
TableProvider::schema(&table).as_ref(),
table_uuid,
)
.await?;

let table = Arc::new(table);
self.inner.register_table(resolved_ref, table.clone())?;
debug!("Created new table {table}");
Ok(table)
Expand Down
31 changes: 25 additions & 6 deletions src/context/logical.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::catalog::DEFAULT_SCHEMA;
use crate::context::SeafowlContext;
use crate::datafusion::parser::{DFParser, Statement as DFStatement};
use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA};
use crate::datafusion::utils::build_schema;
use crate::wasm_udf::data_types::CreateFunctionDetails;
use crate::{
nodes::{
CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
SeafowlExtensionNode, Vacuum,
},
version::TableVersionProcessor,
Expand Down Expand Up @@ -267,20 +267,39 @@ impl SeafowlContext {
"Unsupported SQL statement: {s:?}"
))),
},
DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => {
DFStatement::CopyTo(CopyToStatement {
ref mut source,
options,
..
}) if !options.contains(&CONVERT_TO_DELTA) => {
let state = if let CopyToSource::Query(ref mut query) = source {
self.rewrite_time_travel_query(query).await?
} else {
self.inner.state()
};
state.statement_to_plan(stmt).await
}
DFStatement::CopyTo(CopyToStatement {
source: CopyToSource::Relation(table_name),
target,
options,
}) if options.contains(&CONVERT_TO_DELTA) => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
location: target.clone(),
name: table_name.to_string(),
output_schema: Arc::new(DFSchema::empty()),
})),
}))
}
DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::Explain(_) => Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
))),
DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
)))
}
}
}

Expand Down
39 changes: 33 additions & 6 deletions src/context/physical.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::delta::CreateDeltaTableDetails;
use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA};
use crate::config::context::build_object_store;
use crate::config::schema;
Expand All @@ -6,7 +7,7 @@ use crate::context::delta::plan_to_object_store;
use crate::context::SeafowlContext;
use crate::delta_rs::backports::parquet_scan_from_actions;
use crate::nodes::{
CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
SeafowlExtensionNode, Vacuum,
};
use crate::object_store::http::try_prepare_http_url;
Expand Down Expand Up @@ -49,6 +50,7 @@ use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
use log::info;
use object_store::path::Path;
use std::borrow::Cow;
use std::ops::Deref;
use std::ops::Not;
Expand Down Expand Up @@ -226,8 +228,11 @@ impl SeafowlContext {
// TODO: this means we'll have 2 table versions at the end, 1st from the create
// and 2nd from the insert, while it seems more reasonable that in this case we have
// only one
self.create_delta_table(name, plan.schema().as_ref())
.await?;
self.create_delta_table(
name,
CreateDeltaTableDetails::EmptyTable(plan.schema().as_ref().clone()),
)
.await?;
self.plan_to_delta_table(name, &plan).await?;

Ok(make_dummy_exec())
Expand Down Expand Up @@ -544,12 +549,31 @@ impl SeafowlContext {
// Other custom nodes we made like CREATE TABLE/INSERT/ALTER
match SeafowlExtensionNode::from_dynamic(node) {
Some(sfe_node) => match sfe_node {
SeafowlExtensionNode::ConvertTable(ConvertTable {
location,
name,
..
}) => {
self.create_delta_table(
name,
CreateDeltaTableDetails::FromPath(Path::from(
location.as_str(),
)),
)
.await?;

Ok(make_dummy_exec())
}
SeafowlExtensionNode::CreateTable(CreateTable {
schema,
name,
..
}) => {
self.create_delta_table(name.as_str(), schema).await?;
self.create_delta_table(
name.as_str(),
CreateDeltaTableDetails::EmptyTable(schema.clone()),
)
.await?;

Ok(make_dummy_exec())
}
Expand Down Expand Up @@ -870,8 +894,11 @@ impl SeafowlContext {
};

if !table_exists {
self.create_delta_table(table_ref.clone(), plan.schema().as_ref())
.await?;
self.create_delta_table(
table_ref.clone(),
CreateDeltaTableDetails::EmptyTable(plan.schema().as_ref().clone()),
)
.await?;
}

self.plan_to_delta_table(table_ref, &plan).await
Expand Down
Loading

0 comments on commit a6e4bec

Please sign in to comment.