feat: omit unmodified files during merge write #1969

Merged (14 commits) on Dec 30, 2023
8 changes: 8 additions & 0 deletions crates/benchmarks/src/bin/merge.rs
@@ -265,6 +265,14 @@ async fn benchmark_merge_tpcds(
        .object_store()
        .delete(&Path::parse("_delta_log/00000000000000000002.json")?)
        .await?;
    table
        .object_store()
        .delete(&Path::parse("_delta_log/00000000000000000003.json")?)
        .await?;
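    // The result below is deliberately discarded (`let _ =`): this fifth
    // log entry may not exist for every benchmark run.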
    let _ = table
        .object_store()
        .delete(&Path::parse("_delta_log/00000000000000000004.json")?)
        .await;

    Ok((duration, metrics))
}
16 changes: 16 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/logical.rs
@@ -1,5 +1,7 @@
//! Logical Operations for DataFusion

use std::collections::HashSet;
Collaborator: Should we use hashbrown here instead? I read somewhere that it's the default in a newer version of the standard library, but I'm not sure.

Collaborator (Author): Yeah, as you called out, the standard library changed its underlying hash table implementation to hashbrown, so std::collections::HashSet should behave the same here.

use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};

// Metric Observer is used to update DataFusion metrics from a record batch.
@@ -10,6 +12,7 @@ pub(crate) struct MetricObserver {
    // id is preserved during conversion to physical node
    pub id: String,
    pub input: LogicalPlan,
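    // When false, this observer reports all of its columns as ineligible for
    // predicate pushdown (see prevent_predicate_push_down_columns below).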
    pub enable_pushdown: bool,
}

impl UserDefinedLogicalNodeCore for MetricObserver {
@@ -35,6 +38,18 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
        write!(f, "MetricObserver id={}", &self.id)
    }

    fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
        if self.enable_pushdown {
            HashSet::new()
        } else {
            self.schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect()
        }
    }

    fn from_template(
        &self,
        _exprs: &[datafusion_expr::Expr],
@@ -43,6 +58,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
        MetricObserver {
            id: self.id.clone(),
            input: inputs[0].clone(),
            enable_pushdown: self.enable_pushdown,
        }
    }
}
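For orientation, a minimal sketch of how a caller might install this node; the `observe` helper is hypothetical, but wrapping a user-defined logical node in `LogicalPlan::Extension` is standard DataFusion. With `enable_pushdown = false`, `prevent_predicate_push_down_columns` returns every column name, so DataFusion keeps filters above the observer and its metrics see the unfiltered rows.

use std::sync::Arc;

use datafusion_expr::{logical_plan::Extension, LogicalPlan};

// Hypothetical helper: wrap `input` in a MetricObserver. When
// `enable_pushdown` is false, predicates will not be pushed below
// this node, so row counts are taken before any filtering.
fn observe(id: &str, input: LogicalPlan, enable_pushdown: bool) -> LogicalPlan {
    LogicalPlan::Extension(Extension {
        node: Arc::new(MetricObserver {
            id: id.to_owned(),
            input,
            enable_pushdown,
        }),
    })
}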
56 changes: 30 additions & 26 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -32,7 +32,7 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
-use arrow_array::{Array, DictionaryArray, StringArray};
+use arrow_array::{Array, DictionaryArray, StringArray, TypedDictionaryArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
@@ -132,6 +132,21 @@ fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Prec
    }
}

pub(crate) fn get_path_column<'a>(
    batch: &'a RecordBatch,
    path_column: &str,
) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
    batch
        .column_by_name(path_column)
        .ok_or_else(err)?
        .as_any()
        .downcast_ref::<DictionaryArray<UInt16Type>>()
        .ok_or_else(err)?
        .downcast_dict::<StringArray>()
        .ok_or_else(err)
}
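A usage sketch, assuming the caller has a RecordBatch whose path column is dictionary-encoded; the column name below is illustrative, not taken from this PR:

// Pull the dictionary-encoded path column out of a batch and iterate
// its values as Option<&str>. "__delta_rs_path" is an assumed name here;
// callers pass whatever path column their scan actually produced.
let paths = get_path_column(&batch, "__delta_rs_path")?;
for path in paths.into_iter().flatten() {
    println!("scanned file: {path}");
}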

impl DeltaTableState {
    /// Provide table level statistics to Datafusion
    pub fn datafusion_table_statistics(&self) -> DataFusionResult<Statistics> {
@@ -1362,31 +1377,20 @@ fn join_batches_with_add_actions(

    let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
    for batch in batches {
-        let array = batch.column_by_name(path_column).ok_or_else(|| {
-            DeltaTableError::Generic(format!("Unable to find column {}", path_column))
-        })?;
-
-        let iter: Box<dyn Iterator<Item = Option<&str>>> =
-            if dict_array {
-                let array = array
-                    .as_any()
-                    .downcast_ref::<DictionaryArray<UInt16Type>>()
-                    .ok_or(DeltaTableError::Generic(format!(
-                        "Unable to downcast column {}",
-                        path_column
-                    )))?
-                    .downcast_dict::<StringArray>()
-                    .ok_or(DeltaTableError::Generic(format!(
-                        "Unable to downcast column {}",
-                        path_column
-                    )))?;
-                Box::new(array.into_iter())
-            } else {
-                let array = array.as_any().downcast_ref::<StringArray>().ok_or(
-                    DeltaTableError::Generic(format!("Unable to downcast column {}", path_column)),
-                )?;
-                Box::new(array.into_iter())
-            };
+        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
+
+        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
+            let array = get_path_column(&batch, path_column)?;
+            Box::new(array.into_iter())
+        } else {
+            let array = batch
+                .column_by_name(path_column)
+                .ok_or_else(err)?
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(err)?;
+            Box::new(array.into_iter())
+        };

        for path in iter {
            let path = path.ok_or(DeltaTableError::Generic(format!(
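Worth noting on the refactor above: both arms now yield a Box<dyn Iterator<Item = Option<&str>>>, so the path-collection loop that follows is written once regardless of whether the column arrived dictionary-encoded, and the four duplicated downcast error messages collapse into the single err closure shared with get_path_column.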