Skip to content

Commit

Permalink
Merge branch 'main' into cast-list-items-to-default-before-write
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored Dec 20, 2023
2 parents c7375a0 + 11ea2a5 commit 2dc6a13
Show file tree
Hide file tree
Showing 26 changed files with 1,371 additions and 312 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/docs_release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: docs_release

on:
pull_request:
types:
- closed
branches: [main]
paths:
- docs/**
- mkdocs.yml

jobs:
release-docs:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
steps:
- name: Trigger the docs release event
uses: peter-evans/repository-dispatch@v2
with:
event-type: release-docs
client-payload: >
{
"tag": "${{ github.ref_name }}"
}
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![semi-done]][check-constraints] |
| Version 4 | Change Data Feed | |
| Version 4 | Generated Columns | |
| Version 5 | Column Mapping | |
Expand All @@ -185,5 +185,6 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
[merge-rs]: https://github.com/delta-io/delta-rs/issues/850
[writer-rs]: https://github.com/delta-io/delta-rs/issues/851
[check-constraints]: https://github.com/delta-io/delta-rs/issues/1881
[onelake-rs]: https://github.com/delta-io/delta-rs/issues/1418
[protocol]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
15 changes: 12 additions & 3 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{col, decode, lit, substring, Cast, Expr, ExprSchemable};

use crate::delta_datafusion::DeltaSessionContext;
use crate::kernel::{DataType, PrimitiveType, StructField, StructType};
use crate::{DeltaOps, DeltaTable};

Expand Down Expand Up @@ -388,6 +389,11 @@ mod test {
DataType::Primitive(PrimitiveType::Integer),
true,
),
StructField::new(
"Value3".to_string(),
DataType::Primitive(PrimitiveType::Integer),
true,
),
StructField::new(
"modified".to_string(),
DataType::Primitive(PrimitiveType::String),
Expand Down Expand Up @@ -442,7 +448,10 @@ mod test {
}),
"arrow_cast(1, 'Int32')".to_string()
),
simple!(col("value").eq(lit(3_i64)), "value = 3".to_string()),
simple!(
Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
"Value3 = 3".to_string()
),
simple!(col("active").is_true(), "active IS TRUE".to_string()),
simple!(col("active"), "active".to_string()),
simple!(col("active").eq(lit(true)), "active = true".to_string()),
Expand Down Expand Up @@ -536,7 +545,7 @@ mod test {
),
];

let session = SessionContext::new();
let session: SessionContext = DeltaSessionContext::default().into();

for test in tests {
let actual = fmt_expr_to_sql(&test.expr).unwrap();
Expand Down
38 changes: 35 additions & 3 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use futures::TryStreamExt;

use itertools::Itertools;
use log::error;
Expand Down Expand Up @@ -1019,6 +1020,31 @@ pub(crate) fn logical_expr_to_physical_expr(
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

/// Executes every output partition of `plan` and merges all produced record batches into a
/// single [`arrow::record_batch::RecordBatch`] that uses the plan's schema.
///
/// One future is created per output partition and all of them are awaited together through
/// `try_join_all`, so the first partition that fails aborts the whole call.
///
/// # Errors
///
/// Returns an error when a partition cannot be executed, when its stream yields an error, or
/// when the collected batches cannot be concatenated under the plan's schema.
pub(crate) async fn execute_plan_to_batch(
    state: &SessionState,
    plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
    let partition_count = plan.output_partitioning().partition_count();

    let partitions = futures::future::try_join_all((0..partition_count).map(|partition| {
        let plan = Arc::clone(&plan);
        // `task_ctx()` already returns an owned handle, so it is moved into the future as-is
        // instead of being cloned a second time.
        let task_context = state.task_ctx();
        async move {
            let batches = plan
                .execute(partition, task_context)?
                .try_collect::<Vec<_>>()
                .await?;

            DataFusionResult::<_>::Ok(batches)
        }
    }))
    .await?;

    // Concatenate exactly once across all partitions. Merging each partition first and then
    // merging the merged results would copy every row twice.
    let batch = arrow::compute::concat_batches(&plan.schema(), partitions.iter().flatten())?;

    Ok(batch)
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
Expand All @@ -1033,7 +1059,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints: vec![],
ctx: SessionContext::new(),
ctx: DeltaSessionContext::default().into(),
}
}

Expand All @@ -1042,10 +1068,16 @@ impl DeltaDataChecker {
Self {
constraints,
invariants: vec![],
ctx: SessionContext::new(),
ctx: DeltaSessionContext::default().into(),
}
}

/// Specify the Datafusion context
pub fn with_session_context(mut self, context: SessionContext) -> Self {
self.ctx = context;
self
}

/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
let metadata = snapshot.metadata();
Expand All @@ -1059,7 +1091,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints,
ctx: SessionContext::new(),
ctx: DeltaSessionContext::default().into(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};

pub(crate) mod schemas;
mod serde_path;
pub(crate) mod serde_path;
pub(crate) mod types;

pub use types::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/actions/serde_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn encode_path(path: &str) -> String {
percent_encode(path.as_bytes(), INVALID).to_string()
}

fn decode_path(path: &str) -> Result<String, Utf8Error> {
/// Percent-decodes `path` and returns the result as an owned `String`.
///
/// # Errors
///
/// Returns a [`Utf8Error`] when the decoded bytes are not valid UTF-8.
pub fn decode_path(path: &str) -> Result<String, Utf8Error> {
    let decoded = percent_decode_str(path).decode_utf8()?;
    Ok(decoded.into_owned())
}

Expand Down
Loading

0 comments on commit 2dc6a13

Please sign in to comment.