feat: read delta table using datafusion #2922

Status: Open. Wants to merge 3 commits into main.

This PR adds an experimental QueryBuilder API to the Python bindings, backed by DataFusion: Delta tables are registered against a DataFusion SessionContext and queried with SQL, with results returned as a list of PyArrow record batches. On the Rust side, it also fixes DeltaTableProvider::supports_filters_pushdown to return one pushdown decision per filter expression, as DataFusion's TableProvider contract requires.

Changes from all commits:
crates/core/src/delta_datafusion/mod.rs (5 additions, 2 deletions)

@@ -824,9 +824,12 @@ impl TableProvider for DeltaTableProvider {

     fn supports_filters_pushdown(
         &self,
-        _filter: &[&Expr],
+        filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(vec![TableProviderFilterPushDown::Inexact])
+        Ok(filter
+            .iter()
+            .map(|_| TableProviderFilterPushDown::Inexact)
+            .collect())
     }

     fn statistics(&self) -> Option<Statistics> {
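
Note: DataFusion expects supports_filters_pushdown to return exactly one TableProviderFilterPushDown entry per filter it passes in; the old implementation always returned a single-element vector, which mismatches that contract whenever a query pushes down more than one predicate. Inexact means the provider may use a filter to prune files, but DataFusion still re-applies it to the returned batches. A minimal sketch of the Python-visible effect, using the QueryBuilder added below (the table path is a placeholder):

    from deltalake import DeltaTable, QueryBuilder

    dt = DeltaTable("path/to/table")  # placeholder path
    batches = (
        QueryBuilder()
        .register("tbl", dt)
        # Two predicates mean two pushdown candidates; the provider must
        # report a pushdown decision for each of them.
        .execute("SELECT * FROM tbl WHERE year = '2021' AND month = '12'")
    )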
python/deltalake/__init__.py (1 addition)

@@ -2,6 +2,7 @@
 from ._internal import __version__ as __version__
 from ._internal import rust_core_version as rust_core_version
 from .data_catalog import DataCatalog as DataCatalog
+from .query import QueryBuilder as QueryBuilder
 from .schema import DataType as DataType
 from .schema import Field as Field
 from .schema import Schema as Schema
python/deltalake/_internal.pyi (5 additions)

@@ -873,6 +873,11 @@ class DeltaFileSystemHandler:
     ) -> ObjectOutputStream:
         """Open an output stream for sequential writing."""

+class PyQueryBuilder:
+    def __init__(self) -> None: ...
+    def register(self, table_name: str, delta_table: RawDeltaTable) -> None: ...
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]: ...
+
 class DeltaDataChecker:
     def __init__(self, invariants: List[Tuple[str, str]]) -> None: ...
     def check_batch(self, batch: pyarrow.RecordBatch) -> None: ...
python/deltalake/query.py (new file, 31 additions)

@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+import warnings
+from typing import List
+
+import pyarrow
+
+from deltalake._internal import PyQueryBuilder
+from deltalake.table import DeltaTable
+from deltalake.warnings import ExperimentalWarning
+
+
+class QueryBuilder:
+    def __init__(self) -> None:
+        warnings.warn(
+            "QueryBuilder is experimental and subject to change",
+            category=ExperimentalWarning,
+        )
+        self._query_builder = PyQueryBuilder()
+
+    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
+        """Add a table to the query builder."""
+        self._query_builder.register(
+            table_name=table_name,
+            delta_table=delta_table._table,
+        )
+        return self
+
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
+        """Execute the query and return a list of record batches."""
+        return self._query_builder.execute(sql)
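
Note: register returns self, so multiple tables can be chained onto one builder, and execute returns the raw list of record batches rather than a table. Constructing a QueryBuilder emits an ExperimentalWarning. A minimal usage sketch (the table path is a placeholder):

    import pyarrow as pa

    from deltalake import DeltaTable, QueryBuilder

    dt = DeltaTable("path/to/table")  # placeholder path
    batches = QueryBuilder().register("tbl", dt).execute("SELECT * FROM tbl LIMIT 10")
    table = pa.Table.from_batches(batches)  # assemble the batches into one pyarrow.Table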
python/deltalake/warnings.py (new file, 2 additions)

@@ -0,0 +1,2 @@
+class ExperimentalWarning(Warning):
+    pass
python/src/error.rs (8 additions)

@@ -1,4 +1,5 @@
 use arrow_schema::ArrowError;
+use deltalake::datafusion::error::DataFusionError;
 use deltalake::protocol::ProtocolError;
 use deltalake::{errors::DeltaTableError, ObjectStoreError};
 use pyo3::exceptions::{
@@ -79,6 +80,10 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
     }
 }

+fn datafusion_to_py(err: DataFusionError) -> PyErr {
+    DeltaError::new_err(err.to_string())
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum PythonError {
     #[error("Error in delta table")]
@@ -89,6 +94,8 @@ pub enum PythonError {
     Arrow(#[from] ArrowError),
     #[error("Error in checkpoint")]
     Protocol(#[from] ProtocolError),
+    #[error("Error in data fusion")]
+    DataFusion(#[from] DataFusionError),
 }

 impl From<PythonError> for pyo3::PyErr {
@@ -98,6 +105,7 @@ impl From<PythonError> for pyo3::PyErr {
             PythonError::ObjectStore(err) => object_store_to_py(err),
             PythonError::Arrow(err) => arrow_to_py(err),
             PythonError::Protocol(err) => checkpoint_to_py(err),
+            PythonError::DataFusion(err) => datafusion_to_py(err),
         }
     }
 }
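
Note: with this mapping, DataFusion failures (an unknown table in the SQL, a planning error, and so on) surface in Python as DeltaError instead of aborting. A sketch of what calling code can expect; the DeltaError import is an assumption based on the existing deltalake.exceptions module, and the table path is a placeholder:

    from deltalake import DeltaTable, QueryBuilder
    from deltalake.exceptions import DeltaError  # assumed export

    qb = QueryBuilder().register("tbl", DeltaTable("path/to/table"))  # placeholder path
    try:
        qb.execute("SELECT * FROM no_such_table")
    except DeltaError as err:
        # The DataFusionError is stringified into the DeltaError message.
        print(err)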
python/src/lib.rs (12 additions, 2 deletions)

@@ -2,12 +2,14 @@ mod error;
 mod features;
 mod filesystem;
 mod merge;
+mod query;
 mod schema;
 mod utils;

 use std::collections::{HashMap, HashSet};
 use std::future::IntoFuture;
 use std::str::FromStr;
+use std::sync::Arc;
 use std::time;
 use std::time::{SystemTime, UNIX_EPOCH};

@@ -17,12 +19,18 @@ use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::StructField;
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
+use deltalake::arrow::pyarrow::ToPyArrow;
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
+use deltalake::datafusion::datasource::provider_as_source;
+use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use deltalake::datafusion::physical_plan::ExecutionPlan;
-use deltalake::datafusion::prelude::SessionContext;
-use deltalake::delta_datafusion::DeltaDataChecker;
+use deltalake::datafusion::prelude::{DataFrame, SessionContext};
+use deltalake::delta_datafusion::{
+    DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
+    DeltaTableProvider,
+};
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{
     scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
@@ -66,6 +74,7 @@ use crate::error::PythonError;
 use crate::features::TableFeatures;
 use crate::filesystem::FsConfig;
 use crate::merge::PyMergeBuilder;
+use crate::query::PyQueryBuilder;
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;

@@ -2069,6 +2078,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
     )?)?;
     m.add_class::<RawDeltaTable>()?;
     m.add_class::<PyMergeBuilder>()?;
+    m.add_class::<PyQueryBuilder>()?;
     m.add_class::<RawDeltaTableMetaData>()?;
     m.add_class::<PyDeltaDataChecker>()?;
     m.add_class::<PyTransaction>()?;
python/src/query.rs (new file, 58 additions)

@@ -0,0 +1,58 @@
+use std::sync::Arc;
+
+use deltalake::{
+    arrow::pyarrow::ToPyArrow,
+    datafusion::prelude::SessionContext,
+    delta_datafusion::{DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider},
+};
+use pyo3::prelude::*;
+
+use crate::{error::PythonError, utils::rt, RawDeltaTable};
+
+#[pyclass(module = "deltalake._internal")]
+pub(crate) struct PyQueryBuilder {
+    _ctx: SessionContext,
+}
+
+#[pymethods]
+impl PyQueryBuilder {
+    #[new]
+    pub fn new() -> Self {
+        let config = DeltaSessionConfig::default().into();
+        let _ctx = SessionContext::new_with_config(config);
+
+        PyQueryBuilder { _ctx }
+    }
+
+    pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> {
+        let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?;
+        let log_store = delta_table._table.log_store();
+
+        let scan_config = DeltaScanConfigBuilder::default()
+            .build(snapshot)
+            .map_err(PythonError::from)?;
+
+        let provider = Arc::new(
+            DeltaTableProvider::try_new(snapshot.clone(), log_store, scan_config)
+                .map_err(PythonError::from)?,
+        );
+
+        self._ctx
+            .register_table(table_name, provider)
+            .map_err(PythonError::from)?;
+
+        Ok(())
+    }
+
+    pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyObject> {
+        let batches = py.allow_threads(|| {
+            rt().block_on(async {
+                let df = self._ctx.sql(sql).await?;
+                df.collect().await
+            })
+            .map_err(PythonError::from)
+        })?;
+
+        batches.to_pyarrow(py)
+    }
+}
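
Note: execute runs the whole query synchronously from Python's point of view: py.allow_threads releases the GIL while the shared Tokio runtime (rt()) drives planning and collection, and the collected batches are converted to PyArrow objects only after the GIL is re-acquired. A sketch of the consequence, with a placeholder table path:

    import threading

    from deltalake import DeltaTable, QueryBuilder

    qb = QueryBuilder().register("tbl", DeltaTable("path/to/table"))  # placeholder path
    results = []

    # Because execute() drops the GIL while DataFusion runs, other Python
    # threads keep making progress during a long-running query.
    worker = threading.Thread(
        target=lambda: results.append(qb.execute("SELECT count(*) FROM tbl"))
    )
    worker.start()
    worker.join()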
python/tests/test_table_read.py (54 additions)

@@ -9,6 +9,7 @@

 from deltalake._util import encode_partition_value
 from deltalake.exceptions import DeltaProtocolError
+from deltalake.query import QueryBuilder
 from deltalake.table import ProtocolVersions
 from deltalake.writer import write_deltalake

@@ -946,3 +947,56 @@ def test_is_deltatable_with_storage_opts():
"DELTA_DYNAMO_TABLE_NAME": "custom_table_name",
}
assert DeltaTable.is_deltatable(table_path, storage_options=storage_options)


def test_read_query_builder():
table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
expected = {
"value": ["4", "5", "6", "7"],
"year": ["2021", "2021", "2021", "2021"],
"month": ["4", "12", "12", "12"],
"day": ["5", "4", "20", "20"],
}
actual = pa.Table.from_batches(
QueryBuilder()
.register("tbl", dt)
.execute("SELECT * FROM tbl WHERE year >= 2021 ORDER BY value")
).to_pydict()
assert expected == actual


def test_read_query_builder_join_multiple_tables(tmp_path):
table_path = "../crates/test/tests/data/delta-0.8.0-date"
dt1 = DeltaTable(table_path)

write_deltalake(
tmp_path,
pa.table(
{
"date": ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"],
"value": ["a", "b", "c", "d"],
}
),
)
dt2 = DeltaTable(tmp_path)

expected = {
"date": ["2021-01-01", "2021-01-02", "2021-01-03"],
"dayOfYear": [1, 2, 3],
"value": ["a", "b", "c"],
}
actual = pa.Table.from_batches(
QueryBuilder()
.register("tbl1", dt1)
.register("tbl2", dt2)
.execute(
"""
SELECT tbl2.date, tbl1.dayOfYear, tbl2.value
FROM tbl1
INNER JOIN tbl2 ON tbl1.date = tbl2.date
ORDER BY tbl1.date
"""
)
).to_pydict()
assert expected == actual