diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs
index 0dca038f4a..0e44fe215f 100644
--- a/crates/deltalake-core/src/operations/create.rs
+++ b/crates/deltalake-core/src/operations/create.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures::future::BoxFuture;
-use serde_json::{Map, Value};
+use serde_json::Value;
 
 use super::transaction::{commit, PROTOCOL};
 use crate::errors::{DeltaResult, DeltaTableError};
@@ -56,7 +56,7 @@ pub struct CreateBuilder {
     actions: Vec<Action>,
     log_store: Option<LogStoreRef>,
     configuration: HashMap<String, Option<String>>,
-    metadata: Option<Map<String, Value>>,
+    metadata: Option<HashMap<String, Value>>,
 }
 
 impl Default for CreateBuilder {
@@ -181,8 +181,11 @@ impl CreateBuilder {
     ///
     /// This might include provenance information such as an id of the
     /// user that made the commit or the program that created it.
-    pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
-        self.metadata = Some(metadata);
+    pub fn with_metadata(
+        mut self,
+        metadata: impl IntoIterator<Item = (String, Value)>,
+    ) -> Self {
+        self.metadata = Some(HashMap::from_iter(metadata));
         self
     }
 
@@ -286,6 +289,7 @@ impl std::future::IntoFuture for CreateBuilder {
         let this = self;
         Box::pin(async move {
             let mode = this.mode.clone();
+            let app_metadata = this.metadata.clone();
             let (mut table, actions, operation) = this.into_table_and_actions()?;
             let log_store = table.log_store();
             let table_state = if log_store.is_delta_table_location().await? {
@@ -310,7 +314,7 @@ impl std::future::IntoFuture for CreateBuilder {
                 &actions,
                 operation,
                 table_state,
-                None,
+                app_metadata,
             )
             .await?;
             table.load_version(version).await?;
diff --git a/docs/api/delta_table/delta_table_alterer.md b/docs/api/delta_table/delta_table_alterer.md
new file mode 100644
index 0000000000..d859f605e1
--- /dev/null
+++ b/docs/api/delta_table/delta_table_alterer.md
@@ -0,0 +1,11 @@
+---
+search:
+  boost: 10
+---
+
+
+# TableAlterer
+
+::: deltalake.table.TableAlterer
+    options:
+        show_root_heading: true
\ No newline at end of file
diff --git a/docs/api/delta_writer.md b/docs/api/delta_writer.md
index 432a32b768..9b395d3604 100644
--- a/docs/api/delta_writer.md
+++ b/docs/api/delta_writer.md
@@ -8,6 +8,8 @@ search:
 
 ::: deltalake.write_deltalake
 
+::: deltalake.WriterProperties
+
 ## Convert to Delta Tables
 
 ::: deltalake.convert_to_deltalake
diff --git a/mkdocs.yml b/mkdocs.yml
index 4e713d73ec..a554378f4f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -67,6 +67,7 @@ nav:
           - api/delta_table/metadata.md
           - api/delta_table/delta_table_merger.md
          - api/delta_table/delta_table_optimizer.md
+          - api/delta_table/delta_table_alterer.md
       - api/schema.md
       - api/storage.md
       - api/catalog.md
diff --git a/python/Cargo.toml b/python/Cargo.toml
index a9936a483c..dd3bcca1e9 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-python"
-version = "0.14.0"
+version = "0.15.0"
 authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
 homepage = "https://github.com/delta-io/delta-rs"
 license = "Apache-2.0"
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index b4d0ca8c3d..b893fc065b 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -135,6 +135,7 @@ class RawDeltaTable:
         partition_by: List[str],
         schema: pyarrow.Schema,
         partitions_filters: Optional[FilterType],
+        custom_metadata: Optional[Dict[str, str]],
     ) -> None: ...
     def cleanup_metadata(self) -> None: ...
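Taken together, the `create.rs` and `_internal.pyi` hunks thread caller-supplied commit metadata from Python down into the Rust commit path. A minimal sketch of the intended usage, assuming a wheel built from this branch; the table path and metadata keys below are illustrative, not part of the patch:

```python
# Sketch only: assumes deltalake 0.15.0 built from this branch.
import pyarrow as pa

from deltalake import DeltaTable, write_deltalake

write_deltalake(
    "./tmp/tbl",                              # illustrative scratch path
    pa.table({"id": [1, 2, 3]}),
    custom_metadata={"user": "etl-job-42"},   # forwarded into commitInfo
)

# history() reads commitInfo back from the log, so the custom key
# should appear alongside the standard commit fields.
print(DeltaTable("./tmp/tbl").history(1))
```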
@@ -149,6 +150,7 @@ def write_new_deltalake(
     description: Optional[str],
     configuration: Optional[Mapping[str, Optional[str]]],
     storage_options: Optional[Dict[str, str]],
+    custom_metadata: Optional[Dict[str, str]],
 ) -> None: ...
 def write_to_deltalake(
     table_uri: str,
@@ -163,6 +165,7 @@ def write_to_deltalake(
     configuration: Optional[Mapping[str, Optional[str]]],
     storage_options: Optional[Dict[str, str]],
     writer_properties: Optional[Dict[str, Optional[str]]],
+    custom_metadata: Optional[Dict[str, str]],
 ) -> None: ...
 def convert_to_deltalake(
     uri: str,
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 5adeaaa9dc..4f93a6aa61 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -785,13 +785,22 @@ def update(
     def optimize(
         self,
     ) -> "TableOptimizer":
+        """Namespace for all table optimize related methods.
+
+        Returns:
+            TableOptimizer: TableOptimizer Object
+        """
         return TableOptimizer(self)
 
     @property
     def alter(
         self,
     ) -> "TableAlterer":
-        """Namespace for all table alter related methods"""
+        """Namespace for all table alter related methods.
+
+        Returns:
+            TableAlterer: TableAlterer Object
+        """
         return TableAlterer(self)
 
     def merge(
@@ -808,6 +817,7 @@ def merge(
         target_alias: Optional[str] = None,
         error_on_type_mismatch: bool = True,
         writer_properties: Optional[WriterProperties] = None,
+        large_dtypes: bool = True,
     ) -> "TableMerger":
         """Pass the source data which you want to merge on the target delta table, providing a
         predicate in SQL query like format. You can also specify on what to do when the underlying data types do not
@@ -820,6 +830,7 @@ def merge(
             target_alias: Alias for the target table
             error_on_type_mismatch: specify if merge will return error if data types are mismatching :default = True
             writer_properties: Pass writer properties to the Rust parquet writer
+            large_dtypes: If True, the data schema is kept in large_dtypes.
 
         Returns:
             TableMerger: TableMerger Object
@@ -835,16 +846,16 @@ def merge(
         )
 
         if isinstance(source, pyarrow.RecordBatchReader):
-            source = convert_pyarrow_recordbatchreader(source, large_dtypes=True)
+            source = convert_pyarrow_recordbatchreader(source, large_dtypes)
         elif isinstance(source, pyarrow.RecordBatch):
-            source = convert_pyarrow_recordbatch(source, large_dtypes=True)
+            source = convert_pyarrow_recordbatch(source, large_dtypes)
         elif isinstance(source, pyarrow.Table):
-            source = convert_pyarrow_table(source, large_dtypes=True)
+            source = convert_pyarrow_table(source, large_dtypes)
         elif isinstance(source, ds.Dataset):
-            source = convert_pyarrow_dataset(source, large_dtypes=True)
+            source = convert_pyarrow_dataset(source, large_dtypes)
         elif isinstance(source, pandas.DataFrame):
             source = convert_pyarrow_table(
-                pyarrow.Table.from_pandas(source), large_dtypes=True
+                pyarrow.Table.from_pandas(source), large_dtypes
             )
         else:
             raise TypeError(
@@ -1176,7 +1187,10 @@ def with_writer_properties(
         write_batch_size: Optional[int] = None,
         max_row_group_size: Optional[int] = None,
     ) -> "TableMerger":
-        """Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
+        """
+        !!! warning "Deprecated"
+            Use `.merge(writer_properties = WriterProperties())` instead
+        Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
 
         Args:
             data_page_size_limit: Limit DataPage size to this in bytes.
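The new `large_dtypes` flag replaces the previously hard-coded `large_dtypes=True` casts in `merge`. A hedged sketch of how a caller would opt out; the table path, aliases, and column are illustrative:

```python
# Sketch only: opting out of the large_* Arrow casts during merge.
# Assumes an existing table at "./tmp/tbl" with an "id" column.
import pyarrow as pa

from deltalake import DeltaTable

dt = DeltaTable("./tmp/tbl")
(
    dt.merge(
        source=pa.table({"id": [4]}),
        predicate="t.id = s.id",
        source_alias="s",
        target_alias="t",
        large_dtypes=False,  # pass the source schema through unchanged
    )
    .when_not_matched_insert_all()
    .execute()
)
```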
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 609a6487c6..7306a5705c 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -100,6 +100,7 @@ def write_deltalake(
     partition_filters: Optional[List[Tuple[str, str, Any]]] = ...,
     large_dtypes: bool = ...,
     engine: Literal["pyarrow"] = ...,
+    custom_metadata: Optional[Dict[str, str]] = ...,
 ) -> None:
     ...
 
@@ -128,6 +129,7 @@ def write_deltalake(
     large_dtypes: bool = ...,
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
+    custom_metadata: Optional[Dict[str, str]] = ...,
 ) -> None:
     ...
 
@@ -163,6 +165,7 @@ def write_deltalake(
     large_dtypes: bool = False,
     engine: Literal["pyarrow", "rust"] = "pyarrow",
     writer_properties: Optional[WriterProperties] = None,
+    custom_metadata: Optional[Dict[str, str]] = None,
 ) -> None:
     """Write to a Delta Lake table
 
@@ -236,6 +239,7 @@ def write_deltalake(
         engine: writer engine to write the delta table. `Rust` engine is still experimental but you may
             see up to 4x performance improvements over pyarrow.
         writer_properties: Pass writer properties to the Rust parquet writer.
+        custom_metadata: Custom metadata to add to the commitInfo.
     """
     table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
     if table is not None:
@@ -300,6 +304,7 @@ def write_deltalake(
             writer_properties=writer_properties._to_dict()
             if writer_properties
             else None,
+            custom_metadata=custom_metadata,
         )
         if table:
             table.update_incremental()
@@ -492,6 +497,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
                 description,
                 configuration,
                 storage_options,
+                custom_metadata,
             )
         else:
             table._table.create_write_transaction(
@@ -500,6 +506,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
                 partition_by or [],
                 schema,
                 partition_filters,
+                custom_metadata,
             )
             table.update_incremental()
     else:
diff --git a/python/src/lib.rs b/python/src/lib.rs
index b5842e547a..0274f5fbac 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -741,6 +741,7 @@ impl RawDeltaTable {
         partition_by: Vec<String>,
         schema: PyArrowType<ArrowSchema>,
         partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
+        custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
         let mode = mode.parse().map_err(PythonError::from)?;
 
@@ -803,6 +804,10 @@ impl RawDeltaTable {
             partition_by: Some(partition_by),
             predicate: None,
         };
+
+        let app_metadata =
+            custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());
+
         let store = self._table.log_store();
 
         rt()?
@@ -811,7 +816,7 @@ impl RawDeltaTable {
                 &actions,
                 operation,
                 self._table.get_state(),
-                None,
+                app_metadata,
             ))
             .map_err(PythonError::from)?;
 
@@ -1173,6 +1178,7 @@ fn write_to_deltalake(
     configuration: Option<HashMap<String, Option<String>>>,
     storage_options: Option<HashMap<String, String>>,
     writer_properties: Option<HashMap<String, Option<String>>>,
+    custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
     let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<RecordBatch>>();
     let save_mode = mode.parse().map_err(PythonError::from)?;
@@ -1216,6 +1222,12 @@ fn write_to_deltalake(
         builder = builder.with_configuration(config);
     };
 
+    if let Some(metadata) = custom_metadata {
+        let json_metadata: Map<String, Value> =
+            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+        builder = builder.with_metadata(json_metadata);
+    };
+
     rt()?
         .block_on(builder.into_future())
         .map_err(PythonError::from)?;
 
@@ -1280,6 +1292,7 @@ fn write_new_deltalake(
     description: Option<String>,
     configuration: Option<HashMap<String, Option<String>>>,
     storage_options: Option<HashMap<String, String>>,
+    custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
     let table = DeltaTableBuilder::from_uri(table_uri)
         .with_storage_options(storage_options.unwrap_or_default())
@@ -1306,6 +1319,12 @@ fn write_new_deltalake(
         builder = builder.with_configuration(config);
     };
 
+    if let Some(metadata) = custom_metadata {
+        let json_metadata: Map<String, Value> =
+            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+        builder = builder.with_metadata(json_metadata);
+    };
+
     rt()?
         .block_on(builder.into_future())
         .map_err(PythonError::from)?;
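On the Rust side, both `write_to_deltalake` and `write_new_deltalake` convert the incoming `HashMap<String, String>` into a `serde_json::Map` before handing it to the builder, so each custom value should land in `commitInfo` as a JSON string. A sketch of the same hook on the Rust-engine write path; the path, mode, and keys are illustrative:

```python
# Sketch only: custom_metadata on the Rust-engine path.
import pyarrow as pa

from deltalake import write_deltalake

write_deltalake(
    "./tmp/tbl",
    pa.table({"id": [5, 6]}),
    mode="append",
    engine="rust",
    # Crosses the FFI boundary as HashMap<String, String> and is
    # converted to a serde_json Map in write_to_deltalake above.
    custom_metadata={"pipeline": "nightly", "run_id": "1234"},
)
```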