Skip to content

Commit

Permalink
feat: conditional put for default log store
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 22, 2024
1 parent 1f45881 commit b786513
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 42 deletions.
14 changes: 11 additions & 3 deletions crates/aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ impl LogStore for S3DynamoDbLogStore {
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
};
let entry = CommitEntry::new(version, tmp_commit.clone());
debug!("Writing commit entry for {self:?}: {entry:?}");
// create log entry in dynamo db: complete = false, no expireTime
Expand Down Expand Up @@ -244,8 +248,12 @@ impl LogStore for S3DynamoDbLogStore {
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
};
self.lock_client
.delete_commit_entry(version, &self.table_path)
.await
Expand All @@ -266,7 +274,7 @@ impl LogStore for S3DynamoDbLogStore {
},
})?;

abort_commit_entry(&self.storage, version, tmp_commit).await?;
abort_commit_entry(&self.storage, version, &tmp_commit).await?;
Ok(())
}

Expand Down
57 changes: 48 additions & 9 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};

use super::{LogStore, LogStoreConfig};
use crate::{operations::transaction::TransactionError, storage::ObjectStoreRef, DeltaResult};
use super::{CommitOrBytes, LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{commit_uri_from_version, ObjectStoreRef},
DeltaResult,
};

/// Returns the shared [`PutOptions`] used when writing commit entries.
///
/// The options are built lazily on first call and cached in a process-wide
/// [`OnceLock`], so every caller receives a reference to the same value.
fn put_options() -> &'static PutOptions {
    static CREATE_ONLY_OPTS: OnceLock<PutOptions> = OnceLock::new();
    CREATE_ONLY_OPTS.get_or_init(|| {
        // Only create the object if it does not exist yet.
        let mode = object_store::PutMode::Create;
        let tags = TagSet::default();
        let attributes = Attributes::default();
        PutOptions {
            mode,
            tags,
            attributes,
        }
    })
}

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -45,17 +58,43 @@ impl LogStore for DefaultLogStore {
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
match commit_or_bytes {
CommitOrBytes::LogBytes(log_bytes) => {
match self
.object_store()
.put_opts(
&commit_uri_from_version(version),
log_bytes.into(),
put_options().clone(),
)
.await
{
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
_ => unreachable!(), // DefaultLogStore should never get a tmp_commit
}
.map_err(|err| -> TransactionError {
match err {
ObjectStoreError::AlreadyExists { .. } => {
TransactionError::VersionAlreadyExists(version)
}
_ => TransactionError::from(err),
}
})?;
Ok(())
}

/// Not required anymore as writes can do Conditional Put, so it's just a no-op
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
_version: i64,
_commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError> {
super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
Ok(())
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
Expand Down
13 changes: 11 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ pub fn logstore_with(
))
}

/// Payload handed to a [`LogStore`] when writing or aborting a commit entry.
///
/// Holds either the path of a temporary commit file or the raw bytes of the
/// commit, depending on which strategy the log store uses.
#[derive(Clone, Debug)]
pub enum CommitOrBytes {
    /// Path of the tmp commit, to be used by log stores which use CopyIfNotExists
    TmpCommit(Path),
    /// Bytes of the log, to be used by log stores which use Conditional Put
    LogBytes(Bytes),
}

/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
Expand Down Expand Up @@ -190,14 +199,14 @@ pub trait LogStore: Sync + Send {
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError>;

/// Abort the commit entry for the given version.
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
commit_or_bytes: CommitOrBytes,
) -> Result<(), TransactionError>;

/// Find latest version currently stored in the delta log.
Expand Down
11 changes: 8 additions & 3 deletions crates/core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,19 @@ async fn execute(
.await?;

let commit_version = snapshot.version() + 1;
let commit = prepared_commit.path();
match log_store.write_commit_entry(commit_version, commit).await {
let commit_bytes = prepared_commit.commit_or_bytes();
match log_store
.write_commit_entry(commit_version, commit_bytes.clone())
.await
{
Ok(_) => {}
Err(err @ TransactionError::VersionAlreadyExists(_)) => {
return Err(err.into());
}
Err(err) => {
log_store.abort_commit_entry(commit_version, commit).await?;
log_store
.abort_commit_entry(commit_version, commit_bytes.clone())
.await?;
return Err(err.into());
}
}
Expand Down
63 changes: 38 additions & 25 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use chrono::Utc;
use conflict_checker::ConflictChecker;
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use object_store::Error as ObjectStoreError;
use serde_json::Value;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
Expand All @@ -89,7 +89,7 @@ use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction,
WriterFeatures,
};
use crate::logstore::LogStoreRef;
use crate::logstore::{CommitOrBytes, LogStoreRef};
use crate::protocol::DeltaOperation;
use crate::table::config::TableConfig;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -481,18 +481,22 @@ impl<'a> PreCommit<'a> {
PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
}

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
let log_entry = this.data.get_bytes()?;
let token = uuid::Uuid::new_v4().to_string();
let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
this.log_store
.object_store()
.put(&path, log_entry.into())
.await?;

// With the DefaultLogStore, we just pass the bytes around, since we use conditionalPuts
let commit_or_bytes = if this.log_store.name() == "DefaultLogStore" {
CommitOrBytes::LogBytes(log_entry)
} else {
// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
let token = uuid::Uuid::new_v4().to_string();
let path =
Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
CommitOrBytes::TmpCommit(path)
};

Ok(PreparedCommit {
path,
commit_or_bytes,
log_store: this.log_store,
table_data: this.table_data,
max_retries: this.max_retries,
Expand All @@ -503,9 +507,9 @@ impl<'a> PreCommit<'a> {
}
}

/// Represents a inflight commit with a temporary commit marker on the log store
/// Represents an in-flight commit
pub struct PreparedCommit<'a> {
path: Path,
commit_or_bytes: CommitOrBytes,
log_store: LogStoreRef,
data: CommitData,
table_data: Option<&'a dyn TableReference>,
Expand All @@ -515,8 +519,8 @@ pub struct PreparedCommit<'a> {

impl<'a> PreparedCommit<'a> {
/// The temporary commit path or the commit bytes held by this prepared commit
pub fn path(&self) -> &Path {
&self.path
pub fn commit_or_bytes(&self) -> &CommitOrBytes {
&self.commit_or_bytes
}
}

Expand All @@ -528,10 +532,12 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
let this = self;

Box::pin(async move {
let tmp_commit = &this.path;
let commit_or_bytes = this.commit_or_bytes;

if this.table_data.is_none() {
this.log_store.write_commit_entry(0, tmp_commit).await?;
this.log_store
.write_commit_entry(0, commit_or_bytes.clone())
.await?;
return Ok(PostCommit {
version: 0,
data: this.data,
Expand All @@ -549,7 +555,11 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
let mut attempt_number = 1;
while attempt_number <= this.max_retries {
let version = read_snapshot.version() + attempt_number as i64;
match this.log_store.write_commit_entry(version, tmp_commit).await {
match this
.log_store
.write_commit_entry(version, commit_or_bytes.clone())
.await
{
Ok(()) => {
return Ok(PostCommit {
version,
Expand Down Expand Up @@ -590,15 +600,15 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
Err(err) => {
this.log_store
.abort_commit_entry(version, tmp_commit)
.abort_commit_entry(version, commit_or_bytes)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
}
Err(err) => {
this.log_store
.abort_commit_entry(version, tmp_commit)
.abort_commit_entry(version, commit_or_bytes)
.await?;
return Err(err.into());
}
Expand Down Expand Up @@ -732,7 +742,7 @@ mod tests {
logstore::{default_logstore::DefaultLogStore, LogStore},
storage::commit_uri_from_version,
};
use object_store::{memory::InMemory, PutPayload};
use object_store::{memory::InMemory, ObjectStore, PutPayload};
use url::Url;

#[test]
Expand All @@ -754,16 +764,19 @@ mod tests {
options: HashMap::new().into(),
},
);
let tmp_path = Path::from("_delta_log/tmp");
let version_path = Path::from("_delta_log/00000000000000000000.json");
store.put(&tmp_path, PutPayload::new()).await.unwrap();
store.put(&version_path, PutPayload::new()).await.unwrap();

let res = log_store.write_commit_entry(0, &tmp_path).await;
let res = log_store
.write_commit_entry(0, CommitOrBytes::LogBytes(PutPayload::new().into()))
.await;
// fails if file version already exists
assert!(res.is_err());

// succeeds for next version
log_store.write_commit_entry(1, &tmp_path).await.unwrap();
log_store
.write_commit_entry(1, CommitOrBytes::LogBytes(PutPayload::new().into()))
.await
.unwrap();
}
}

0 comments on commit b786513

Please sign in to comment.