Skip to content

Commit

Permalink
Correctly handle hidden files in _change_data and _delta_index & deletion vector files
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan-Schweizer committed Nov 11, 2023
1 parent 140f949 commit a327fa8
Showing 1 changed file with 43 additions and 10 deletions.
53 changes: 43 additions & 10 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use object_store::Error;
use object_store::{path::Path, ObjectStore};
use serde::Serialize;
use serde_json::Value;
use url::Url;

use super::transaction::commit;
use crate::crate_version;
Expand Down Expand Up @@ -198,27 +199,44 @@ impl VacuumBuilder {
.snapshot
.files()
.iter()
.map(|a| a.path.as_str())
.map(|a| a.path.clone())
.chain(
self.snapshot
.all_tombstones()
.iter()
.map(|r| r.path.as_str()),
.map(|r| r.path.clone()),
)
.collect::<HashSet<&str>>();
.chain(self.snapshot.files().iter().filter_map(|a| {
return if let Some(deletion_vector) = &a.deletion_vector {
if let Ok(parent) = &Url::parse(self.log_store.root_uri().as_str()) {
if let Ok(dv_absolut_path) = deletion_vector.absolute_path(&parent) {
Some(dv_absolut_path?.to_string())
} else {
None
}
} else {
None
}
} else {
None
};
}))
.collect::<HashSet<String>>();

while let Some(obj_meta) = all_files.next().await {
// TODO should we allow NotFound here in case we have a temporary commit file in the list
let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
let is_hidden = is_hidden_directory(partition_columns, &obj_meta.location)?;

if is_hidden_file(partition_columns, &obj_meta.location)? {
continue;
}

if managed_files.contains(obj_meta.location.as_ref()) {
if !expired_tombstones.contains(obj_meta.location.as_ref()) || is_hidden {
if !expired_tombstones.contains(obj_meta.location.as_ref()) {
continue;
}
} else if now_millis - retention_period.num_milliseconds()
< obj_meta.last_modified.timestamp_millis()
|| is_hidden
{
continue;
}
Expand Down Expand Up @@ -380,15 +398,15 @@ impl VacuumPlan {
/// Names of the form partitionCol=[value] are partition directories, and should be
/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
/// indexes (stored under `_delta_index`, the prefix checked below) and these must be deleted when the data they are tied to is deleted.
fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
fn is_hidden_file(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
let path_name = path.as_ref();
let skip = path_name.starts_with("_delta_index") || path_name.starts_with("_change_data");
let is_hidden = path
.parts()
.skip(skip as usize)
.any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_'));

let path_name = path.as_ref();
Ok(is_hidden
&& !path_name.starts_with("_delta_index")
&& !path_name.starts_with("_change_data")
&& !partition_columns
.iter()
.any(|partition_column| path_name.starts_with(partition_column)))
Expand Down Expand Up @@ -468,4 +486,19 @@ mod tests {

assert_eq!(result.files_deleted, empty);
}

/// Runs a dry-run vacuum against the `table-with-dv-small` fixture and
/// checks that no files are reported for deletion.
#[tokio::test]
async fn vacuum_table_with_dv_small() {
    let dv_table = open_table("./tests/data/table-with-dv-small")
        .await
        .unwrap();

    // Dry run: the builder only reports what it would delete.
    let builder = VacuumBuilder::new(dv_table.log_store, dv_table.state).with_dry_run(true);
    let (_table, metrics) = builder.await.unwrap();

    assert_eq!(metrics.files_deleted, Vec::<String>::new());
}
}

0 comments on commit a327fa8

Please sign in to comment.