diff --git a/docker-compose.yml b/docker-compose.yml index f72efeb9..94c80b97 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,10 +17,11 @@ services: - ./tests/data:/test-data entrypoint: > /bin/sh -c " /usr/bin/mc config host add test-minio http://minio:9000 minioadmin minioadmin; - /usr/bin/mc rm -r --force test-minio/seafowl-test-bucket; /usr/bin/mc mb - test-minio/seafowl-test-bucket; /usr/bin/mc cp test-data/table_with_ns_column.parquet - test-minio/seafowl-test-bucket/table_with_ns_column.parquet; /usr/bin/mc anonymous set public - test-minio/seafowl-test-bucket/table_with_ns_column.parquet; exit 0; " + /usr/bin/mc rm -r --force test-minio/seafowl-test-bucket; + /usr/bin/mc mb test-minio/seafowl-test-bucket; /usr/bin/mc cp + test-data/table_with_ns_column.parquet + test-minio/seafowl-test-bucket/table_with_ns_column.parquet; /usr/bin/mc anonymous set public + test-minio/seafowl-test-bucket/table_with_ns_column.parquet; exit 0; " fake-gcs: image: tustvold/fake-gcs-server diff --git a/src/catalog.rs b/src/catalog.rs index d15ae551..f8262e6b 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use arrow_schema::Schema; use async_trait::async_trait; use datafusion::catalog::schema::MemorySchemaProvider; use datafusion::datasource::TableProvider; @@ -27,7 +28,6 @@ use crate::{ AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError, Repository, TableVersionsResult, }, - schema::Schema, }; pub const DEFAULT_DB: &str = "default"; diff --git a/src/context.rs b/src/context.rs index 264aeec0..01d99ffa 100644 --- a/src/context.rs +++ b/src/context.rs @@ -17,7 +17,7 @@ use std::fs::File; use datafusion::execution::context::SessionState; -use crate::datafusion::parser::{DFParser, Statement as DFStatement}; +use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA}; use crate::datafusion::utils::build_schema; use crate::object_store::http::try_prepare_http_url; use crate::object_store::wrapped::InternalObjectStore; @@ -76,6 +76,7 @@ use datafusion_expr::logical_plan::{ }; use datafusion_expr::{DdlStatement, DmlStatement, Filter, WriteOp}; use deltalake::kernel::{Action, Add, Remove, Schema as DeltaSchema}; +use deltalake::operations::convert_to_delta::ConvertToDeltaBuilder; use deltalake::operations::create::CreateBuilder; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; @@ -96,6 +97,7 @@ use crate::config::schema::{GCS, S3}; use crate::delta_rs::backports::parquet_scan_from_actions; #[cfg(test)] use crate::frontend::http::tests::deterministic_uuid; +use crate::nodes::ConvertTable; use crate::provider::project_expressions; use crate::utils::gc_databases; use crate::wasm_udf::data_types::{get_volatility, CreateFunctionDetails}; @@ -106,7 +108,6 @@ use crate::{ CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable, SeafowlExtensionNode, Vacuum, }, - schema::Schema as SeafowlSchema, version::TableVersionProcessor, }; @@ -389,6 +390,11 @@ pub fn is_statement_read_only(statement: &DFStatement) -> bool { } } +enum CreateDeltaTableDetails { + WithSchema(Schema), + FromFiles(Path), +} + // The only reason to keep this trait around (instead of migrating all the functions directly into // DefaultSeafowlContext), is that `create_physical_plan` would then be a recursive async function, // which works for traits, but not for structs: https://stackoverflow.com/a/74737853 @@ -679,16 +685,13 @@ impl DefaultSeafowlContext { async fn create_delta_table<'a>( &self, name: impl Into>, - schema: &Schema, + details: CreateDeltaTableDetails, ) -> Result> { let table_ref: TableReference = name.into(); let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA); let schema_name = resolved_ref.schema.clone(); let table_name = resolved_ref.table.clone(); - let sf_schema = SeafowlSchema { - arrow_schema: Arc::new(schema.clone()), - }; let collection_id = self .table_catalog .get_collection_id_by_name(&self.database, &schema_name) @@ -697,8 +700,6 @@ impl DefaultSeafowlContext { Error::Plan(format!("Schema {schema_name:?} does not exist!")) })?; - let delta_schema = DeltaSchema::try_from(schema)?; - // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()` // in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has @@ -708,27 +709,53 @@ impl DefaultSeafowlContext { // On the other hand that would complicate etag testing logic. let table_uuid = get_uuid(); let table_log_store = self.internal_object_store.get_log_store(table_uuid); + let table_prefix = self.internal_object_store.table_prefix(table_uuid); // NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would - // be nice if those two could match - let table = Arc::new( - CreateBuilder::new() - .with_log_store(table_log_store) - .with_table_name(&*table_name) - .with_columns(delta_schema.fields().clone()) - .with_comment(format!( - "Created by Seafowl version {}", - env!("CARGO_PKG_VERSION") - )) - .await?, - ); + // be nice if those two could match somehow + let table = Arc::new(match details { + CreateDeltaTableDetails::WithSchema(schema) => { + let delta_schema = DeltaSchema::try_from(&schema)?; + + CreateBuilder::new() + .with_log_store(table_log_store) + .with_table_name(&*table_name) + .with_columns(delta_schema.fields().clone()) + .with_comment(format!( + "Created by Seafowl {}", + env!("CARGO_PKG_VERSION") + )) + .await? + } + CreateDeltaTableDetails::FromFiles(path) => { + // Copy the files over to the table root location + self.internal_object_store + .copy_in_prefix(&path, &table_prefix) + .await?; + + // Now convert them to a Delta table + ConvertToDeltaBuilder::new() + .with_log_store(table_log_store) + .with_table_name(&*table_name) + .with_comment(format!( + "Converted by Seafowl {}", + env!("CARGO_PKG_VERSION") + )) + .await? + } + }); // We still persist the table into our own catalog, one reason is us being able to load all // tables and their schemas in bulk to satisfy information_schema queries. // Another is to keep track of table uuid's, which are used to construct the table uri. // We may look into doing this via delta-rs somehow eventually. self.table_catalog - .create_table(collection_id, &table_name, &sf_schema, table_uuid) + .create_table( + collection_id, + &table_name, + TableProvider::schema(table.as_ref()).as_ref(), + table_uuid, + ) .await?; self.inner.register_table(resolved_ref, table.clone())?; @@ -951,8 +978,11 @@ impl DefaultSeafowlContext { }; if !table_exists { - self.create_delta_table(table_ref.clone(), plan.schema().as_ref()) - .await?; + self.create_delta_table( + table_ref.clone(), + CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + ) + .await?; } self.plan_to_delta_table(table_ref, &plan).await @@ -1163,7 +1193,11 @@ impl SeafowlContext for DefaultSeafowlContext { "Unsupported SQL statement: {s:?}" ))), }, - DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => { + DFStatement::CopyTo(CopyToStatement { + ref mut source, + options, + .. + }) if !options.contains(&CONVERT_TO_DELTA) => { let state = if let CopyToSource::Query(ref mut query) = source { self.rewrite_time_travel_query(query).await? } else { @@ -1171,12 +1205,27 @@ impl SeafowlContext for DefaultSeafowlContext { }; state.statement_to_plan(stmt).await } + DFStatement::CopyTo(CopyToStatement { + source: CopyToSource::Relation(table_name), + target, + options, + }) if options.contains(&CONVERT_TO_DELTA) => { + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable { + location: target.clone(), + name: table_name.to_string(), + output_schema: Arc::new(DFSchema::empty()), + })), + })) + } DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => { self.inner.state().statement_to_plan(stmt).await } - DFStatement::Explain(_) => Err(Error::NotImplemented(format!( - "Unsupported SQL statement: {statement:?}" - ))), + DFStatement::CopyTo(_) | DFStatement::Explain(_) => { + Err(Error::NotImplemented(format!( + "Unsupported SQL statement: {statement:?}" + ))) + } } } @@ -1355,8 +1404,11 @@ impl SeafowlContext for DefaultSeafowlContext { // TODO: this means we'll have 2 table versions at the end, 1st from the create // and 2nd from the insert, while it seems more reasonable that in this case we have // only one - self.create_delta_table(name, plan.schema().as_ref()) - .await?; + self.create_delta_table( + name, + CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + ) + .await?; self.plan_to_delta_table(name, &plan).await?; Ok(make_dummy_exec()) @@ -1675,12 +1727,31 @@ impl SeafowlContext for DefaultSeafowlContext { // Other custom nodes we made like CREATE TABLE/INSERT/ALTER match SeafowlExtensionNode::from_dynamic(node) { Some(sfe_node) => match sfe_node { + SeafowlExtensionNode::ConvertTable(ConvertTable { + location, + name, + .. + }) => { + self.create_delta_table( + name, + CreateDeltaTableDetails::FromFiles(Path::from( + location.as_str(), + )), + ) + .await?; + + Ok(make_dummy_exec()) + } SeafowlExtensionNode::CreateTable(CreateTable { schema, name, .. }) => { - self.create_delta_table(name.as_str(), schema).await?; + self.create_delta_table( + name.as_str(), + CreateDeltaTableDetails::WithSchema(schema.clone()), + ) + .await?; Ok(make_dummy_exec()) } diff --git a/src/datafusion/parser.rs b/src/datafusion/parser.rs index 71c0459c..18cd04cb 100644 --- a/src/datafusion/parser.rs +++ b/src/datafusion/parser.rs @@ -30,6 +30,7 @@ use datafusion::sql::parser::{ CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt, }; use datafusion_common::parsers::CompressionTypeVariant; +use lazy_static::lazy_static; use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value}; use sqlparser::tokenizer::{TokenWithLocation, Word}; use sqlparser::{ @@ -41,7 +42,6 @@ use sqlparser::{ use std::collections::{HashMap, VecDeque}; use std::str::FromStr; use std::string::ToString; -use strum_macros::Display; // Use `Parser::expected` instead, if possible macro_rules! parser_err { @@ -63,10 +63,12 @@ pub struct DFParser<'a> { parser: Parser<'a>, } -#[derive(Debug, Clone, Display)] -#[strum(serialize_all = "UPPERCASE")] -enum KeywordExtensions { - Vacuum, +// Hacky way to distinguish `COPY TO` statement from `CONVERT TO DELTA`. +// We should really introduce our own Statement enum which encapsulates +// the DataFusion one and adds our custom variants (this one and `VACUUM`). +lazy_static! { + pub static ref CONVERT_TO_DELTA: (String, Value) = + ("CONVERT_TO_DELTA".to_string(), Value::Boolean(true)); } impl<'a> DFParser<'a> { @@ -136,39 +138,25 @@ impl<'a> DFParser<'a> { pub fn parse_statement(&mut self) -> Result { match self.parser.peek_token().token { Token::Word(w) => { - match w { - Word { - keyword: Keyword::CREATE, - .. - } => { - // move one token forward + match w.keyword { + Keyword::CREATE => { self.parser.next_token(); - // use custom parsing self.parse_create() } - Word { - keyword: Keyword::COPY, - .. - } => { + Keyword::CONVERT => { + self.parser.next_token(); + self.parse_convert() + } + Keyword::COPY => { self.parser.next_token(); self.parse_copy() } - Word { - keyword: Keyword::DESCRIBE, - .. - } => { - // move one token forward + Keyword::DESCRIBE => { self.parser.next_token(); - // use custom parsing self.parse_describe() } - Word { value, .. } - if value.to_uppercase() - == KeywordExtensions::Vacuum.to_string() => - { - // move one token forward + Keyword::VACUUM => { self.parser.next_token(); - // use custom parsing self.parse_vacuum() } _ => { @@ -195,6 +183,22 @@ impl<'a> DFParser<'a> { })) } + // Parse `CONVERT location TO DELTA table_name` type statement + pub fn parse_convert(&mut self) -> Result { + let location = self.parser.parse_literal_string()?; + self.parser + .expect_keywords(&[Keyword::TO, Keyword::DELTA])?; + let table_name = self.parser.parse_object_name()?; + + // We'll use the CopyToStatement struct to pass the location and table name + // as it's the closest match to what we need. + Ok(Statement::CopyTo(CopyToStatement { + source: CopyToSource::Relation(table_name), + target: location, + options: vec![CONVERT_TO_DELTA.clone()], + })) + } + pub fn parse_vacuum(&mut self) -> Result { // Since `VACUUM` is not a supported keyword by sqlparser, we abuse the semantically related // TRUNCATE to smuggle the info on whether we want GC of tables, partitions or the DB itself. diff --git a/src/nodes.rs b/src/nodes.rs index c5edead5..19e00c7a 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -8,6 +8,16 @@ use crate::wasm_udf::data_types::CreateFunctionDetails; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use strum_macros::AsRefStr; +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct ConvertTable { + /// Location from which to convert + pub location: String, + /// Name of the table to convert to + pub name: String, + /// Dummy result schema for the plan (empty) + pub output_schema: DFSchemaRef, +} + #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct CreateTable { /// The table schema @@ -69,6 +79,7 @@ pub struct Vacuum { #[derive(AsRefStr, Debug, Clone, Hash, PartialEq, Eq)] pub enum SeafowlExtensionNode { + ConvertTable(ConvertTable), CreateTable(CreateTable), CreateFunction(CreateFunction), DropFunction(DropFunction), @@ -102,6 +113,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { // (& means it has to have been borrowed and we can't own anything, since this // function will exit soon) match self { + SeafowlExtensionNode::ConvertTable(ConvertTable { + output_schema, .. + }) => output_schema, SeafowlExtensionNode::CreateTable(CreateTable { output_schema, .. }) => { output_schema } @@ -131,6 +145,11 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + SeafowlExtensionNode::ConvertTable(ConvertTable { + location, name, .. + }) => { + write!(f, "Convert: {location} to {name}") + } SeafowlExtensionNode::CreateTable(CreateTable { name, .. }) => { write!(f, "Create: {name}") } diff --git a/src/object_store/wrapped.rs b/src/object_store/wrapped.rs index b7d01621..9673722a 100644 --- a/src/object_store/wrapped.rs +++ b/src/object_store/wrapped.rs @@ -129,6 +129,15 @@ impl InternalObjectStore { Ok(()) } + /// Copy all objects under a given path to a new path + pub async fn copy_in_prefix(&self, from: &Path, to: &Path) -> Result<(), Error> { + let list_result = self.inner.list_with_delimiter(Some(from)).await?; + for object in list_result.objects { + self.inner.copy(&object.location, to).await?; + } + Ok(()) + } + /// For local filesystem object stores, try "uploading" by just moving the file. /// Returns a None if the store isn't local. pub async fn fast_upload( diff --git a/src/repository/default.rs b/src/repository/default.rs index d01e931d..794b0f6a 100644 --- a/src/repository/default.rs +++ b/src/repository/default.rs @@ -239,10 +239,16 @@ impl Repository for $repo { // Create columns // TODO this breaks if we have more than (bind limit) columns - if !schema.arrow_schema.fields().is_empty() { + if !schema.fields().is_empty() { let mut builder: QueryBuilder<_> = QueryBuilder::new("INSERT INTO table_column(table_version_id, name, type) "); - builder.push_values(schema.to_column_names_types(), |mut b, col| { + + let fields: Vec<(String, String)> = schema.fields() + .iter() + .map(|f| (f.name().clone(), field_to_json(f).to_string())) + .collect(); + + builder.push_values(fields, |mut b, col| { b.push_bind(new_version_id) .push_bind(col.0) .push_bind(col.1); diff --git a/src/repository/interface.rs b/src/repository/interface.rs index 33f9d3fc..ce846402 100644 --- a/src/repository/interface.rs +++ b/src/repository/interface.rs @@ -1,18 +1,16 @@ use std::fmt::Debug; use std::str::FromStr; +use arrow_schema::Schema; use async_trait::async_trait; use strum::ParseError; use strum_macros::{Display, EnumString}; use uuid::Uuid; -use crate::wasm_udf::data_types::CreateFunctionDetails; -use crate::{ - data_types::{ - CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp, - }, - schema::Schema, +use crate::data_types::{ + CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp, }; +use crate::wasm_udf::data_types::CreateFunctionDetails; #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] pub struct AllDatabaseColumnsResult { @@ -232,14 +230,11 @@ pub mod tests { .create_collection(database_id, DEFAULT_SCHEMA) .await .expect("Error creating default schema"); - let empty_schema = Schema { - arrow_schema: Arc::new(ArrowSchema::empty()), - }; repository .create_table( default_schema_id, "empty_table", - &empty_schema, + &ArrowSchema::empty(), Uuid::default(), ) .await @@ -254,12 +249,9 @@ pub mod tests { ArrowField::new("date", ArrowDataType::Date64, false), ArrowField::new("value", ArrowDataType::Float64, false), ]); - let schema = Schema { - arrow_schema: Arc::new(arrow_schema), - }; let (table_id, table_version_id) = repository - .create_table(collection_id, "testtable", &schema, Uuid::default()) + .create_table(collection_id, "testtable", &arrow_schema, Uuid::default()) .await .expect("Error creating table"); @@ -564,10 +556,6 @@ pub mod tests { )); // Make a new table in the existing collection with the same name - let schema = Schema { - arrow_schema: Arc::new(ArrowSchema::empty()), - }; - let collection_id_1 = repository .get_collection_id_by_name("testdb", "testcol") .await @@ -579,7 +567,12 @@ pub mod tests { assert!(matches!( repository - .create_table(collection_id_2, "testtable2", &schema, Uuid::default()) + .create_table( + collection_id_2, + "testtable2", + &ArrowSchema::empty(), + Uuid::default() + ) .await .unwrap_err(), Error::UniqueConstraintViolation(_) @@ -587,7 +580,12 @@ pub mod tests { // Make a new table in the previous collection, try renaming let (new_table_id, _) = repository - .create_table(collection_id_1, "testtable2", &schema, Uuid::default()) + .create_table( + collection_id_1, + "testtable2", + &ArrowSchema::empty(), + Uuid::default(), + ) .await .unwrap(); diff --git a/src/repository/postgres.rs b/src/repository/postgres.rs index 1f9adbe0..3dede0d0 100644 --- a/src/repository/postgres.rs +++ b/src/repository/postgres.rs @@ -1,5 +1,7 @@ use std::{fmt::Debug, time::Duration}; +use arrow_integration_test::field_to_json; +use arrow_schema::Schema; use async_trait::async_trait; use futures::TryStreamExt; use sqlx::{ @@ -12,7 +14,6 @@ use uuid::Uuid; use crate::{ data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, implement_repository, - schema::Schema, wasm_udf::data_types::CreateFunctionDetails, }; diff --git a/src/repository/sqlite.rs b/src/repository/sqlite.rs index a2738e90..8645d223 100644 --- a/src/repository/sqlite.rs +++ b/src/repository/sqlite.rs @@ -1,5 +1,7 @@ use std::{fmt::Debug, str::FromStr}; +use arrow_integration_test::field_to_json; +use arrow_schema::Schema; use async_trait::async_trait; use futures::TryStreamExt; use sqlx::sqlite::SqliteJournalMode; @@ -12,7 +14,6 @@ use uuid::Uuid; use crate::{ data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, - schema::Schema, wasm_udf::data_types::CreateFunctionDetails, };