Skip to content

Commit

Permalink
Fix the bug
Browse files Browse the repository at this point in the history
  • Loading branch information
adamfaulkner-at authored and ion-elgreco committed Oct 3, 2024
1 parent 59ef7e0 commit 8701046
Showing 1 changed file with 22 additions and 45 deletions.
67 changes: 22 additions & 45 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,15 @@ impl<'a> DeltaScanBuilder<'a> {
let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
file_groups: if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
statistics: stats,
projection: self.projection.cloned(),
limit: self.limit,
Expand Down Expand Up @@ -2570,55 +2578,24 @@ mod tests {
}

#[tokio::test]
async fn parent_distribution_requirements_bug() {
let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();

let path = "/tmp/table";

let mut table = CreateBuilder::new()
.with_location(path)
.with_columns([StructField {
name: "a".to_string(),
data_type: delta_kernel::schema::DataType::STRING,
nullable: false,
metadata: HashMap::new(),
}])
.await
.unwrap();

table = crate::DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

table = crate::DeltaOps(table)
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
async fn passes_sanity_checker_when_all_files_filtered() {
// Run a query that filters out all files and sorts.
// Verify that it returns an empty set of rows without panicing.
//
// Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
// datafusion rejected.
let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table)).unwrap();

let config = SessionConfig::default();
let ctx = SessionContext::new_with_config(config);

ctx.register_table("table", Arc::new(table)).unwrap();
ctx.sql("SELECT * FROM `table` WHERE `a` > 's' ORDER BY `a` ASC")
.await
.unwrap()
.collect()
let df = ctx
.sql("select * from test where c3 = 100 ORDER BY c1 ASC")
.await
.unwrap();
let actual = df.collect().await.unwrap();

let re_opened_table = open_table(path).await.unwrap();
ctx.register_table("re_opened_table", Arc::new(re_opened_table))
.unwrap();
ctx.sql("SELECT * FROM `re_opened_table` WHERE `a` > 's' ORDER BY `a` ASC")
.await
.unwrap()
.collect()
.await
.unwrap();
assert!(false);
assert_eq!(actual.len(), 0);
}
}

0 comments on commit 8701046

Please sign in to comment.