Merge branch 'main' into issue-1957
ion-elgreco authored Jan 2, 2024
2 parents d0b3dbb + 6d41b37 commit 1e2941d
Showing 9 changed files with 75 additions and 14 deletions.
14 changes: 9 additions & 5 deletions crates/deltalake-core/src/operations/create.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use serde_json::{Map, Value};
use serde_json::Value;

use super::transaction::{commit, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
@@ -56,7 +56,7 @@ pub struct CreateBuilder {
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
configuration: HashMap<String, Option<String>>,
metadata: Option<Map<String, Value>>,
metadata: Option<HashMap<String, Value>>,
}

impl Default for CreateBuilder {
@@ -181,8 +181,11 @@ impl CreateBuilder {
///
/// This might include provenance information such as an id of the
/// user that made the commit or the program that created it.
pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
self.metadata = Some(metadata);
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.metadata = Some(HashMap::from_iter(metadata));
self
}

@@ -286,6 +289,7 @@ impl std::future::IntoFuture for CreateBuilder {
let this = self;
Box::pin(async move {
let mode = this.mode.clone();
let app_metadata = this.metadata.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
let log_store = table.log_store();
let table_state = if log_store.is_delta_table_location().await? {
@@ -310,7 +314,7 @@
&actions,
operation,
table_state,
None,
app_metadata,
)
.await?;
table.load_version(version).await?;
11 changes: 11 additions & 0 deletions docs/api/delta_table/delta_table_alterer.md
@@ -0,0 +1,11 @@
---
search:
boost: 10
---


# TableAlterer

::: deltalake.table.TableAlterer
options:
show_root_heading: true
2 changes: 2 additions & 0 deletions docs/api/delta_writer.md
@@ -8,6 +8,8 @@ search:

::: deltalake.write_deltalake

::: deltalake.WriterProperties

## Convert to Delta Tables
::: deltalake.convert_to_deltalake

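The `::: deltalake.WriterProperties` entry added above documents the Python-side `WriterProperties` class consumed by the Rust writer. A minimal sketch of passing it to `write_deltalake` follows; the keyword arguments simply mirror the option names listed later in `with_writer_properties` and, like the local table path, are assumptions rather than the confirmed constructor signature:

```python
import pyarrow as pa

from deltalake import WriterProperties, write_deltalake

data = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Assumed keyword arguments; they mirror the option names shown in
# with_writer_properties below and may differ from the actual constructor.
props = WriterProperties(max_row_group_size=64_000, write_batch_size=1_024)

write_deltalake(
    "./example_table",       # hypothetical local path
    data,
    mode="append",
    engine="rust",           # WriterProperties is only honoured by the rust engine
    writer_properties=props,
)
```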
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -67,6 +67,7 @@ nav:
- api/delta_table/metadata.md
- api/delta_table/delta_table_merger.md
- api/delta_table/delta_table_optimizer.md
- api/delta_table/delta_table_alterer.md
- api/schema.md
- api/storage.md
- api/catalog.md
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.14.0"
version = "0.15.0"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
3 changes: 3 additions & 0 deletions python/deltalake/_internal.pyi
@@ -135,6 +135,7 @@ class RawDeltaTable:
partition_by: List[str],
schema: pyarrow.Schema,
partitions_filters: Optional[FilterType],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def cleanup_metadata(self) -> None: ...

@@ -149,6 +150,7 @@ def write_new_deltalake(
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def write_to_deltalake(
table_uri: str,
@@ -163,6 +165,7 @@ def write_to_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
28 changes: 21 additions & 7 deletions python/deltalake/table.py
@@ -785,13 +785,22 @@ def update(
def optimize(
self,
) -> "TableOptimizer":
"""Namespace for all table optimize related methods.
Returns:
TableOptimizer: TableOptimizer Object
"""
return TableOptimizer(self)

@property
def alter(
self,
) -> "TableAlterer":
"""Namespace for all table alter related methods"""
"""Namespace for all table alter related methods.
Returns:
TableAlterer: TableAlterer Object
"""
return TableAlterer(self)

def merge(
@@ -808,6 +817,7 @@ def merge(
target_alias: Optional[str] = None,
error_on_type_mismatch: bool = True,
writer_properties: Optional[WriterProperties] = None,
large_dtypes: bool = True,
) -> "TableMerger":
"""Pass the source data which you want to merge on the target delta table, providing a
predicate in SQL query like format. You can also specify on what to do when the underlying data types do not
@@ -820,6 +830,7 @@
target_alias: Alias for the target table
error_on_type_mismatch: specify if merge will return error if data types are mismatching :default = True
writer_properties: Pass writer properties to the Rust parquet writer
large_dtypes: If True, the data schema is kept in large_dtypes.
Returns:
TableMerger: TableMerger Object
@@ -835,16 +846,16 @@
)

if isinstance(source, pyarrow.RecordBatchReader):
source = convert_pyarrow_recordbatchreader(source, large_dtypes=True)
source = convert_pyarrow_recordbatchreader(source, large_dtypes)
elif isinstance(source, pyarrow.RecordBatch):
source = convert_pyarrow_recordbatch(source, large_dtypes=True)
source = convert_pyarrow_recordbatch(source, large_dtypes)
elif isinstance(source, pyarrow.Table):
source = convert_pyarrow_table(source, large_dtypes=True)
source = convert_pyarrow_table(source, large_dtypes)
elif isinstance(source, ds.Dataset):
source = convert_pyarrow_dataset(source, large_dtypes=True)
source = convert_pyarrow_dataset(source, large_dtypes)
elif isinstance(source, pandas.DataFrame):
source = convert_pyarrow_table(
pyarrow.Table.from_pandas(source), large_dtypes=True
pyarrow.Table.from_pandas(source), large_dtypes
)
else:
raise TypeError(
@@ -1176,7 +1187,10 @@ def with_writer_properties(
write_batch_size: Optional[int] = None,
max_row_group_size: Optional[int] = None,
) -> "TableMerger":
"""Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
"""
!!! warning "Deprecated"
Use `.merge(writer_properties = WriterProperties())` instead
Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
Args:
data_page_size_limit: Limit DataPage size to this in bytes.
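The `merge` changes above thread a new `large_dtypes` flag (defaulting to `True`, the previously hard-coded behaviour) through the pyarrow conversion helpers, and deprecate `with_writer_properties` in favour of passing `writer_properties=WriterProperties(...)` directly to `merge`. A rough sketch of a merge that opts out of large Arrow types; the table path, column names, and predicate are made up:

```python
import pyarrow as pa

from deltalake import DeltaTable

dt = DeltaTable("./example_table")  # hypothetical existing table
source = pa.table({"id": [2, 4], "value": ["b2", "d"]})

(
    dt.merge(
        source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
        large_dtypes=False,  # keep the source in its original (non-large) Arrow types
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```

With `large_dtypes=False` the source is no longer unconditionally upcast (e.g. `string` stays `string` rather than becoming `large_string`), which was the implicit behaviour before this change.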
7 changes: 7 additions & 0 deletions python/deltalake/writer.py
@@ -100,6 +100,7 @@ def write_deltalake(
partition_filters: Optional[List[Tuple[str, str, Any]]] = ...,
large_dtypes: bool = ...,
engine: Literal["pyarrow"] = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
) -> None:
...

@@ -128,6 +129,7 @@ def write_deltalake(
large_dtypes: bool = ...,
engine: Literal["rust"],
writer_properties: WriterProperties = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
) -> None:
...

@@ -163,6 +165,7 @@ def write_deltalake(
large_dtypes: bool = False,
engine: Literal["pyarrow", "rust"] = "pyarrow",
writer_properties: Optional[WriterProperties] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""Write to a Delta Lake table
@@ -236,6 +239,7 @@
engine: writer engine to write the delta table. `Rust` engine is still experimental but you may
see up to 4x performance improvements over pyarrow.
writer_properties: Pass writer properties to the Rust parquet writer.
custom_metadata: Custom metadata to add to the commitInfo.
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
if table is not None:
@@ -300,6 +304,7 @@ def write_deltalake(
writer_properties=writer_properties._to_dict()
if writer_properties
else None,
custom_metadata=custom_metadata,
)
if table:
table.update_incremental()
@@ -492,6 +497,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
description,
configuration,
storage_options,
custom_metadata,
)
else:
table._table.create_write_transaction(
@@ -500,6 +506,7 @@
partition_by or [],
schema,
partition_filters,
custom_metadata,
)
table.update_incremental()
else:
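The new `custom_metadata` parameter accepts a plain `str -> str` mapping and, per the docstring added above, is attached to the `commitInfo` of the resulting commit on both the pyarrow and rust paths. A minimal sketch; the path and metadata values are invented:

```python
import pyarrow as pa

from deltalake import write_deltalake

data = pa.table({"id": [1, 2, 3]})

write_deltalake(
    "./events_table",  # hypothetical path
    data,
    mode="append",
    # Arbitrary provenance keys; they end up in the commitInfo of this commit.
    custom_metadata={"pipeline_run": "2024-01-02T00:00:00Z", "triggered_by": "nightly-job"},
)
```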
21 changes: 20 additions & 1 deletion python/src/lib.rs
@@ -741,6 +741,7 @@ impl RawDeltaTable {
partition_by: Vec<String>,
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mode = mode.parse().map_err(PythonError::from)?;

@@ -803,6 +804,10 @@ impl RawDeltaTable {
partition_by: Some(partition_by),
predicate: None,
};

let app_metadata =
custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());

let store = self._table.log_store();

rt()?
@@ -811,7 +816,7 @@
&actions,
operation,
self._table.get_state(),
None,
app_metadata,
))
.map_err(PythonError::from)?;

@@ -1173,6 +1178,7 @@ fn write_to_deltalake(
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
writer_properties: Option<HashMap<String, Option<String>>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
let save_mode = mode.parse().map_err(PythonError::from)?;
@@ -1216,6 +1222,12 @@
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
@@ -1280,6 +1292,7 @@ fn write_new_deltalake(
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
@@ -1306,6 +1319,12 @@
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
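On the Rust side of the bindings, `custom_metadata` is converted into a `Map<String, Value>` and forwarded either to `commit` as `app_metadata` or to the create/write builders via `with_metadata`. One way to check the result from Python, assuming the custom keys are surfaced alongside the standard `commitInfo` fields returned by `DeltaTable.history()` (a hedged sketch, reusing the hypothetical table from the previous example):

```python
from deltalake import DeltaTable

dt = DeltaTable("./events_table")  # hypothetical table written above

# history() returns one dict of commitInfo per version; the keys passed via
# custom_metadata are expected to appear next to fields like "operation".
last_commit = dt.history(limit=1)[0]
print(last_commit.get("pipeline_run"))
```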
