Skip to content

Commit

Permalink
Convert-to-delta initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 27, 2023
1 parent c28c067 commit 9d27680
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 87 deletions.
9 changes: 5 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ services:
- ./tests/data:/test-data
entrypoint: >
/bin/sh -c " /usr/bin/mc config host add test-minio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc rm -r --force test-minio/seafowl-test-bucket; /usr/bin/mc mb
test-minio/seafowl-test-bucket; /usr/bin/mc cp test-data/table_with_ns_column.parquet
test-minio/seafowl-test-bucket/table_with_ns_column.parquet; /usr/bin/mc anonymous set public
test-minio/seafowl-test-bucket/table_with_ns_column.parquet; exit 0; "
/usr/bin/mc rm -r --force test-minio/seafowl-test-bucket;
/usr/bin/mc mb test-minio/seafowl-test-bucket; /usr/bin/mc cp
test-data/table_with_ns_column.parquet
test-minio/seafowl-test-bucket/table_with_ns_column.parquet; /usr/bin/mc anonymous set public
test-minio/seafowl-test-bucket/table_with_ns_column.parquet; exit 0; "
fake-gcs:
image: tustvold/fake-gcs-server
Expand Down
2 changes: 1 addition & 1 deletion src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::FromStr;
use std::{collections::HashMap, fmt::Debug, sync::Arc};

use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;
Expand All @@ -27,7 +28,6 @@ use crate::{
AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError,
Repository, TableVersionsResult,
},
schema::Schema,
};

pub const DEFAULT_DB: &str = "default";
Expand Down
131 changes: 101 additions & 30 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fs::File;

use datafusion::execution::context::SessionState;

use crate::datafusion::parser::{DFParser, Statement as DFStatement};
use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA};
use crate::datafusion::utils::build_schema;
use crate::object_store::http::try_prepare_http_url;
use crate::object_store::wrapped::InternalObjectStore;
Expand Down Expand Up @@ -76,6 +76,7 @@ use datafusion_expr::logical_plan::{
};
use datafusion_expr::{DdlStatement, DmlStatement, Filter, WriteOp};
use deltalake::kernel::{Action, Add, Remove, Schema as DeltaSchema};
use deltalake::operations::convert_to_delta::ConvertToDeltaBuilder;
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
Expand All @@ -96,6 +97,7 @@ use crate::config::schema::{GCS, S3};
use crate::delta_rs::backports::parquet_scan_from_actions;
#[cfg(test)]
use crate::frontend::http::tests::deterministic_uuid;
use crate::nodes::ConvertTable;
use crate::provider::project_expressions;
use crate::utils::gc_databases;
use crate::wasm_udf::data_types::{get_volatility, CreateFunctionDetails};
Expand All @@ -106,7 +108,6 @@ use crate::{
CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
SeafowlExtensionNode, Vacuum,
},
schema::Schema as SeafowlSchema,
version::TableVersionProcessor,
};

Expand Down Expand Up @@ -389,6 +390,11 @@ pub fn is_statement_read_only(statement: &DFStatement) -> bool {
}
}

/// Describes how `create_delta_table` should materialize a new Delta table.
enum CreateDeltaTableDetails {
/// Create the table via `CreateBuilder`, taking its columns from the given schema.
WithSchema(Schema),
/// Copy the files found under the given path into the new table's root location and
/// turn them into a Delta table via `ConvertToDeltaBuilder`.
FromFiles(Path),
}

// The only reason to keep this trait around (instead of migrating all of its functions directly
// into `DefaultSeafowlContext`) is that `create_physical_plan` would then be a recursive async
// function, which works for traits but not for structs: https://stackoverflow.com/a/74737853
Expand Down Expand Up @@ -679,16 +685,13 @@ impl DefaultSeafowlContext {
async fn create_delta_table<'a>(
&self,
name: impl Into<TableReference<'a>>,
schema: &Schema,
details: CreateDeltaTableDetails,
) -> Result<Arc<DeltaTable>> {
let table_ref: TableReference = name.into();
let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let schema_name = resolved_ref.schema.clone();
let table_name = resolved_ref.table.clone();

let sf_schema = SeafowlSchema {
arrow_schema: Arc::new(schema.clone()),
};
let collection_id = self
.table_catalog
.get_collection_id_by_name(&self.database, &schema_name)
Expand All @@ -697,8 +700,6 @@ impl DefaultSeafowlContext {
Error::Plan(format!("Schema {schema_name:?} does not exist!"))
})?;

let delta_schema = DeltaSchema::try_from(schema)?;

// TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()`
// in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until
// sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has
Expand All @@ -708,27 +709,53 @@ impl DefaultSeafowlContext {
// On the other hand that would complicate etag testing logic.
let table_uuid = get_uuid();
let table_log_store = self.internal_object_store.get_log_store(table_uuid);
let table_prefix = self.internal_object_store.table_prefix(table_uuid);

// NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would
// be nice if those two could match
let table = Arc::new(
CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl version {}",
env!("CARGO_PKG_VERSION")
))
.await?,
);
// be nice if those two could match somehow
let table = Arc::new(match details {
CreateDeltaTableDetails::WithSchema(schema) => {
let delta_schema = DeltaSchema::try_from(&schema)?;

CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?
}
CreateDeltaTableDetails::FromFiles(path) => {
// Copy the files over to the table root location
self.internal_object_store
.copy_in_prefix(&path, &table_prefix)
.await?;

// Now convert them to a Delta table
ConvertToDeltaBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_comment(format!(
"Converted by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?
}
});

// We still persist the table in our own catalog. One reason is to be able to load all
// tables and their schemas in bulk to satisfy information_schema queries.
// Another is to keep track of table UUIDs, which are used to construct the table URI.
// We may eventually look into doing this via delta-rs somehow.
self.table_catalog
.create_table(collection_id, &table_name, &sf_schema, table_uuid)
.create_table(
collection_id,
&table_name,
TableProvider::schema(table.as_ref()).as_ref(),
table_uuid,
)
.await?;

self.inner.register_table(resolved_ref, table.clone())?;
Expand Down Expand Up @@ -951,8 +978,11 @@ impl DefaultSeafowlContext {
};

if !table_exists {
self.create_delta_table(table_ref.clone(), plan.schema().as_ref())
.await?;
self.create_delta_table(
table_ref.clone(),
CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
)
.await?;
}

self.plan_to_delta_table(table_ref, &plan).await
Expand Down Expand Up @@ -1163,20 +1193,39 @@ impl SeafowlContext for DefaultSeafowlContext {
"Unsupported SQL statement: {s:?}"
))),
},
DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => {
DFStatement::CopyTo(CopyToStatement {
ref mut source,
options,
..
}) if !options.contains(&CONVERT_TO_DELTA) => {
let state = if let CopyToSource::Query(ref mut query) = source {
self.rewrite_time_travel_query(query).await?
} else {
self.inner.state()
};
state.statement_to_plan(stmt).await
}
DFStatement::CopyTo(CopyToStatement {
source: CopyToSource::Relation(table_name),
target,
options,
}) if options.contains(&CONVERT_TO_DELTA) => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
location: target.clone(),
name: table_name.to_string(),
output_schema: Arc::new(DFSchema::empty()),
})),
}))
}
DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::Explain(_) => Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
))),
DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
)))
}
}
}

Expand Down Expand Up @@ -1355,8 +1404,11 @@ impl SeafowlContext for DefaultSeafowlContext {
// TODO: this means we'll have 2 table versions at the end, 1st from the create
// and 2nd from the insert, while it seems more reasonable that in this case we have
// only one
self.create_delta_table(name, plan.schema().as_ref())
.await?;
self.create_delta_table(
name,
CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
)
.await?;
self.plan_to_delta_table(name, &plan).await?;

Ok(make_dummy_exec())
Expand Down Expand Up @@ -1675,12 +1727,31 @@ impl SeafowlContext for DefaultSeafowlContext {
// Other custom nodes we made like CREATE TABLE/INSERT/ALTER
match SeafowlExtensionNode::from_dynamic(node) {
Some(sfe_node) => match sfe_node {
SeafowlExtensionNode::ConvertTable(ConvertTable {
location,
name,
..
}) => {
self.create_delta_table(
name,
CreateDeltaTableDetails::FromFiles(Path::from(
location.as_str(),
)),
)
.await?;

Ok(make_dummy_exec())
}
SeafowlExtensionNode::CreateTable(CreateTable {
schema,
name,
..
}) => {
self.create_delta_table(name.as_str(), schema).await?;
self.create_delta_table(
name.as_str(),
CreateDeltaTableDetails::WithSchema(schema.clone()),
)
.await?;

Ok(make_dummy_exec())
}
Expand Down
60 changes: 32 additions & 28 deletions src/datafusion/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::sql::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt,
};
use datafusion_common::parsers::CompressionTypeVariant;
use lazy_static::lazy_static;
use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value};
use sqlparser::tokenizer::{TokenWithLocation, Word};
use sqlparser::{
Expand All @@ -41,7 +42,6 @@ use sqlparser::{
use std::collections::{HashMap, VecDeque};
use std::str::FromStr;
use std::string::ToString;
use strum_macros::Display;

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
Expand All @@ -63,10 +63,12 @@ pub struct DFParser<'a> {
parser: Parser<'a>,
}

#[derive(Debug, Clone, Display)]
#[strum(serialize_all = "UPPERCASE")]
enum KeywordExtensions {
Vacuum,
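// Marker option used to tell a `CONVERT ... TO DELTA` statement apart from a regular `COPY TO`:
// the parser smuggles the former through a `CopyToStatement` carrying this option.
// This is a hack; we should really introduce our own `Statement` enum that wraps
// the DataFusion one and adds our custom variants (this one and `VACUUM`).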
lazy_static! {
pub static ref CONVERT_TO_DELTA: (String, Value) =
("CONVERT_TO_DELTA".to_string(), Value::Boolean(true));
}

impl<'a> DFParser<'a> {
Expand Down Expand Up @@ -136,39 +138,25 @@ impl<'a> DFParser<'a> {
pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
match self.parser.peek_token().token {
Token::Word(w) => {
match w {
Word {
keyword: Keyword::CREATE,
..
} => {
// move one token forward
match w.keyword {
Keyword::CREATE => {
self.parser.next_token();
// use custom parsing
self.parse_create()
}
Word {
keyword: Keyword::COPY,
..
} => {
Keyword::CONVERT => {
self.parser.next_token();
self.parse_convert()
}
Keyword::COPY => {
self.parser.next_token();
self.parse_copy()
}
Word {
keyword: Keyword::DESCRIBE,
..
} => {
// move one token forward
Keyword::DESCRIBE => {
self.parser.next_token();
// use custom parsing
self.parse_describe()
}
Word { value, .. }
if value.to_uppercase()
== KeywordExtensions::Vacuum.to_string() =>
{
// move one token forward
Keyword::VACUUM => {
self.parser.next_token();
// use custom parsing
self.parse_vacuum()
}
_ => {
Expand All @@ -195,6 +183,22 @@ impl<'a> DFParser<'a> {
}))
}

/// Parses the remainder of a `CONVERT location TO DELTA table_name` statement.
///
/// The leading `CONVERT` keyword must already have been consumed by the caller.
/// On success the result is a `Statement::CopyTo` whose `target` holds the location,
/// whose `source` holds the table name, and whose `options` contain only the
/// `CONVERT_TO_DELTA` marker.
///
/// # Errors
///
/// Returns a `ParserError` if the location is not a literal string, if the
/// `TO DELTA` keywords are missing, or if the table name cannot be parsed.
pub fn parse_convert(&mut self) -> Result<Statement, ParserError> {
    // The location of the files to convert comes first, as a literal string.
    let target = self.parser.parse_literal_string()?;

    // Then the fixed `TO DELTA` keyword pair, followed by the table name.
    self.parser
        .expect_keywords(&[Keyword::TO, Keyword::DELTA])?;
    let source = CopyToSource::Relation(self.parser.parse_object_name()?);

    // There is no dedicated statement variant for this, so the location and table
    // name are carried in a `CopyToStatement`, the closest match to what we need.
    let convert = CopyToStatement {
        source,
        target,
        options: vec![CONVERT_TO_DELTA.clone()],
    };

    Ok(Statement::CopyTo(convert))
}

pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
// Since `VACUUM` is not a supported keyword by sqlparser, we abuse the semantically related
// TRUNCATE to smuggle the info on whether we want GC of tables, partitions or the DB itself.
Expand Down
Loading

0 comments on commit 9d27680

Please sign in to comment.