diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 27c586c4a..a070db9fb 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,7 +52,7 @@ lazy_static = { workspace = true } log = { workspace = true } murmur3 = { workspace = true } once_cell = { workspace = true } -opendal = { workspace = true, features = ["services-s3"] } +opendal = { workspace = true, features = ["services-s3", "services-fs"] } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } reqwest = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index f2e740aef..e2ea150ce 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,7 +19,7 @@ use crate::spec::{ FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, - UnboundPartitionSpec, ViewRepresentation, + UnboundPartitionSpec, ViewRepresentation, ViewVersion, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -463,6 +463,64 @@ pub struct ViewCreation { pub summary: HashMap, } +/// ViewUpdate represents an update to a view in the catalog. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "kebab-case")] +pub enum ViewUpdate { + /// Assign a new UUID to the view + #[serde(rename_all = "kebab-case")] + AssignUuid { + /// The new UUID to assign. + uuid: uuid::Uuid, + }, + /// Upgrade view's format version + #[serde(rename_all = "kebab-case")] + UpgradeFormatVersion { + /// Target format upgrade to. + format_version: i32, + }, + /// Add a new schema to the view + #[serde(rename_all = "kebab-case")] + AddSchema { + /// The schema to add. + schema: Schema, + /// The last column id of the view. + last_column_id: Option, + }, + /// Set view's location + #[serde(rename_all = "kebab-case")] + SetLocation { + /// New location for view. 
+ location: String, + }, + /// Set view's properties + /// + /// Matching keys are updated, and non-matching keys are left unchanged. + #[serde(rename_all = "kebab-case")] + SetProperties { + /// Properties to update for view. + updates: HashMap, + }, + /// Remove view's properties + #[serde(rename_all = "kebab-case")] + RemoveProperties { + /// Properties to remove + removals: Vec, + }, + /// Add a new version to the view + #[serde(rename_all = "kebab-case")] + AddViewVersion { + /// The view version to add. + view_version: ViewVersion, + }, + /// Set view's current version + #[serde(rename_all = "kebab-case")] + SetCurrentViewVersion { + /// View version id to set as current, or -1 to set last added version + view_version_id: i32, + }, +} + #[cfg(test)] mod tests { use crate::spec::{ diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 3d24ec39d..407009861 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -38,6 +38,7 @@ pub use catalog::TableIdent; pub use catalog::TableRequirement; pub use catalog::TableUpdate; pub use catalog::ViewCreation; +pub use catalog::ViewUpdate; #[allow(dead_code)] pub mod table; diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 7644d181d..793f00d34 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -42,4 +42,3 @@ pub use transform::*; pub use values::*; pub use view_metadata::*; pub use view_version::*; -// pub use view_metadata::*; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 3524595ba..4f7ab435e 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -45,7 +45,8 @@ pub static DEFAULT_SPEC_ID: i32 = 0; pub static DEFAULT_SORT_ORDER_ID: i64 = 0; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; -pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; +// TODO: spark numbers from one and so does tabular +pub(crate) static 
INITIAL_SEQUENCE_NUMBER: i64 = 1; /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/iceberg/src/spec/view_metadata.rs b/crates/iceberg/src/spec/view_metadata.rs index 6282e734b..1b52ed45b 100644 --- a/crates/iceberg/src/spec/view_metadata.rs +++ b/crates/iceberg/src/spec/view_metadata.rs @@ -21,20 +21,24 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::cmp::Ordering; +use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; use super::{ view_version::{ViewVersion, ViewVersionRef}, - SchemaId, SchemaRef, + Schema, SchemaId, SchemaRef, ViewRepresentation, }; use crate::catalog::ViewCreation; use crate::error::Result; use _serde::ViewMetadataEnum; +use crate::Error; use chrono::{DateTime, TimeZone, Utc}; +use itertools::{FoldWhile, Itertools}; +use crate::spec::view_properties::{REPLACE_DROP_DIALECT_ALLOWED, REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VERSION_HISTORY_SIZE, VERSION_HISTORY_SIZE_DEFAULT}; /// Reference to [`ViewMetadata`]. pub type ViewMetadataRef = Arc; @@ -47,11 +51,11 @@ pub type ViewMetadataRef = Arc; /// We check the validity of this data structure when constructing. pub struct ViewMetadata { /// Integer Version for the format. - pub(crate) format_version: ViewFormatVersion, + pub format_version: ViewFormatVersion, /// A UUID that identifies the view, generated when the view is created. - pub(crate) view_uuid: Uuid, + pub view_uuid: Uuid, /// The view's base location; used to create metadata file locations - pub(crate) location: String, + pub location: String, /// ID of the current version of the view (version-id) pub current_version_id: i64, /// A list of known versions of the view @@ -60,11 +64,11 @@ pub struct ViewMetadata { /// change to current-version-id pub version_log: Vec, /// A list of schemas, stored as objects with schema-id. 
- pub(crate) schemas: HashMap, + pub schemas: HashMap, /// A string to string map of view properties. /// Properties are used for metadata such as comment and for settings that /// affect view maintenance. This is not intended to be used for arbitrary metadata. - pub(crate) properties: HashMap, + pub properties: HashMap, } impl ViewMetadata { @@ -139,12 +143,112 @@ impl ViewMetadata { } /// Append view version to view - pub fn append_version(&mut self, view_version: ViewVersion) { - self.current_version_id = view_version.version_id(); + fn add_version(&mut self, view_version: ViewVersion) -> Result> { + if self.versions.contains_key(&view_version.version_id()) { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "A version with the version id {} already exists.", + view_version.version_id() + ), + )); + } - self.version_log.push(view_version.log()); - self.versions - .insert(view_version.version_id(), Arc::new(view_version)); + let (exist, max_version) = self + .versions + .values() + .fold_while((None, 0), |(_, max), v| { + if is_same_version(v, &view_version) { + FoldWhile::Done((Some(v.version_id()), max)) + } else { + FoldWhile::Continue((None, max.max(v.version_id()))) + } + }) + .into_inner(); + if let Some(v) = exist { + return Ok(AddedOrPresent::AlreadyPresent(v)); + } + + let new_version_id = max_version + 1; + + let mut view_version = view_version; + view_version.version_id = new_version_id; + self.versions.insert(new_version_id, Arc::new(view_version)); + Ok(AddedOrPresent::Added(new_version_id)) + } + + fn set_current_version_id(&mut self, current_version_id: i64) -> Result<()> { + if !self.versions.contains_key(&current_version_id) { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "No version exists with the version id {}.", + current_version_id + ), + )); + } + self.current_version_id = current_version_id; + self.version_log + .push(ViewVersionLog::now(current_version_id)); + Ok(()) + } + + fn 
add_schema(&mut self, schema: Schema) -> Result> { + if self.schemas.contains_key(&schema.schema_id()) { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "A schema with the schema id {} already exists.", + schema.schema_id() + ), + )); + } + // Not sure why, but the java implementation replaces schema_ids by internally computed ids + let (maybe_existing, max_schema_id) = self + .schemas + .values() + .fold_while((None, 0), |(_, max), s| { + if is_same_schema(s, &schema) { + FoldWhile::Done((Some(s.schema_id()), max)) + } else { + FoldWhile::Continue((None, max.max(s.schema_id()))) + } + }) + .into_inner(); + + if let Some(existing) = maybe_existing { + return Ok(AddedOrPresent::AlreadyPresent(existing)); + } + + let schema_id = max_schema_id + 1; + + // TODO: use for updates + let _highest_field_id = self.highest_field_id().max(schema.highest_field_id()); + + let schema = Arc::new(schema.into_builder().with_schema_id(schema_id).build()?); + let schema_id = schema.schema_id(); + self.schemas.insert(schema_id, schema); + Ok(AddedOrPresent::Added(schema_id)) + } + + fn set_properties(&mut self, properties: HashMap) { + self.properties.extend(properties); + } + + fn remove_properties(&mut self, keys: HashSet<&String>) { + self.properties.retain(|k, _| !keys.contains(k)); + } + + fn assign_uuid(&mut self, uuid: Uuid) { + self.view_uuid = uuid; + } + + fn highest_field_id(&self) -> i32 { + self.schemas + .values() + .map(|s| s.highest_field_id()) + .max() + .unwrap_or(0) } /// Returns view history. @@ -154,13 +258,163 @@ impl ViewMetadata { } } +/// `ViewVersion` wrapper to allow appending a new version or replacing the current version. +/// +/// `Append` will add a new version without setting it as the current version. +/// `AsCurrent` will add a new version and set it as the current version. +#[derive(Debug, Clone)] +pub enum AppendViewVersion { + /// Append a new version to the view. 
+ Append(ViewVersion), + /// Replace the current version with a new version. + AsCurrent(ViewVersion), +} + +enum AddedOrPresent { + Added(T), + AlreadyPresent(T), +} + +impl AsRef for AppendViewVersion { + fn as_ref(&self) -> &ViewVersion { + match self { + AppendViewVersion::Append(v) => v, + AppendViewVersion::AsCurrent(v) => v, + } + } +} + +impl AsMut for AppendViewVersion { + fn as_mut(&mut self) -> &mut ViewVersion { + match self { + AppendViewVersion::Append(v) => v, + AppendViewVersion::AsCurrent(v) => v, + } + } +} + +// Checks whether the given view versions would behave the same while ignoring the view version +// id, the creation timestamp, and the operation. +fn is_same_version(a: &ViewVersion, b: &ViewVersion) -> bool { + a.summary == b.summary + && a.representations == b.representations + && a.default_catalog == b.default_catalog + && a.default_namespace == b.default_namespace + && a.schema_id == b.schema_id +} + +fn is_same_schema(a: &Schema, b: &Schema) -> bool { + a.as_struct() == b.as_struct() && a.identifier_field_ids() == b.identifier_field_ids() +} + /// Manipulating view metadata. -pub struct ViewMetadataBuilder(ViewMetadata); +pub struct ViewMetadataBuilder { + previous: ViewVersionRef, + metadata: ViewMetadata, + last_added_version: Option, + last_added_schema: Option, + added_versions: usize, // TODO: Update tracking needed? +} +// TODO: these errors don't match at all impl ViewMetadataBuilder { /// Creates a new view metadata builder from the given view metadata. pub fn new(origin: ViewMetadata) -> Self { - Self(origin) + Self { + previous: origin.current_version().clone(), + metadata: origin, + last_added_version: None, + last_added_schema: None, + added_versions: 0, + } + } + + /// Adds a new version to the view metadata. + /// + /// If the schema id is -1, the schema id of the last added schema will be used. 
+ /// If the view version matches an existing version (ignoring the version id, creation timestamp, and operation), + /// a new version will not be added. + /// + /// # Errors + /// - If the schema id is -1 and no schema was added before, an error will be returned. + /// - If the view version contains multiple queries for the same dialect, an error will be returned. + /// - If a version with the same version id already exists, an error will be returned. + pub fn add_version(mut self, mut version: AppendViewVersion) -> Result<Self> { + let schema_id = version.as_ref().schema_id(); + if schema_id == -1 { + version.as_mut().schema_id = self.last_added_schema.ok_or_else(|| Error::new( + crate::ErrorKind::DataInvalid, + "Cannot set schema to last added without adding schema before.", + ))?; + } + + let (_, maybe_err) = version.as_ref().representations().iter().fold_while((HashSet::new(), None), |(mut dialects, _), r| match r { + ViewRepresentation::SqlViewRepresentation(sql) => { + if dialects.insert(sql.dialect.as_str()) { + FoldWhile::Continue((dialects, None)) + } else { + FoldWhile::Done((dialects, Some(Error::new( + crate::ErrorKind::DataInvalid, + format!("Invalid view version: Cannot add multiple queries for dialect {}", sql.dialect), + )))) + } + } + }).into_inner(); + if let Some(err) = maybe_err { + return Err(err); + } + + match version { + AppendViewVersion::Append(version) => { + self.add_and_maybe_set_last_version(version)?; + Ok(self) + } + AppendViewVersion::AsCurrent(version) => { + // Ids are reassigned on insert, so use the id the version was actually stored under. + let version_id = self.add_and_maybe_set_last_version(version)?; + Ok(self.set_current_version_id(version_id)?) + } + } + } + + /// Adds a new schema to the view metadata. + /// + /// If the schema matches an existing schema, a new schema will not be added. + /// Any set schema_id will not be respected, it will be set to the next available id. + pub fn add_schema(mut self, schema: Schema) -> Result<Self> { + match self.metadata.add_schema(schema)? 
{ + AddedOrPresent::Added(added) => { + self.last_added_schema = Some(added); + } + AddedOrPresent::AlreadyPresent(_) => {} + }; + Ok(self) + } + + fn add_and_maybe_set_last_version(&mut self, version: ViewVersion) -> Result<i64> { + match self.metadata.add_version(version)? { + AddedOrPresent::Added(added) => { + self.added_versions += 1; + self.last_added_version = Some(added); + Ok(added) + } + AddedOrPresent::AlreadyPresent(present) => Ok(present), + } + } + + /// Sets the current version id; `-1` selects the version last added through this builder. + pub fn set_current_version_id(mut self, current_version_id: i64) -> Result<Self> { + if current_version_id == -1 { + if let Some(last_added_version) = self.last_added_version { + return self.set_current_version_id(last_added_version); + } + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + "Cannot set current version to last added without adding version before.", + )); + } + self.metadata.set_current_version_id(current_version_id)?; + Ok(self) } /// Creates a new view metadata builder from the given view creation. @@ -194,26 +448,152 @@ impl ViewMetadataBuilder { location, current_version_id: initial_version_id, versions, - version_log: Vec::new(), + version_log: vec![ViewVersionLog::now(initial_version_id)], schemas: HashMap::from_iter(vec![(schema.schema_id(), Arc::new(schema))]), properties, }; - Ok(Self(view_metadata)) + Ok(Self { + previous: view_metadata.current_version().clone(), + metadata: view_metadata, + last_added_version: None, + last_added_schema: None, + added_versions: 1, + }) } /// Changes uuid of view metadata. - pub fn assign_uuid(mut self, uuid: Uuid) -> Result { - self.0.view_uuid = uuid; - Ok(self) + pub fn assign_uuid(mut self, uuid: Uuid) -> Self { + self.metadata.assign_uuid(uuid); + self + } + + /// Updates view properties, replacing existing keys, leaving old entries. + pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self { + self.metadata.set_properties(properties); + self + } + + /// Removes view properties by keys. 
+ pub fn remove_properties(mut self, keys: HashSet<&String>) -> Self { + self.metadata.remove_properties(keys); + self } /// Returns the new view metadata after changes. - pub fn build(self) -> Result { - Ok(self.0) + pub fn build(mut self) -> Result<ViewMetadata> { + if self.metadata.versions.is_empty() { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + "Cannot create view metadata without versions.", + )); + } + + if !self + .metadata + .properties() + .get(REPLACE_DROP_DIALECT_ALLOWED) + .map(|s| s == "true") + .unwrap_or(REPLACE_DROP_DIALECT_ALLOWED_DEFAULT) + { + check_if_dialect_is_dropped( + self.previous.as_ref(), + self.metadata.current_version().as_ref(), + )?; + } + + let history_size = self + .metadata + .properties() + .get(VERSION_HISTORY_SIZE) + .map(|s| { + s.parse::<usize>().map_err(|_| { + crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "{} must be positive int but was: {}", + VERSION_HISTORY_SIZE, s + ), + ) + }) + }) + .transpose()? + .unwrap_or(VERSION_HISTORY_SIZE_DEFAULT); + + if history_size < 1 { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "{} must be positive int but was: {}", + VERSION_HISTORY_SIZE, history_size + ), + )); + } + + // Expire the oldest versions, but keep at least the configured history size and every + // version added by this builder; the current version is never expired. + let keep = history_size.max(self.added_versions); + if self.metadata.versions.len() > keep { + let current = self.metadata.current_version_id; + let mut candidates = self.metadata.versions.keys().copied().filter(|id| *id != current).collect::<Vec<_>>(); + candidates.sort_unstable(); + let to_remove = candidates.iter().take(candidates.len() + 1 - keep).copied().collect::<HashSet<_>>(); + self.metadata + .version_log + .retain(|log| !to_remove.contains(&log.version_id)); + self.metadata + .versions + .retain(|version_id, _| !to_remove.contains(version_id)); + } + Ok(self.metadata) } } +fn check_if_dialect_is_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> { + let base_dialects = sql_dialects_for(previous); + let updated_dialects = sql_dialects_for(current); + + if !updated_dialects.is_superset(&base_dialects) { + return Err(crate::Error::new( 
crate::ErrorKind::DataInvalid, + format!( + "Cannot replace view due to loss of view dialects ({REPLACE_DROP_DIALECT_ALLOWED}=false):\nPrevious dialects: {:?}\nNew dialects: {:?}", + base_dialects, + updated_dialects + ), + )); + } + Ok(()) +} + +fn sql_dialects_for(view_version: &ViewVersion) -> HashSet { + view_version + .representations() + .iter() + .map(|repr| match repr { + ViewRepresentation::SqlViewRepresentation(sql) => sql.dialect.to_lowercase(), + }) + .collect() +} + +/// View metadata properties. +pub mod view_properties { + /// View metadata version history size + pub const VERSION_HISTORY_SIZE: &str = "version.history.num-entries"; + /// Default view metadata version history size + pub const VERSION_HISTORY_SIZE_DEFAULT: usize = 10; + /// View metadata compression codec + pub const METADATA_COMPRESSION: &str = "write.metadata.compression-codec"; + /// Default view metadata compression codec + pub const METADATA_COMPRESSION_DEFAULT: &str = "gzip"; + /// View metadata comment + pub const COMMENT: &str = "comment"; + /// View metadata replace drop dialect allowed + pub const REPLACE_DROP_DIALECT_ALLOWED: &str = "replace.drop-dialect.allowed"; + /// Default view metadata replace drop dialect allowed + pub const REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false; +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// A log of when each snapshot was made. 
@@ -226,9 +606,17 @@ pub struct ViewVersionLog { impl ViewVersionLog { /// Returns the last updated timestamp as a DateTime with millisecond precision - pub fn timestamp(self) -> DateTime { + pub fn timestamp(&self) -> DateTime { Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() } + + /// Returns a new ViewVersionLog with the current timestamp + pub fn now(version_id: i64) -> Self { + Self { + version_id, + timestamp_ms: Utc::now().timestamp_millis(), + } + } } pub(super) mod _serde { @@ -587,7 +975,7 @@ mod tests { let metadata = get_test_view_metadata("ViewMetadataV2Valid.json"); let metadata_builder = ViewMetadataBuilder::new(metadata); let uuid = Uuid::new_v4(); - let metadata = metadata_builder.assign_uuid(uuid).unwrap().build().unwrap(); + let metadata = metadata_builder.assign_uuid(uuid).build().unwrap(); assert_eq!(metadata.uuid(), uuid); } } diff --git a/crates/iceberg/src/spec/view_version.rs b/crates/iceberg/src/spec/view_version.rs index 0d2496825..33f422afd 100644 --- a/crates/iceberg/src/spec/view_version.rs +++ b/crates/iceberg/src/spec/view_version.rs @@ -25,7 +25,6 @@ use std::collections::HashMap; use std::sync::Arc; use typed_builder::TypedBuilder; -use super::view_metadata::ViewVersionLog; use crate::catalog::NamespaceIdent; use crate::spec::{SchemaId, SchemaRef, ViewMetadata}; use crate::{Error, ErrorKind}; @@ -40,20 +39,20 @@ pub type ViewVersionRef = Arc; /// A view versions represents the definition of a view at a specific point in time. pub struct ViewVersion { /// A unique long ID - version_id: i64, + pub version_id: i64, /// ID of the schema for the view version - schema_id: SchemaId, + pub schema_id: SchemaId, /// Timestamp when the version was created (ms from epoch) - timestamp_ms: i64, + pub timestamp_ms: i64, /// A string to string map of summary metadata about the version - summary: HashMap, + pub summary: HashMap, /// A list of representations for the view definition. 
- representations: ViewRepresentations, + pub representations: ViewRepresentations, /// Catalog name to use when a reference in the SELECT does not contain a catalog #[builder(default = None)] - default_catalog: Option, + pub default_catalog: Option, /// Namespace to use when a reference in the SELECT is a single identifier - default_namespace: NamespaceIdent, + pub default_namespace: NamespaceIdent, } impl ViewVersion { @@ -112,13 +111,6 @@ impl ViewVersion { .cloned(); r } - - pub(crate) fn log(&self) -> ViewVersionLog { - ViewVersionLog { - timestamp_ms: self.timestamp_ms, - version_id: self.version_id, - } - } } /// A list of view representations.