Skip to content

Commit

Permalink
Merge branch 'main' into merge-partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored Dec 20, 2023
2 parents ba10c9a + c14d577 commit c69c18f
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![semi-done]][check-constraints] |
| Version 4 | Change Data Feed | |
| Version 4 | Generated Columns | |
| Version 5 | Column Mapping | |
Expand All @@ -185,5 +185,6 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
[merge-rs]: https://github.com/delta-io/delta-rs/issues/850
[writer-rs]: https://github.com/delta-io/delta-rs/issues/851
[check-constraints]: https://github.com/delta-io/delta-rs/issues/1881
[onelake-rs]: https://github.com/delta-io/delta-rs/issues/1418
[protocol]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl IntegrationContext {
if let StorageIntegration::Google = integration {
gs_cli::prepare_env();
let base_url = std::env::var("GOOGLE_BASE_URL")?;
let token = json!({"gcs_base_url": base_url, "disable_oauth": true, "client_email": "", "private_key": ""});
let token = json!({"gcs_base_url": base_url, "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""});
let account_path = tmp_dir.path().join("gcs.json");
std::fs::write(&account_path, serde_json::to_vec(&token)?)?;
set_env_if_not_set(
Expand Down
4 changes: 4 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
) -> str: ...
def add_constraints(self, constraints: Dict[str, str]) -> None:
    """Register constraints on the table.

    Args:
        constraints: mapping of constraint name to the SQL expression
            that is evaluated on write.
    """
    ...
def restore(
self,
target: Optional[Any],
Expand Down
44 changes: 44 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,13 @@ def optimize(
) -> "TableOptimizer":
return TableOptimizer(self)

@property
def alter(self) -> "TableAlterer":
    """Entry point for the table alter commands.

    Every access wraps this table in a new `TableAlterer`.
    """
    alterer = TableAlterer(self)
    return alterer

def merge(
self,
source: Union[
Expand Down Expand Up @@ -1609,6 +1616,43 @@ def execute(self) -> Dict[str, Any]:
return json.loads(metrics)


class TableAlterer:
    """API for various table alteration commands."""

    def __init__(self, table: "DeltaTable") -> None:
        # Commands are forwarded to the wrapped table's `_table` handle.
        self.table = table

    def add_constraint(self, constraints: Dict[str, str]) -> None:
        """
        Add a constraint to the table. Limited to a single constraint per call.

        Args:
            constraints: mapping of constraint name to SQL-expression to evaluate on write

        Raises:
            ValueError: if more than one constraint is passed in a single call.

        Example:
            ```python
            from deltalake import DeltaTable
            dt = DeltaTable("test_table_constraints")
            dt.alter.add_constraint({
                "value_gt_5": "value > 5",
            })
            ```

            **Check configuration**
            ```
            dt.metadata().configuration
            {'delta.constraints.value_gt_5': 'value > 5'}
            ```
        """
        if len(constraints) > 1:
            # Built from adjacent literals so the message carries no embedded
            # newline or source indentation, and names the method users call.
            raise ValueError(
                "add_constraint is limited to a single constraint addition at once for now. "
                "Please execute add_constraint multiple times with each time a different constraint."
            )

        self.table._table.add_constraints(constraints)


class TableOptimizer:
"""API for various table optimization commands."""

Expand Down
2 changes: 1 addition & 1 deletion python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn inner_to_py_err(err: DeltaTableError) -> PyErr {
DeltaTableError::InvalidJsonLog { .. } => DeltaProtocolError::new_err(err.to_string()),
DeltaTableError::InvalidStatsJson { .. } => DeltaProtocolError::new_err(err.to_string()),
DeltaTableError::InvalidData { violations } => {
DeltaProtocolError::new_err(format!("Inaviant violations: {:?}", violations))
DeltaProtocolError::new_err(format!("Invariant violations: {:?}", violations))
}

// commit errors
Expand Down
17 changes: 17 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{Action, Add, Invariant, Remove, StructType};
use deltalake::operations::constraints::ConstraintBuilder;
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
Expand Down Expand Up @@ -388,6 +389,22 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Adds the given constraints to the table and stores the resulting table state.
///
/// `constraints` maps each constraint name to its expression; every entry is
/// handed to the `ConstraintBuilder` before the command is executed.
///
/// # Errors
///
/// Propagates the error from `rt()`, and converts a failure of the constraint
/// command into a Python exception via `PythonError`.
#[pyo3(signature = (constraints))]
pub fn add_constraints(&mut self, constraints: HashMap<String, String>) -> PyResult<()> {
    let mut cmd =
        ConstraintBuilder::new(self._table.log_store(), self._table.get_state().clone());

    // The map is consumed by value, so each name/expression pair is moved into
    // the builder instead of being cloned.
    for (name, expression) in constraints {
        cmd = cmd.with_constraint(name, expression);
    }

    let table = rt()?
        .block_on(cmd.into_future())
        .map_err(PythonError::from)?;
    // Keep the wrapper in sync with the state produced by the command.
    self._table.state = table.state;
    Ok(())
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (source,
predicate,
Expand Down
48 changes: 48 additions & 0 deletions python/tests/test_alter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError, DeltaProtocolError


def test_add_constraint(tmp_path: pathlib.Path, sample_table: pa.Table):
    """A constraint added via `alter.add_constraint` is committed, stored in the
    table configuration, and rejects a violating append."""
    write_deltalake(tmp_path, sample_table)

    dt = DeltaTable(tmp_path)

    dt.alter.add_constraint({"check_price": "price >= 0"})

    last_action = dt.history(1)[0]
    assert last_action["operation"] == "ADD CONSTRAINT"
    assert dt.version() == 1
    assert dt.metadata().configuration == {
        "delta.constraints.check_price": "price >= 0"
    }

    with pytest.raises(DeltaError):
        # Invalid constraint (note: the name `check_price` is already registered above)
        dt.alter.add_constraint({"check_price": "price < 0"})

    # Build the violating row outside the `raises` block so that only the
    # write itself can satisfy the expected exception.
    data = pa.table(
        {
            "id": pa.array(["1"]),
            "price": pa.array([-1], pa.int64()),
            "sold": pa.array(list(range(1)), pa.int32()),
            "deleted": pa.array([False] * 1),
        }
    )
    with pytest.raises(DeltaProtocolError):
        write_deltalake(tmp_path, data, engine="rust", mode="append")


def test_add_multiple_constraints(tmp_path: pathlib.Path, sample_table: pa.Table):
    """Passing more than one constraint in a single call raises `ValueError`."""
    write_deltalake(tmp_path, sample_table)
    table = DeltaTable(tmp_path)

    two_constraints = {"check_price": "price >= 0", "check_price2": "price >= 0"}
    with pytest.raises(ValueError):
        table.alter.add_constraint(two_constraints)

0 comments on commit c69c18f

Please sign in to comment.