From b786513efb4e737c056dad29ab34eb3d4f4ec9d8 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:27:09 +0200 Subject: [PATCH] feat: conditional put for default log store --- crates/aws/src/logstore.rs | 14 ++++- crates/core/src/logstore/default_logstore.rs | 57 ++++++++++++++--- crates/core/src/logstore/mod.rs | 13 +++- crates/core/src/operations/restore.rs | 11 +++- crates/core/src/operations/transaction/mod.rs | 63 +++++++++++-------- 5 files changed, 116 insertions(+), 42 deletions(-) diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore.rs index fe569256ee..37a7ed0047 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore.rs @@ -199,8 +199,12 @@ impl LogStore for S3DynamoDbLogStore { async fn write_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { + let tmp_commit = match commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit, + _ => unreachable!(), // S3DynamoDBLogstore should never get Bytes + }; let entry = CommitEntry::new(version, tmp_commit.clone()); debug!("Writing commit entry for {self:?}: {entry:?}"); // create log entry in dynamo db: complete = false, no expireTime @@ -244,8 +248,12 @@ impl LogStore for S3DynamoDbLogStore { async fn abort_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { + let tmp_commit = match commit_or_bytes { + CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit, + _ => unreachable!(), // S3DynamoDBLogstore should never get Bytes + }; self.lock_client .delete_commit_entry(version, &self.table_path) .await @@ -266,7 +274,7 @@ impl LogStore for S3DynamoDbLogStore { }, })?; - abort_commit_entry(&self.storage, version, tmp_commit).await?; + abort_commit_entry(&self.storage, version, &tmp_commit).await?; Ok(()) } diff --git a/crates/core/src/logstore/default_logstore.rs 
b/crates/core/src/logstore/default_logstore.rs index 8fd4f52beb..771c9cbc7b 100644 --- a/crates/core/src/logstore/default_logstore.rs +++ b/crates/core/src/logstore/default_logstore.rs @@ -1,12 +1,25 @@ //! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use bytes::Bytes; -use object_store::{path::Path, ObjectStore}; +use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet}; -use super::{LogStore, LogStoreConfig}; -use crate::{operations::transaction::TransactionError, storage::ObjectStoreRef, DeltaResult}; +use super::{CommitOrBytes, LogStore, LogStoreConfig}; +use crate::{ + operations::transaction::TransactionError, + storage::{commit_uri_from_version, ObjectStoreRef}, + DeltaResult, +}; + +fn put_options() -> &'static PutOptions { + static PUT_OPTS: OnceLock<PutOptions> = OnceLock::new(); + PUT_OPTS.get_or_init(|| PutOptions { + mode: object_store::PutMode::Create, // Creates the file only if it doesn't exist yet + tags: TagSet::default(), + attributes: Attributes::default(), + }) +} /// Default [`LogStore`] implementation #[derive(Debug, Clone)] @@ -45,17 +58,43 @@ impl LogStore for DefaultLogStore { async fn write_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { - super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await + match commit_or_bytes { + CommitOrBytes::LogBytes(log_bytes) => { + match self + .object_store() + .put_opts( + &commit_uri_from_version(version), + log_bytes.into(), + put_options().clone(), + ) + .await + { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + _ => unreachable!(), // DefaultLogStore should never get a tmp_commit + } + .map_err(|err| -> TransactionError { + match err { + ObjectStoreError::AlreadyExists { .. 
} => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + } + })?; + Ok(()) } + /// Not required anymore as writes can do Conditional Put, so it's just a no-op async fn abort_commit_entry( &self, - version: i64, - tmp_commit: &Path, + _version: i64, + _commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError> { - super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await + Ok(()) } async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> { diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 94f88b5944..51191d77d0 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -151,6 +151,15 @@ pub fn logstore_with( )) } +/// Holds either the tmp_commit path or the commit bytes +#[derive(Clone)] +pub enum CommitOrBytes { + /// Path of the tmp commit, to be used by logstores which use CopyIfNotExists + TmpCommit(Path), + /// Bytes of the log, to be used by logstores which use Conditional Put + LogBytes(Bytes), +} + /// Configuration parameters for a log store #[derive(Debug, Clone)] pub struct LogStoreConfig { @@ -190,14 +199,14 @@ pub trait LogStore: Sync + Send { async fn write_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError>; /// Abort the commit entry for the given version. async fn abort_commit_entry( &self, version: i64, - tmp_commit: &Path, + commit_or_bytes: CommitOrBytes, ) -> Result<(), TransactionError>; /// Find latest version currently stored in the delta log. 
diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index 71d478ca68..498edc67c0 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -272,14 +272,19 @@ async fn execute( .await?; let commit_version = snapshot.version() + 1; - let commit = prepared_commit.path(); - match log_store.write_commit_entry(commit_version, commit).await { + let commit_bytes = prepared_commit.commit_or_bytes(); + match log_store + .write_commit_entry(commit_version, commit_bytes.clone()) + .await + { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - log_store.abort_commit_entry(commit_version, commit).await?; + log_store + .abort_commit_entry(commit_version, commit_bytes.clone()) + .await?; return Err(err.into()); } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index e0902f96e1..216b8714d0 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -79,7 +79,7 @@ use chrono::Utc; use conflict_checker::ConflictChecker; use futures::future::BoxFuture; use object_store::path::Path; -use object_store::{Error as ObjectStoreError, ObjectStore}; +use object_store::Error as ObjectStoreError; use serde_json::Value; use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; @@ -89,7 +89,7 @@ use crate::kernel::{ Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction, WriterFeatures, }; -use crate::logstore::LogStoreRef; +use crate::logstore::{CommitOrBytes, LogStoreRef}; use crate::protocol::DeltaOperation; use crate::table::config::TableConfig; use crate::table::state::DeltaTableState; @@ -481,18 +481,22 @@ impl<'a> PreCommit<'a> { PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?; } - // Write delta log entry as temporary file to storage. 
For the actual commit, - // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. let log_entry = this.data.get_bytes()?; - let token = uuid::Uuid::new_v4().to_string(); - let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]); - this.log_store - .object_store() - .put(&path, log_entry.into()) - .await?; + + // With the DefaultLogStore, we just pass the bytes around, since we use conditional puts + let commit_or_bytes = if this.log_store.name() == "DefaultLogStore" { + CommitOrBytes::LogBytes(log_entry) + } else { + // Write delta log entry as temporary file to storage. For the actual commit, + // the temporary file is moved (atomic rename) to the delta log folder within `commit` function. + let token = uuid::Uuid::new_v4().to_string(); + let path = + Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]); + CommitOrBytes::TmpCommit(path) + }; Ok(PreparedCommit { - path, + commit_or_bytes, log_store: this.log_store, table_data: this.table_data, max_retries: this.max_retries, @@ -503,9 +507,9 @@ impl<'a> PreCommit<'a> { } } -/// Represents a inflight commit with a temporary commit marker on the log store +/// Represents an in-flight commit pub struct PreparedCommit<'a> { - path: Path, + commit_or_bytes: CommitOrBytes, log_store: LogStoreRef, data: CommitData, table_data: Option<&'a dyn TableReference>, @@ -515,8 +519,8 @@ pub struct PreparedCommit<'a> { impl<'a> PreparedCommit<'a> { /// The temporary commit file created - pub fn path(&self) -> &Path { - &self.path + pub fn commit_or_bytes(&self) -> &CommitOrBytes { + &self.commit_or_bytes } } @@ -528,10 +532,12 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { let this = self; Box::pin(async move { - let tmp_commit = &this.path; + let commit_or_bytes = this.commit_or_bytes; if this.table_data.is_none() { - this.log_store.write_commit_entry(0, tmp_commit).await?; + this.log_store + .write_commit_entry(0, 
commit_or_bytes.clone()) + .await?; return Ok(PostCommit { version: 0, data: this.data, @@ -549,7 +555,11 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { let mut attempt_number = 1; while attempt_number <= this.max_retries { let version = read_snapshot.version() + attempt_number as i64; - match this.log_store.write_commit_entry(version, tmp_commit).await { + match this + .log_store + .write_commit_entry(version, commit_or_bytes.clone()) + .await + { Ok(()) => { return Ok(PostCommit { version, @@ -590,7 +600,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } Err(err) => { this.log_store - .abort_commit_entry(version, tmp_commit) + .abort_commit_entry(version, commit_or_bytes) .await?; return Err(TransactionError::CommitConflict(err).into()); } @@ -598,7 +608,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } Err(err) => { this.log_store - .abort_commit_entry(version, tmp_commit) + .abort_commit_entry(version, commit_or_bytes) .await?; return Err(err.into()); } @@ -732,7 +742,7 @@ mod tests { logstore::{default_logstore::DefaultLogStore, LogStore}, storage::commit_uri_from_version, }; - use object_store::{memory::InMemory, PutPayload}; + use object_store::{memory::InMemory, ObjectStore, PutPayload}; use url::Url; #[test] @@ -754,16 +764,19 @@ mod tests { options: HashMap::new().into(), }, ); - let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); - store.put(&tmp_path, PutPayload::new()).await.unwrap(); store.put(&version_path, PutPayload::new()).await.unwrap(); - let res = log_store.write_commit_entry(0, &tmp_path).await; + let res = log_store + .write_commit_entry(0, CommitOrBytes::LogBytes(PutPayload::new().into())) + .await; // fails if file version already exists assert!(res.is_err()); // succeeds for next version - log_store.write_commit_entry(1, &tmp_path).await.unwrap(); + log_store + .write_commit_entry(1, CommitOrBytes::LogBytes(PutPayload::new().into())) 
+ .await + .unwrap(); } }