diff --git a/README.md b/README.md
index b00026b8d8..b7a26b8a42 100644
--- a/README.md
+++ b/README.md
@@ -136,8 +136,8 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protocol-support-level)
 | -------------------- | :-----: | :-----: | ---------------------------------------------------------------- |
 | Local | ![done] | ![done] | |
 | S3 - AWS | ![done] | ![done] | requires lock for concurrent writes |
-| S3 - MinIO | ![done] | ![done] | requires lock for concurrent writes |
-| S3 - R2 | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::CopyIfNotExists` |
+| S3 - MinIO | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::ConditionalPut` with `storage_options = {"conditional_put":"etag"}` |
+| S3 - R2 | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::ConditionalPut` with `storage_options = {"conditional_put":"etag"}` |
 | Azure Blob | ![done] | ![done] | |
 | Azure ADLS Gen2 | ![done] | ![done] | |
 | Microsoft OneLake | ![done] | ![done] | |
diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index a0a99c01f0..720a1e6a07 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -29,7 +29,7 @@ use std::{
 };
 use tracing::debug;
 
-use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory};
+use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
 use deltalake_core::{DeltaResult, Path};
 use url::Url;
@@ -49,23 +49,28 @@ impl LogStoreFactory for S3LogStoreFactory {
     ) -> DeltaResult<Arc<dyn LogStore>> {
         let store = url_prefix_handler(store, Path::parse(location.path())?);
 
+        // With conditional put in S3-like APIs we can use the deltalake default logstore, which uses PutIfAbsent
+        if options
+            .0
+            .contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
+        {
+            debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditional Put enabled - no locking provider required");
+            return Ok(default_logstore(store, location, options));
+        }
+
         if options
             .0
             .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
         {
             debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
-            return Ok(deltalake_core::logstore::default_logstore(
-                store, location, options,
-            ));
+            return Ok(logstore::default_s3_logstore(store, location, options));
         }
 
         let s3_options = S3StorageOptions::from_map(&options.0)?;
 
         if s3_options.locking_provider.as_deref() != Some("dynamodb") {
             debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
-            return Ok(deltalake_core::logstore::default_logstore(
-                store, location, options,
-            ));
+            return Ok(logstore::default_s3_logstore(store, location, options));
         }
 
         Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
@@ -732,7 +737,7 @@ mod tests {
         let logstore = factory
             .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
             .unwrap();
-        assert_eq!(logstore.name(), "DefaultLogStore");
+        assert_eq!(logstore.name(), "S3LogStore");
     }
 
     #[test]
diff --git a/crates/aws/src/logstore/default_logstore.rs b/crates/aws/src/logstore/default_logstore.rs
new file mode 100644
index 0000000000..a5688141c2
--- /dev/null
+++ b/crates/aws/src/logstore/default_logstore.rs
@@ -0,0 +1,113 @@
+//!
Default implementation of [`LogStore`] for S3 storage backends + +use std::sync::Arc; + +use bytes::Bytes; +use deltalake_core::{ + logstore::{ + abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry, + CommitOrBytes, LogStore, LogStoreConfig, + }, + operations::transaction::TransactionError, + storage::{ObjectStoreRef, StorageOptions}, + DeltaResult, +}; +use object_store::{Error as ObjectStoreError, ObjectStore}; +use url::Url; + +/// Return the [S3LogStore] implementation with the provided configuration options +pub fn default_s3_logstore( + store: ObjectStoreRef, + location: &Url, + options: &StorageOptions, +) -> Arc { + Arc::new(S3LogStore::new( + store, + LogStoreConfig { + location: location.clone(), + options: options.clone(), + }, + )) +} + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct S3LogStore { + pub(crate) storage: Arc, + config: LogStoreConfig, +} + +impl S3LogStore { + /// Create a new instance of [`S3LogStore`] + /// + /// # Arguments + /// + /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storage location of `storage`. + pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self { + Self { storage, config } + } +} + +#[async_trait::async_trait] +impl LogStore for S3LogStore { + fn name(&self) -> String { + "S3LogStore".into() + } + + async fn read_commit_entry(&self, version: i64) -> DeltaResult> { + read_commit_entry(self.storage.as_ref(), version).await + } + + /// Tries to commit a prepared commit file. Returns [`TransactionError`] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + commit_or_bytes: CommitOrBytes, + ) -> Result<(), TransactionError> { + match commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => { + Ok(write_commit_entry(&self.object_store(), version, &tmp_commit).await?) + } + _ => unreachable!(), // S3 Log Store should never receive bytes + } + .map_err(|err| -> TransactionError { + match err { + ObjectStoreError::AlreadyExists { .. 
} => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + } + })?; + Ok(()) + } + + async fn abort_commit_entry( + &self, + version: i64, + commit_or_bytes: CommitOrBytes, + ) -> Result<(), TransactionError> { + match &commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => { + abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await + } + _ => unreachable!(), // S3 Log Store should never receive bytes + } + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + get_latest_version(self, current_version).await + } + + fn object_store(&self) -> Arc { + self.storage.clone() + } + + fn config(&self) -> &LogStoreConfig { + &self.config + } +} diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs similarity index 95% rename from crates/aws/src/logstore.rs rename to crates/aws/src/logstore/dynamodb_logstore.rs index fe569256ee..5307040538 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -199,8 +199,12 @@ impl LogStore for S3DynamoDbLogStore { async fn write_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { + let tmp_commit = match commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit, + _ => unreachable!(), // S3DynamoDBLogstore should never get Bytes + }; let entry = CommitEntry::new(version, tmp_commit.clone()); debug!("Writing commit entry for {self:?}: {entry:?}"); // create log entry in dynamo db: complete = false, no expireTime @@ -244,8 +248,12 @@ impl LogStore for S3DynamoDbLogStore { async fn abort_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { + let tmp_commit = match commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit, + _ => unreachable!(), // S3DynamoDBLogstore should never get Bytes + }; self.lock_client .delete_commit_entry(version, &self.table_path) .await @@ -266,7 +274,7 @@ impl LogStore for S3DynamoDbLogStore { }, })?; - abort_commit_entry(&self.storage, version, tmp_commit).await?; + abort_commit_entry(&self.storage, version, &tmp_commit).await?; Ok(()) } @@ -309,13 +317,3 @@ pub enum RepairLogEntryResult { /// Both parts of the repair process where already carried. AlreadyCompleted, } - -/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()` -#[derive(Debug, PartialEq)] -pub enum CreateLockTableResult { - /// Table created successfully. - TableCreated, - /// Table was not created because it already exists. - /// Does not imply that the table has the correct schema. - TableAlreadyExists, -} diff --git a/crates/aws/src/logstore/mod.rs b/crates/aws/src/logstore/mod.rs new file mode 100644 index 0000000000..e5d7f87aec --- /dev/null +++ b/crates/aws/src/logstore/mod.rs @@ -0,0 +1,11 @@ +//! Contains the different logstore implementations for S3. +//! - S3LogStore (used when copy-if-not-exists or unsafe_rename is passed) +//! 
- S3DynamoDBLogStore (used when DynamoDB is the locking client) + +mod default_logstore; +mod dynamodb_logstore; + +pub use default_logstore::default_s3_logstore; +pub use default_logstore::S3LogStore; +pub use dynamodb_logstore::RepairLogEntryResult; +pub use dynamodb_logstore::S3DynamoDbLogStore; diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index d5609d321d..1f175f031d 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -88,10 +88,13 @@ fn aws_storage_handler( store: ObjectStoreRef, options: &StorageOptions, ) -> DeltaResult { - // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. + // If the copy-if-not-exists env var is set or ConditionalPut is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. if options .0 .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) + || options + .0 + .contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref()) { Ok(store) } else { diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index 57eb44ea24..b933735b4f 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -7,10 +7,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use aws_sdk_dynamodb::types::BillingMode; use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore}; -use deltalake_aws::storage::{s3_constants, S3StorageOptions}; +use deltalake_aws::storage::S3StorageOptions; use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient}; use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType}; -use deltalake_core::logstore::LogStore; +use deltalake_core::logstore::{CommitOrBytes, LogStore}; use deltalake_core::operations::transaction::CommitBuilder; use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::storage::commit_uri_from_version; @@ -198,7 +198,10 @@ async fn test_abort_commit_entry() -> TestResult<()> { let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?; log_store - .abort_commit_entry(entry.version, &entry.temp_path) + .abort_commit_entry( + entry.version, + CommitOrBytes::TmpCommit(entry.temp_path.clone()), + ) .await?; // The entry should have been aborted - the latest entry should be one version lower @@ -213,7 +216,7 @@ async fn test_abort_commit_entry() -> TestResult<()> { // Test abort commit is idempotent - still works if already aborted log_store - .abort_commit_entry(entry.version, &entry.temp_path) + .abort_commit_entry(entry.version, CommitOrBytes::TmpCommit(entry.temp_path)) .await?; Ok(()) @@ -244,7 +247,10 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> { // Abort will fail since we marked the entry as complete assert!(matches!( log_store - .abort_commit_entry(entry.version, &entry.temp_path) + .abort_commit_entry( + entry.version, + CommitOrBytes::TmpCommit(entry.temp_path.clone()) + ) .await, Err(_), )); @@ -346,7 +352,12 @@ async fn create_incomplete_commit_entry( .into_prepared_commit_future() .await?; - let commit_entry = CommitEntry::new(version, prepared.path().to_owned()); + let tmp_commit = match prepared.commit_or_bytes() { + CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit, + _ => unreachable!(), + }; + + let commit_entry = CommitEntry::new(version, tmp_commit.to_owned()); make_client()? 
         .put_commit_entry(&table.table_uri(), &commit_entry)
         .await?;
diff --git a/crates/core/src/logstore/default_logstore.rs b/crates/core/src/logstore/default_logstore.rs
index 8fd4f52beb..79a1c76653 100644
--- a/crates/core/src/logstore/default_logstore.rs
+++ b/crates/core/src/logstore/default_logstore.rs
@@ -1,12 +1,25 @@
 //! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation
 
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 use bytes::Bytes;
-use object_store::{path::Path, ObjectStore};
+use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};
 
-use super::{LogStore, LogStoreConfig};
-use crate::{operations::transaction::TransactionError, storage::ObjectStoreRef, DeltaResult};
+use super::{CommitOrBytes, LogStore, LogStoreConfig};
+use crate::{
+    operations::transaction::TransactionError,
+    storage::{commit_uri_from_version, ObjectStoreRef},
+    DeltaResult,
+};
+
+fn put_options() -> &'static PutOptions {
+    static PUT_OPTS: OnceLock<PutOptions> = OnceLock::new();
+    PUT_OPTS.get_or_init(|| PutOptions {
+        mode: object_store::PutMode::Create, // Creates the file only if it doesn't exist yet
+        tags: TagSet::default(),
+        attributes: Attributes::default(),
+    })
+}
 
 /// Default [`LogStore`] implementation
 #[derive(Debug, Clone)]
@@ -45,17 +58,39 @@ impl LogStore for DefaultLogStore {
     async fn write_commit_entry(
         &self,
         version: i64,
-        tmp_commit: &Path,
+        commit_or_bytes: CommitOrBytes,
     ) -> Result<(), TransactionError> {
-        super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
+        match commit_or_bytes {
+            CommitOrBytes::LogBytes(log_bytes) => self
+                .object_store()
+                .put_opts(
+                    &commit_uri_from_version(version),
+                    log_bytes.into(),
+                    put_options().clone(),
+                )
+                .await
+                .map_err(|err| -> TransactionError {
+                    match err {
+                        ObjectStoreError::AlreadyExists { .. } => {
+                            TransactionError::VersionAlreadyExists(version)
+                        }
+                        _ => TransactionError::from(err),
+                    }
+                })?,
+            _ => unreachable!(), // Default log store should never get a tmp_commit, since this is for conditional put stores
+        };
+        Ok(())
     }
 
     async fn abort_commit_entry(
         &self,
-        version: i64,
-        tmp_commit: &Path,
+        _version: i64,
+        commit_or_bytes: CommitOrBytes,
     ) -> Result<(), TransactionError> {
-        super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
+        match &commit_or_bytes {
+            CommitOrBytes::LogBytes(_) => Ok(()),
+            _ => unreachable!(), // Default log store should never get a tmp_commit, since this is for conditional put stores
+        }
     }
 
     async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs
index 94f88b5944..51191d77d0 100644
--- a/crates/core/src/logstore/mod.rs
+++ b/crates/core/src/logstore/mod.rs
@@ -151,6 +151,15 @@ pub fn logstore_with(
     ))
 }
 
+/// Holds either the path of a tmp commit or the commit bytes themselves
+#[derive(Clone)]
+pub enum CommitOrBytes {
+    /// Path of the tmp commit, to be used by logstores which use CopyIfNotExists
+    TmpCommit(Path),
+    /// Bytes of the log, to be used by logstores which use Conditional Put
+    LogBytes(Bytes),
+}
+
 /// Configuration parameters for a log store
 #[derive(Debug, Clone)]
 pub struct LogStoreConfig {
@@ -190,14 +199,14 @@ pub trait LogStore: Sync + Send {
     async fn write_commit_entry(
         &self,
         version: i64,
-        tmp_commit: &Path,
+        commit_or_bytes: CommitOrBytes,
     ) -> Result<(), TransactionError>;
 
     /// Abort the commit entry for the given version.
async fn abort_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError>; /// Find latest version currently stored in the delta log. diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index 71d478ca68..498edc67c0 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -272,14 +272,19 @@ async fn execute( .await?; let commit_version = snapshot.version() + 1; - let commit = prepared_commit.path(); - match log_store.write_commit_entry(commit_version, commit).await { + let commit_bytes = prepared_commit.commit_or_bytes(); + match log_store + .write_commit_entry(commit_version, commit_bytes.clone()) + .await + { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - log_store.abort_commit_entry(commit_version, commit).await?; + log_store + .abort_commit_entry(commit_version, commit_bytes.clone()) + .await?; return Err(err.into()); } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index e0902f96e1..6a601e0b73 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -75,11 +75,12 @@ //! use std::collections::HashMap; +use bytes::Bytes; use chrono::Utc; use conflict_checker::ConflictChecker; use futures::future::BoxFuture; use object_store::path::Path; -use object_store::{Error as ObjectStoreError, ObjectStore}; +use object_store::Error as ObjectStoreError; use serde_json::Value; use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; @@ -89,8 +90,9 @@ use crate::kernel::{ Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction, WriterFeatures, }; -use crate::logstore::LogStoreRef; +use crate::logstore::{CommitOrBytes, LogStoreRef}; use crate::protocol::DeltaOperation; +use crate::storage::ObjectStoreRef; use crate::table::config::TableConfig; use crate::table::state::DeltaTableState; use crate::{crate_version, DeltaResult}; @@ -476,23 +478,34 @@ impl<'a> PreCommit<'a> { pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult>> { let this = self; + // Write delta log entry as temporary file to storage. For the actual commit, + // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. + async fn write_tmp_commit( + log_entry: Bytes, + store: ObjectStoreRef, + ) -> DeltaResult { + let token = uuid::Uuid::new_v4().to_string(); + let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]); + store.put(&path, log_entry.into()).await?; + Ok(CommitOrBytes::TmpCommit(path)) + } + Box::pin(async move { if let Some(table_reference) = this.table_data { PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?; } - - // Write delta log entry as temporary file to storage. For the actual commit, - // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. 
             let log_entry = this.data.get_bytes()?;
-            let token = uuid::Uuid::new_v4().to_string();
-            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
-            this.log_store
-                .object_store()
-                .put(&path, log_entry.into())
-                .await?;
+
+            // With the DefaultLogStore, we just pass the bytes around, since we use conditional puts.
+            // Other stores will use tmp_commits
+            let commit_or_bytes = if this.log_store.name() == "DefaultLogStore" {
+                CommitOrBytes::LogBytes(log_entry)
+            } else {
+                write_tmp_commit(log_entry, this.log_store.object_store()).await?
+            };
 
             Ok(PreparedCommit {
-                path,
+                commit_or_bytes,
                 log_store: this.log_store,
                 table_data: this.table_data,
                 max_retries: this.max_retries,
@@ -503,9 +516,9 @@ impl<'a> PreCommit<'a> {
     }
 }
 
-/// Represents a inflight commit with a temporary commit marker on the log store
+/// Represents an in-flight commit
 pub struct PreparedCommit<'a> {
-    path: Path,
+    commit_or_bytes: CommitOrBytes,
     log_store: LogStoreRef,
     data: CommitData,
     table_data: Option<&'a dyn TableReference>,
@@ -515,8 +528,8 @@ pub struct PreparedCommit<'a> {
 
 impl<'a> PreparedCommit<'a> {
     /// The temporary commit file created
-    pub fn path(&self) -> &Path {
-        &self.path
+    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
+        &self.commit_or_bytes
     }
 }
 
@@ -528,10 +541,12 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
         let this = self;
 
         Box::pin(async move {
-            let tmp_commit = &this.path;
+            let commit_or_bytes = this.commit_or_bytes;
 
             if this.table_data.is_none() {
-                this.log_store.write_commit_entry(0, tmp_commit).await?;
+                this.log_store
+                    .write_commit_entry(0, commit_or_bytes.clone())
+                    .await?;
                 return Ok(PostCommit {
                     version: 0,
                     data: this.data,
@@ -549,7 +564,11 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
             let mut attempt_number = 1;
             while attempt_number <= this.max_retries {
                 let version = read_snapshot.version() + attempt_number as i64;
-                match this.log_store.write_commit_entry(version, tmp_commit).await {
+                match this
+                    .log_store
+                    .write_commit_entry(version, commit_or_bytes.clone())
+                    .await
+                {
                     Ok(()) => {
                         return Ok(PostCommit {
                             version,
@@ -590,7 +609,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
                             }
                             Err(err) => {
                                 this.log_store
-                                    .abort_commit_entry(version, tmp_commit)
+                                    .abort_commit_entry(version, commit_or_bytes)
                                     .await?;
                                 return Err(TransactionError::CommitConflict(err).into());
                             }
@@ -598,7 +617,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
                     }
                     Err(err) => {
                         this.log_store
-                            .abort_commit_entry(version, tmp_commit)
+                            .abort_commit_entry(version, commit_or_bytes)
                             .await?;
                         return Err(err.into());
                     }
@@ -732,7 +751,7 @@ mod tests {
         logstore::{default_logstore::DefaultLogStore, LogStore},
         storage::commit_uri_from_version,
     };
-    use object_store::{memory::InMemory, PutPayload};
+    use object_store::{memory::InMemory, ObjectStore, PutPayload};
     use url::Url;
 
     #[test]
@@ -754,16 +773,19 @@ mod tests {
                 options: HashMap::new().into(),
             },
         );
-        let tmp_path = Path::from("_delta_log/tmp");
         let version_path = Path::from("_delta_log/00000000000000000000.json");
-        store.put(&tmp_path, PutPayload::new()).await.unwrap();
         store.put(&version_path, PutPayload::new()).await.unwrap();
 
-        let res = log_store.write_commit_entry(0, &tmp_path).await;
+        let res = log_store
+            .write_commit_entry(0, CommitOrBytes::LogBytes(PutPayload::new().into()))
+            .await;
         // fails if file version already exists
         assert!(res.is_err());
 
         // succeeds for next version
-        log_store.write_commit_entry(1, &tmp_path).await.unwrap();
+        log_store
+            .write_commit_entry(1, CommitOrBytes::LogBytes(PutPayload::new().into()))
+            .await
+            .unwrap();
     }
 }
diff --git a/docs/integrations/object-storage/s3-like.md b/docs/integrations/object-storage/s3-like.md
new file mode 100644
index 0000000000..4d32f7c41b
--- /dev/null
+++ b/docs/integrations/object-storage/s3-like.md
@@ -0,0 +1,83 @@
+# CloudFlare R2 & MinIO
+
+`delta-rs` offers native support for using Cloudflare R2 and MinIO as storage backends. R2 and MinIO support conditional puts; however, we have to pass this flag in via the storage options. See the example below.
+
+You don’t need to install any extra dependencies to read/write Delta tables to S3 with engines that use `delta-rs`. You do need to configure your AWS access credentials correctly.
+
+## Passing S3 Credentials
+
+You can pass your AWS credentials explicitly by using:
+
+- the `storage_options` kwarg
+- Environment variables
+
+## Example
+
+Let's work through an example with Polars. The same logic applies to other Python engines like Pandas, Daft, Dask, etc.
+
+Follow the steps below to use Delta Lake on S3 (R2/MinIO) with Polars:
+
+1. Install Polars and deltalake. For example, using:
+
+   `pip install polars deltalake`
+
+2. Create a dataframe with some toy data.
+
+   `df = pl.DataFrame({'x': [1, 2, 3]})`
+
+3. Set your `storage_options` correctly.
+
+```python
+storage_options = {
+    'AWS_SECRET_ACCESS_KEY': <AWS_SECRET_ACCESS_KEY>,
+    'conditional_put': 'etag', # Here we say to use conditional put, which provides safe concurrency.
+}
+```
+
+4. Write data to the Delta table using the `storage_options` kwarg.
+
+   ```python
+   df.write_delta(
+       "s3://bucket/delta_table",
+       storage_options=storage_options,
+   )
+   ```
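+
+If you are using MinIO rather than R2, you typically also need to point the client at your MinIO endpoint. The sketch below combines the steps above for a local MinIO deployment; the endpoint, region and credentials shown are example values and must be adjusted to your own setup:
+
+```python
+import polars as pl
+
+# Example values for a local MinIO deployment - adjust to your environment.
+storage_options = {
+    'AWS_ACCESS_KEY_ID': 'minioadmin',
+    'AWS_SECRET_ACCESS_KEY': 'minioadmin',
+    'AWS_ENDPOINT_URL': 'http://localhost:9000',  # MinIO API endpoint
+    'AWS_REGION': 'us-east-1',
+    'AWS_ALLOW_HTTP': 'true',  # needed when the endpoint is plain HTTP
+    'conditional_put': 'etag',  # safe concurrent writes without a locking provider
+}
+
+df = pl.DataFrame({'x': [1, 2, 3]})
+df.write_delta("s3://bucket/delta_table", storage_options=storage_options)
+
+# Read the table back to verify the write.
+print(pl.read_delta("s3://bucket/delta_table", storage_options=storage_options))
+```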
+
+## Delta Lake on S3: Safe Concurrent Writes
+
+You need a locking provider to ensure safe concurrent writes when writing Delta tables to S3. This is because S3 does not guarantee mutual exclusion.
+
+A locking provider guarantees that only one writer is able to create the same file. This prevents corrupted or conflicting data.
+
+`delta-rs` uses DynamoDB to guarantee safe concurrent writes.
+
+Run the code below in your terminal to create a DynamoDB table that will act as your locking provider.
+
+```
+aws dynamodb create-table \
+--table-name delta_log \
+--attribute-definitions AttributeName=tablePath,AttributeType=S AttributeName=fileName,AttributeType=S \
+--key-schema AttributeName=tablePath,KeyType=HASH AttributeName=fileName,KeyType=RANGE \
+--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
+```
+
+If for some reason you don't want to use DynamoDB as your locking mechanism you can choose to set the `AWS_S3_ALLOW_UNSAFE_RENAME` variable to `true` in order to enable S3 unsafe writes.
+
+Read more in the [Usage](../../usage/writing/writing-to-s3-with-locking-provider.md) section.
+
+## Delta Lake on S3: Required permissions
+
+You need to have permissions to get, put and delete objects in the S3 bucket you're storing your data in. Please note that you must be allowed to delete objects even if you're just appending to the Delta Lake, because there are temporary files in the log folder that are deleted after usage.
+
+In AWS S3, you will need the following permissions:
+
+- s3:GetObject
+- s3:PutObject
+- s3:DeleteObject
+
+In DynamoDB, you will need the following permissions:
+
+- dynamodb:GetItem
+- dynamodb:Query
+- dynamodb:PutItem
+- dynamodb:UpdateItem
diff --git a/docs/integrations/object-storage/s3.md b/docs/integrations/object-storage/s3.md
index 0814eab0c5..5b2034827f 100644
--- a/docs/integrations/object-storage/s3.md
+++ b/docs/integrations/object-storage/s3.md
@@ -62,9 +62,9 @@ storage_options = {
 )
 ```
 
-## Delta Lake on S3: Safe Concurrent Writes
+## Delta Lake on AWS S3: Safe Concurrent Writes
 
-You need a locking provider to ensure safe concurrent writes when writing Delta tables to S3. This is because S3 does not guarantee mutual exclusion.
+You need a locking provider to ensure safe concurrent writes when writing Delta tables to AWS S3. This is because AWS S3 does not guarantee mutual exclusion.
 
 A locking provider guarantees that only one writer is able to create the same file. This prevents corrupted or conflicting data.
@@ -84,7 +84,7 @@ If for some reason you don't want to use DynamoDB as your locking mechanism you
 
 Read more in the [Usage](../../usage/writing/writing-to-s3-with-locking-provider.md) section.
 
-## Delta Lake on S3: Required permissions
+## Delta Lake on AWS S3: Required permissions
 
 You need to have permissions to get, put and delete objects in the S3 bucket you're storing your data in. Please note that you must be allowed to delete objects even if you're just appending to the Delta Lake, because there are temporary files into the log folder that are deleted after usage.
diff --git a/docs/usage/writing/writing-to-s3-with-locking-provider.md b/docs/usage/writing/writing-to-s3-with-locking-provider.md
index 0e42baa2e3..bbe2fa958c 100644
--- a/docs/usage/writing/writing-to-s3-with-locking-provider.md
+++ b/docs/usage/writing/writing-to-s3-with-locking-provider.md
@@ -101,13 +101,10 @@ In DynamoDB, you need those permissions:
 
 Unlike AWS S3, some S3 clients support atomic renames by passing some headers in requests.
 
-For CloudFlare R2 passing this in the storage_options will enable concurrent writes:
+For CloudFlare R2 or MinIO, passing this in the storage_options will enable concurrent writes:
 
 ```python
 storage_options = {
-    "copy_if_not_exists": "header: cf-copy-destination-if-none-match: *",
+    "conditional_put": "etag",
 }
 ```
-
-Something similar can be done with MinIO but the header to pass should be verified
-in the MinIO documentation.
diff --git a/mkdocs.yml b/mkdocs.yml
index b2b09cbe49..b0c8d3a0ac 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -84,6 +84,7 @@ nav:
       - Object Storage:
           - integrations/object-storage/hdfs.md
           - integrations/object-storage/s3.md
+          - integrations/object-storage/s3-like.md
       - Arrow: integrations/delta-lake-arrow.md
       - Daft: integrations/delta-lake-daft.md
       - Dagster: integrations/delta-lake-dagster.md
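For reference, a minimal usage sketch of the behaviour this patch enables, assuming an existing R2/MinIO bucket reachable as `s3://bucket/delta_table` (a placeholder) and endpoint/credentials supplied through the environment:

```python
import polars as pl

# `conditional_put` is the only storage option needed for the concurrency
# guarantee; the endpoint and credentials are assumed to come from env vars.
storage_options = {"conditional_put": "etag"}

table_uri = "s3://bucket/delta_table"

# First writer creates the table.
pl.DataFrame({"x": [1, 2, 3]}).write_delta(table_uri, storage_options=storage_options)

# A second, concurrent writer can append without a locking provider: if two
# commits race for the same version, the losing conditional PUT is rejected by
# the store and the commit is retried against the next version.
pl.DataFrame({"x": [4, 5, 6]}).write_delta(
    table_uri, mode="append", storage_options=storage_options
)
```

On AWS S3 itself, the DynamoDB locking provider described in the docs above remains the recommended path for concurrent writers.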