Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read delta table using datafusion #2922

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

PeterKeDer
Copy link
Contributor

Description

This adds a new method datafusion_read to DeltaTable, which reads the delta table using datafusion and returns pyarrow batches. This supports reading with SQL predicates, e.g. dt.datafusion_read(predicate="id = 10 and value > 123").

The performance of reading delta tables from AWS S3 using datafusion is vastly superior than using pyarrow, while also being much more reliable. For example, in one of our internal tables on S3 with ~20000 partitions of up to 300 MB each (in memory)

dt = DeltaTable("s3://<omitted>")

# this returns one parquet file of size ~15 MB
files = dt.files(partition_filters=[("part_id", "=", "5956"), ("date", "=", "2024-10-03"), ("hour", "=", "18")])

# baseline of reading parquet directly: ~2 seconds
pl.read_parquet(files[0])

# default to_pyarrow_table: idk, I stopped it when it didn't even finish running after 2 minutes
dt.to_pyarrow_table(filters=[("part_id", "=", "5956"), ("date", "=", "2024-10-03"), ("hour", "=", "18")])

# read using datafusion: ~4 seconds
pa.Table.from_batches(dt.datafusion_read(predicate="part_id = 5956 and date = '2024-10-03' and hour = 18"))

Related Issue(s)

Similar issue reported in #631

@github-actions github-actions bot added the binding/python Issues for the Python package label Oct 4, 2024
Comment on lines 1280 to 1296
.unwrap()
.build()
.unwrap();

let mut df = DataFrame::new(state, plan);

if let Some(filter) = maybe_filter {
df = df.filter(filter).unwrap();
}

if let Some(columns) = columns {
df = df
.select_columns(&columns.iter().map(String::as_str).collect::<Vec<_>>())
.unwrap();
}

Ok(rt().block_on(async { df.collect().await }).unwrap())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should propagate the errors properly instead of unwraps here

Copy link

codecov bot commented Oct 5, 2024

Codecov Report

Attention: Patch coverage is 11.90476% with 37 lines in your changes missing coverage. Please review.

Project coverage is 72.19%. Comparing base (a999c92) to head (167cf6a).

Files with missing lines Patch % Lines
python/src/query.rs 0.00% 32 Missing ⚠️
python/src/error.rs 0.00% 4 Missing ⚠️
python/src/lib.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2922      +/-   ##
==========================================
- Coverage   72.25%   72.19%   -0.07%     
==========================================
  Files         128      129       +1     
  Lines       40329    40369      +40     
  Branches    40329    40369      +40     
==========================================
+ Hits        29141    29143       +2     
- Misses       9335     9371      +36     
- Partials     1853     1855       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Collaborator

@ion-elgreco ion-elgreco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am on the fence with adding support like this in its current state

There were previous discussions on this, and I believe using an external TableProvider that can be registered in Datafusion python is the best way. At this stage that's not possible yet though

However maybe to make this more practical and have it working as a drop in replacement for to_pyarrow_dataset, it should take in pyarrow expressions which are then mapped to Datafusion expressions

@rtyler
Copy link
Member

rtyler commented Oct 5, 2024

@ion-elgreco why bother mapping into pyarrow? The python library has to have datafusion bundled either way, I think it's valuable to expose it to users This approach is in line with what I had suggested previously, exposing just a simple way to get at Datafusion and return record batches for the users.

The datafusion-python discussion has taken months and frankly I don't see much value in Datafusion bindings for Python to begin with, so I don't hang high hopes on that meeting the use-case here.

I would suggest, if @PeterKeDer is open to it, inverting this a little bit. Adding predicate searches on a loaded DeltaTable leads to exposing a subset of Datafusion and is likely to see API bloat in the future as users would want more functionality from Datafusion up in Python.

Perhaps a:

class QueryBuilder:
  def __init__(self):
     # some state, like SessionContext
  def use(self, name, table):
     # for registering table in that context
  def execute(self, df_sql) -> List[RecordBatch]:
     # Do the thing

Such that a user would be able to:

dt = DeltaTable('s3://foo')
dt2 = DeltaTable('s3://bar')

qe = QueryBuilder().use('foo', dt).use('bar', dt2)

# Forgive lazy SQL syntax, it's early =_=
results = qe.execute('SELECT foo.id, bar.description FROM foo JOIN foo.id = bar.foreign_id')

That way users would be able to use multiple tables in their query if they have them, and we wouldn't need to do much result pruning from Datafusion since they could just use the SQL engine directly to pull the information out that they want.

@ion-elgreco
Copy link
Collaborator

@rtyler most integrations use the pyarrow dataset interface and the have mapped their own expressions to pyarrow expressions, so was simply thinking it would be a replacement for that.

I like the queryBuilder approach though! Maybe call it register over use.

@PeterKeDer
Copy link
Contributor Author

@ion-elgreco @rtyler thanks for the suggestions!

I will look into the QueryBuilder approach. Just so I understand correctly, the general idea is that we leave functionality like projecting/filtering/joining to datafusion SQL, instead of duplicating them in our python API?

@rtyler rtyler marked this pull request as draft October 16, 2024 10:14
@rtyler rtyler self-assigned this Oct 26, 2024
@ion-elgreco
Copy link
Collaborator

With this PR apache/datafusion-python#921 we don't need this anymore since people can register our tables directly in Datafusion, so let's wait for that

@houqp
Copy link
Member

houqp commented Oct 28, 2024

The datafusion python change would still require users to install a compatible datafusion python package and learn the datafusion python api right? Given that we already have datafusion embedded as a dependency, it seems like a better out of the box experience for our users if they can get much faster data scan with a single api call without any extra setup.

@ion-elgreco
Copy link
Collaborator

The datafusion python change would still require users to install a compatible datafusion python package and learn the datafusion python api right? Given that we already have datafusion embedded as a dependency, it seems like a better out of the box experience for our users if they can get much faster data scan with a single api call without any extra setup.

Any version after Datafusion with ffi support would be compatible.

I don't think it's worth the maintenance, and API bloat for something that can be solved natively within Datafusion-python.

@rtyler
Copy link
Member

rtyler commented Oct 29, 2024

I share @houqp 's concerns about the datafusion-python code. Considering how early in draft status it is, and points to unreleased datafusion code 🙈 it's obviously still too early to tell. I'm comfortable keeping this in a draft state for the time being.

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Nov 2, 2024
Signed-off-by: Peter Ke <[email protected]>
@PeterKeDer
Copy link
Contributor Author

Hey @rtyler @ion-elgreco I took a shot implementing the QueryBuilder approach. IMO this would still be pretty valuable for people who don't want to manage a separate datafusion-python dependency. Would appreciate another look!

@PeterKeDer PeterKeDer marked this pull request as ready for review November 2, 2024 17:54
Signed-off-by: Peter Ke <[email protected]>
rtyler
rtyler previously approved these changes Nov 3, 2024
Copy link
Member

@rtyler rtyler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a cute little API addition. @ion-elgreco do you know of a good way we could mark this as experimental or subject to change? I understand your reluctance to add more API surface to maintain but I am curious to see how people interact/use this type of API.

If we find the datafusion-python route eventually to be more valuable, then we could remove the experimental API

let log_store = delta_table._table.log_store();

let scan_config = DeltaScanConfigBuilder::default()
.with_parquet_pushdown(false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be true in this case to get proper pushdown

@ion-elgreco
Copy link
Collaborator

@rtyler we can raise a warning when calling QueryBuilder.

Something like this:

class ExperimentalWarning(Warning))

And then call warnings.warn with that and some text

Signed-off-by: Peter Ke <[email protected]>
@PeterKeDer
Copy link
Contributor Author

Thanks, I added the experimental warning to QueryBuilder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants