Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: conditional put for default log store (e.g. azure, gcs, minio, cloudflare) #2813

Merged
merged 7 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| -------------------- | :-----: | :-----: | ---------------------------------------------------------------- |
| Local | ![done] | ![done] | |
| S3 - AWS | ![done] | ![done] | requires lock for concurrent writes |
| S3 - MinIO | ![done] | ![done] | requires lock for concurrent writes |
| S3 - R2 | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::CopyIfNotExists` |
| S3 - MinIO | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::ConditionalPut` with `storage_options = {"conditional_put":"etag"}` |
| S3 - R2 | ![done] | ![done] | No lock required when using `AmazonS3ConfigKey::ConditionalPut` with `storage_options = {"conditional_put":"etag"}` |
| Azure Blob | ![done] | ![done] | |
| Azure ADLS Gen2 | ![done] | ![done] | |
| Microsoft OneLake | ![done] | ![done] | |
Expand Down
21 changes: 13 additions & 8 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
};
use tracing::debug;

use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory};
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
use url::Url;
Expand All @@ -49,23 +49,28 @@ impl LogStoreFactory for S3LogStoreFactory {
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?);

// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
{
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditional Put enabled - no locking provider required");
return Ok(default_logstore(store, location, options));
}

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
return Ok(logstore::default_s3_logstore(store, location, options));
}

let s3_options = S3StorageOptions::from_map(&options.0)?;

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
return Ok(logstore::default_s3_logstore(store, location, options));
}

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
Expand Down Expand Up @@ -732,7 +737,7 @@ mod tests {
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
assert_eq!(logstore.name(), "DefaultLogStore");
assert_eq!(logstore.name(), "S3LogStore");
}

#[test]
Expand Down
113 changes: 113 additions & 0 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Default implementation of [`LogStore`] for S3 storage backends

use std::sync::Arc;

use bytes::Bytes;
use deltalake_core::{
logstore::{
abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry,
CommitOrBytes, LogStore, LogStoreConfig,
},
operations::transaction::TransactionError,
storage::{ObjectStoreRef, StorageOptions},
DeltaResult,
};
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;

/// Build an [`S3LogStore`] wrapped as a [`LogStore`] trait object from the
/// given object store, table location and storage options.
///
/// * `store` - object store with "/" pointing at the delta table root.
/// * `location` - url of the table this log store serves.
/// * `options` - storage options the store was created with.
pub fn default_s3_logstore(
    store: ObjectStoreRef,
    location: &Url,
    options: &StorageOptions,
) -> Arc<dyn LogStore> {
    let config = LogStoreConfig {
        location: location.clone(),
        options: options.clone(),
    };
    Arc::new(S3LogStore::new(store, config))
}

/// Default [`LogStore`] implementation for S3 backends.
///
/// Commits are written via temporary files (see `write_commit_entry`), so no
/// locking client is required from this type itself.
#[derive(Debug, Clone)]
pub struct S3LogStore {
    /// Object store with "/" pointing at the delta table root
    /// (i.e. where `_delta_log` is located).
    pub(crate) storage: Arc<dyn ObjectStore>,
    /// Table location and storage options this log store was created with.
    config: LogStoreConfig,
}

impl S3LogStore {
    /// Create a new instance of [`S3LogStore`]
    ///
    /// # Arguments
    ///
    /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
    /// * `config` - Log store configuration carrying the table location url and storage options.
    pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
        Self { storage, config }
    }
}

#[async_trait::async_trait]
impl LogStore for S3LogStore {
    fn name(&self) -> String {
        "S3LogStore".into()
    }

    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
        read_commit_entry(self.storage.as_ref(), version).await
    }

    /// Tries to commit a prepared commit file. Returns [`TransactionError`]
    /// if the given `version` already exists. The caller should handle the retry logic itself.
    /// This is low-level transaction API. If user does not want to maintain the commit loop then
    /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
    /// with retry logic.
    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
    ) -> Result<(), TransactionError> {
        match commit_or_bytes {
            CommitOrBytes::TmpCommit(tmp_commit) => {
                // `write_commit_entry` already returns `TransactionError`
                // (including `VersionAlreadyExists` for an existing version),
                // so no additional error mapping is needed here. A trailing
                // `.map_err` after a `?` would be dead code: the `?` propagates
                // the error before the mapping could ever run.
                write_commit_entry(self.storage.as_ref(), version, &tmp_commit).await
            }
            // This log store commits via temporary files + rename; raw log
            // bytes are only used by conditional-put based log stores.
            _ => unreachable!(), // S3 Log Store should never receive bytes
        }
    }

    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
    ) -> Result<(), TransactionError> {
        match &commit_or_bytes {
            CommitOrBytes::TmpCommit(tmp_commit) => {
                abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
            }
            _ => unreachable!(), // S3 Log Store should never receive bytes
        }
    }

    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        get_latest_version(self, current_version).await
    }

    fn object_store(&self) -> Arc<dyn ObjectStore> {
        self.storage.clone()
    }

    fn config(&self) -> &LogStoreConfig {
        &self.config
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ impl LogStore for S3DynamoDbLogStore {
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
};
let entry = CommitEntry::new(version, tmp_commit.clone());
debug!("Writing commit entry for {self:?}: {entry:?}");
// create log entry in dynamo db: complete = false, no expireTime
Expand Down Expand Up @@ -244,8 +248,12 @@ impl LogStore for S3DynamoDbLogStore {
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
};
self.lock_client
.delete_commit_entry(version, &self.table_path)
.await
Expand All @@ -266,7 +274,7 @@ impl LogStore for S3DynamoDbLogStore {
},
})?;

abort_commit_entry(&self.storage, version, tmp_commit).await?;
abort_commit_entry(&self.storage, version, &tmp_commit).await?;
Ok(())
}

Expand Down Expand Up @@ -309,13 +317,3 @@ pub enum RepairLogEntryResult {
/// Both parts of the repair process where already carried.
AlreadyCompleted,
}

/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()`
#[derive(Debug, PartialEq)]
pub enum CreateLockTableResult {
/// Table created successfully.
TableCreated,
/// Table was not created because it already exists.
/// Does not imply that the table has the correct schema.
TableAlreadyExists,
}
11 changes: 11 additions & 0 deletions crates/aws/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Contains the different logstore implementations for S3.
//! - S3LogStore (used when copy-if-not-exists or unsafe_rename is passed)
//! - S3DynamoDBLogStore (used when DynamoDB is the locking client)

mod default_logstore;
mod dynamodb_logstore;

pub use default_logstore::default_s3_logstore;
pub use default_logstore::S3LogStore;
pub use dynamodb_logstore::RepairLogEntryResult;
pub use dynamodb_logstore::S3DynamoDbLogStore;
5 changes: 4 additions & 1 deletion crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
// If the copy-if-not-exists env var is set or ConditionalPut is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
|| options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
{
Ok(store)
} else {
Expand Down
23 changes: 17 additions & 6 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use aws_sdk_dynamodb::types::BillingMode;
use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::{s3_constants, S3StorageOptions};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::LogStore;
use deltalake_core::logstore::{CommitOrBytes, LogStore};
use deltalake_core::operations::transaction::CommitBuilder;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
Expand Down Expand Up @@ -198,7 +198,10 @@ async fn test_abort_commit_entry() -> TestResult<()> {
let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;

log_store
.abort_commit_entry(entry.version, &entry.temp_path)
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone()),
)
.await?;

// The entry should have been aborted - the latest entry should be one version lower
Expand All @@ -213,7 +216,7 @@ async fn test_abort_commit_entry() -> TestResult<()> {

// Test abort commit is idempotent - still works if already aborted
log_store
.abort_commit_entry(entry.version, &entry.temp_path)
.abort_commit_entry(entry.version, CommitOrBytes::TmpCommit(entry.temp_path))
.await?;

Ok(())
Expand Down Expand Up @@ -244,7 +247,10 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
// Abort will fail since we marked the entry as complete
assert!(matches!(
log_store
.abort_commit_entry(entry.version, &entry.temp_path)
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone())
)
.await,
Err(_),
));
Expand Down Expand Up @@ -346,7 +352,12 @@ async fn create_incomplete_commit_entry(
.into_prepared_commit_future()
.await?;

let commit_entry = CommitEntry::new(version, prepared.path().to_owned());
let tmp_commit = match prepared.commit_or_bytes() {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(),
};

let commit_entry = CommitEntry::new(version, tmp_commit.to_owned());
make_client()?
.put_commit_entry(&table.table_uri(), &commit_entry)
.await?;
Expand Down
53 changes: 44 additions & 9 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};

use super::{LogStore, LogStoreConfig};
use crate::{operations::transaction::TransactionError, storage::ObjectStoreRef, DeltaResult};
use super::{CommitOrBytes, LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{commit_uri_from_version, ObjectStoreRef},
DeltaResult,
};

fn put_options() -> &'static PutOptions {
static PUT_OPTS: OnceLock<PutOptions> = OnceLock::new();
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
PUT_OPTS.get_or_init(|| PutOptions {
mode: object_store::PutMode::Create, // Creates if file doesn't exists yet
tags: TagSet::default(),
attributes: Attributes::default(),
})
}

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -45,17 +58,39 @@ impl LogStore for DefaultLogStore {
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
match commit_or_bytes {
CommitOrBytes::LogBytes(log_bytes) => self
.object_store()
.put_opts(
&commit_uri_from_version(version),
log_bytes.into(),
put_options().clone(),
)
.await
.map_err(|err| -> TransactionError {
match err {
ObjectStoreError::AlreadyExists { .. } => {
TransactionError::VersionAlreadyExists(version)
}
_ => TransactionError::from(err),
}
})?,
_ => unreachable!(), // Default log store should never get a tmp_commit, since this is for conditional put stores
};
Ok(())
}

async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
_version: i64,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
match &commit_or_bytes {
CommitOrBytes::LogBytes(_) => Ok(()),
_ => unreachable!(), // Default log store should never get a tmp_commit, since this is for conditional put stores
}
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
Expand Down
Loading
Loading