Convert to Delta table support #473
Conversation
(just some initial thoughts, not a review)
```rust
let fields: Vec<(String, String)> = schema.fields()
    .iter()
    .map(|f| (f.name().clone(), field_to_json(f).to_string()))
    .collect();
```
Wait, it's already implemented in Lines 37 to 43 in 3d428c9:

```rust
pub fn to_column_names_types(&self) -> Vec<(String, String)> {
    self.arrow_schema
        .fields()
        .iter()
        .map(|f| (f.name().clone(), field_to_json(f).to_string()))
        .collect()
}
```
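To illustrate the pattern being duplicated here, a minimal, dependency-free sketch: the `Field` struct and `field_to_json` below are hypothetical stand-ins for arrow's types (which return a `serde_json::Value` in the real code), but the shape of the `(name, JSON-encoded type)` mapping is the same.

```rust
// Hypothetical stand-in for arrow's `Field`; illustrative only.
#[derive(Clone)]
struct Field {
    name: String,
    data_type: String,
}

// Stand-in for arrow's `field_to_json`: serializes a field to a JSON string.
fn field_to_json(f: &Field) -> String {
    format!("{{\"name\":\"{}\",\"type\":\"{}\"}}", f.name, f.data_type)
}

// The pattern both snippets share: map each field to (name, JSON type).
fn to_column_names_types(fields: &[Field]) -> Vec<(String, String)> {
    fields
        .iter()
        .map(|f| (f.name.clone(), field_to_json(f)))
        .collect()
}

fn main() {
    let fields = vec![
        Field { name: "id".into(), data_type: "Int64".into() },
        Field { name: "label".into(), data_type: "Utf8".into() },
    ];
    for (name, json) in to_column_names_types(&fields) {
        println!("{name}: {json}");
    }
}
```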
`schema::Schema` and this both call `field_to_json` the same way; you could reuse that method if you want to not use the `schema::Schema` wrapper struct.
I started writing a comment, but then it got a bit out of hand, so I figured it warrants an issue on its own: #475
src/context.rs (Outdated)
```rust
self.internal_object_store
    .copy_in_prefix(&path, &table_prefix)
    .await?;

// Now convert them to a Delta table
ConvertToDeltaBuilder::new()
    .with_log_store(table_log_store)
    .with_table_name(&*table_name)
    .with_comment(format!(
        "Converted by Seafowl {}",
        env!("CARGO_PKG_VERSION")
    ))
    .await?
```
I think we do want to support completely zero-copy conversion, but that means one of the following:
- the source files have to be in a UUID-named directory inside of a prefix, or
- we have to track the full path for each table in the catalog (some intersection here with being able to persist external tables, including external Delta tables; probably an extension of #472, where we assume all tables have the same prefix), or
- we do a destructive convert-to-delta and move all Parquet files to a UUID-named directory.
> we have to track the full path for each table in the catalog
Yeah, this makes the most sense to me going forward, but we'll need to think about the new catalog schema for that.
There's also the possibility of optionally outsourcing this info (completely or partially) to a third-party data catalog service; see, e.g., how delta-rs envisions that: https://github.com/delta-io/delta-rs/blob/main/crates/deltalake-core/src/data_catalog/glue/mod.rs
For now though, I'm going to go with zero-copy, and assume the files are stored at exactly the right location (bucket + optional prefix + new table UUID).
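The path assumption above can be sketched as follows. This is a hypothetical helper (the name `build_table_prefix` and the exact layout are illustrative, not Seafowl's actual API): zero-copy conversion works only if the Parquet files already sit under `optional prefix + table UUID` inside the bucket.

```rust
// Illustrative only: build the object-store location where the source
// Parquet files are assumed to already live (optional prefix + table UUID).
fn build_table_prefix(prefix: Option<&str>, table_uuid: &str) -> String {
    match prefix {
        // Normalize a trailing slash on the configured prefix
        Some(p) => format!("{}/{}", p.trim_end_matches('/'), table_uuid),
        None => table_uuid.to_string(),
    }
}

fn main() {
    // e.g. files at s3://<bucket>/seafowl/<uuid>/part-0.parquet
    let uuid = "0190a7e5-0000-0000-0000-000000000000"; // example UUID
    println!("{}", build_table_prefix(Some("seafowl/"), uuid));
    println!("{}", build_table_prefix(None, uuid));
}
```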
Makes sense for now - note that the Parquet writer then has to know to generate a UUID for the directory to put the table into.
src/context/delta.rs (Outdated)
```diff
@@ -272,20 +273,22 @@ pub async fn plan_to_object_store(
         .collect()
 }

+pub(super) enum CreateDeltaTableDetails {
+    WithSchema(Schema),
```
minor: Maaaaybe this should be called `EmptyTable` instead of `WithSchema`
Makes sense, I'm not too happy with the enum/variants name anyway.
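The rename being discussed can be sketched as below. The `Schema` stand-in and the derived `Debug` are illustrative, not Seafowl's real definitions; the point is just that `EmptyTable` names the intent (create a new, empty table) rather than the payload.

```rust
// Illustrative stand-in for the real Schema type.
#[derive(Debug)]
struct Schema;

// Was `WithSchema(Schema)`: `EmptyTable` describes what the variant does,
// not merely what it carries.
#[derive(Debug)]
enum CreateDeltaTableDetails {
    EmptyTable(Schema),
}

fn main() {
    let details = CreateDeltaTableDetails::EmptyTable(Schema);
    println!("{details:?}");
}
```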
```rust
// COPY some values multiple times to test converting a flat table with more than one Parquet file
context
    .plan_query(&format!(
        "COPY (VALUES (1, 'one'), (2, 'two')) TO '{}/file_1.parquet'",
```
I didn't realize we / DataFusion could do that 😅
Yup, ever since #462
Convert a Parquet table, as specified by a particular path, into a Delta table. The syntax closely follows that of Databricks.
Closes #469.