From 3b4d6d41400e457b57f77f58e4b29b92c18011f4 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Sun, 17 Dec 2023 18:25:39 +0100 Subject: [PATCH] Add configurable max elapsed time --- crates/deltalake-aws/src/errors.rs | 9 ++ crates/deltalake-aws/src/lib.rs | 129 ++++++++++-------- crates/deltalake-core/src/logstore/s3/mod.rs | 15 +- crates/deltalake-core/src/test_utils.rs | 11 +- .../tests/integration_s3_dynamodb.rs | 39 +++++- 5 files changed, 133 insertions(+), 70 deletions(-) diff --git a/crates/deltalake-aws/src/errors.rs b/crates/deltalake-aws/src/errors.rs index 7ba4fa5403..bbce9dc426 100644 --- a/crates/deltalake-aws/src/errors.rs +++ b/crates/deltalake-aws/src/errors.rs @@ -1,5 +1,7 @@ //! Errors for S3 log store backed by DynamoDb +use std::num::ParseIntError; + use rusoto_core::RusotoError; use rusoto_dynamodb::{CreateTableError, GetItemError, PutItemError, QueryError, UpdateItemError}; @@ -24,6 +26,13 @@ pub enum DynamoDbConfigError { /// Billing mode string invalid #[error("Invalid billing mode : {0}, supported values : ['provided', 'pay_per_request']")] InvalidBillingMode(String), + + /// Cannot parse max_elapsed_request_time value into u64 + #[error("Cannot parse max elapsed request time into u64: {source}")] + ParseMaxElapsedRequestTime { + // config_value: String, + source: ParseIntError, + }, } /// Errors produced by `DynamoDbLockClient` diff --git a/crates/deltalake-aws/src/lib.rs b/crates/deltalake-aws/src/lib.rs index 6dba20680e..6dd34b816e 100644 --- a/crates/deltalake-aws/src/lib.rs +++ b/crates/deltalake-aws/src/lib.rs @@ -65,32 +65,37 @@ pub struct DynamoDbLockClient { impl DynamoDbLockClient { /// Creates a new DynamoDbLockClient from the supplied storage options. pub fn try_new( - lock_table_name: Option<&String>, - billing_mode: Option<&String>, + lock_table_name: Option, + billing_mode: Option, + max_elapsed_request_time: Option, region: Region, use_web_identity: bool, ) -> Result { let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?; - let lock_table_name = lock_table_name.map_or_else( - || { - std::env::var(constants::LOCK_TABLE_KEY_NAME) - .unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned()) - }, - Clone::clone, - ); - let billing_mode: BillingMode = billing_mode.map_or_else( - || { - std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else( - |_| Ok(BillingMode::PayPerRequest), - |bm| BillingMode::from_str(&bm), - ) - }, - |bm| BillingMode::from_str(bm), - )?; + let lock_table_name = lock_table_name + .or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok()) + .unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned()); + + let billing_mode = billing_mode + .or_else(|| std::env::var(constants::BILLING_MODE_KEY_NAME).ok()) + .map_or_else( + || Ok(BillingMode::PayPerRequest), + |bm| BillingMode::from_str(&bm), + )?; + + let max_elapsed_request_time = max_elapsed_request_time + .or_else(|| std::env::var(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME).ok()) + .map_or_else( + || Ok(Duration::from_secs(60)), + |secs| u64::from_str(&secs).map(Duration::from_secs), + ) + .map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?; + let config = DynamoDbConfig { billing_mode, lock_table_name, + max_elapsed_request_time, use_web_identity, region, }; @@ -150,6 +155,10 @@ impl DynamoDbLockClient { self.config.lock_table_name.clone() } + pub fn get_dynamodb_config(&self) -> &DynamoDbConfig { + &self.config + } + fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap { maplit::hashmap! { constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path), @@ -169,16 +178,19 @@ impl DynamoDbLockClient { key: self.get_primary_key(version, table_path), ..Default::default() }; - let item = retry(|| async { - match self.dynamodb_client.get_item(input.clone()).await { - Ok(x) => Ok(x), - Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Err(err) => Err(backoff::Error::permanent(err.into())), - } - }) - .await?; + let item = self + .retry(|| async { + match self.dynamodb_client.get_item(input.clone()).await { + Ok(x) => Ok(x), + Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => { + Err(backoff::Error::transient( + LockClientError::ProvisionedThroughputExceeded, + )) + } + Err(err) => Err(backoff::Error::permanent(err.into())), + } + }) + .await?; item.item.as_ref().map(CommitEntry::try_from).transpose() } @@ -195,7 +207,7 @@ impl DynamoDbLockClient { item, ..Default::default() }; - retry(|| async { + self.retry(|| async { match self.dynamodb_client.put_item(input.clone()).await { Ok(_) => Ok(()), Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => Err( @@ -245,16 +257,17 @@ impl DynamoDbLockClient { ), ..Default::default() }; - let query_result = retry(|| async { - match self.dynamodb_client.query(input.clone()).await { - Ok(result) => Ok(result), - Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Err(err) => Err(backoff::Error::permanent(err.into())), - } - }) - .await?; + let query_result = self + .retry(|| async { + match self.dynamodb_client.query(input.clone()).await { + Ok(result) => Ok(result), + Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err( + backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), + ), + Err(err) => Err(backoff::Error::permanent(err.into())), + } + }) + .await?; query_result .items @@ -288,7 +301,7 @@ impl DynamoDbLockClient { ..Default::default() }; - retry(|| async { + self.retry(|| async { match self.dynamodb_client.update_item(input.clone()).await { Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed), Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => { @@ -304,19 +317,19 @@ impl DynamoDbLockClient { }) .await } -} -async fn retry(operation: Fn) -> Result -where - Fn: FnMut() -> Fut, - Fut: std::future::Future>>, -{ - let backoff = backoff::ExponentialBackoffBuilder::new() - .with_multiplier(2.) - .with_max_interval(Duration::from_secs(15)) - .with_max_elapsed_time(Some(Duration::from_secs(60))) - .build(); - backoff::future::retry(backoff, operation).await + async fn retry(&self, operation: Fn) -> Result + where + Fn: FnMut() -> Fut, + Fut: std::future::Future>>, + { + let backoff = backoff::ExponentialBackoffBuilder::new() + .with_multiplier(2.) + .with_max_interval(Duration::from_secs(15)) + .with_max_elapsed_time(Some(self.config.max_elapsed_request_time)) + .build(); + backoff::future::retry(backoff, operation).await + } } #[derive(Debug, PartialEq)] @@ -389,7 +402,7 @@ fn create_value_map( } #[derive(Debug, PartialEq)] -enum BillingMode { +pub enum BillingMode { PayPerRequest, Provisioned, } @@ -417,10 +430,11 @@ impl FromStr for BillingMode { #[derive(Debug, PartialEq)] pub struct DynamoDbConfig { - billing_mode: BillingMode, - lock_table_name: String, - use_web_identity: bool, - region: Region, + pub billing_mode: BillingMode, + pub lock_table_name: String, + pub max_elapsed_request_time: Duration, + pub use_web_identity: bool, + pub region: Region, } /// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()` @@ -441,6 +455,7 @@ pub mod constants { pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; + pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; pub const ATTR_TABLE_PATH: &str = "tablePath"; pub const ATTR_FILE_NAME: &str = "fileName"; diff --git a/crates/deltalake-core/src/logstore/s3/mod.rs b/crates/deltalake-core/src/logstore/s3/mod.rs index 9e7883c7b2..17b35a4451 100644 --- a/crates/deltalake-core/src/logstore/s3/mod.rs +++ b/crates/deltalake-core/src/logstore/s3/mod.rs @@ -39,8 +39,18 @@ impl S3DynamoDbLogStore { object_store: ObjectStoreRef, ) -> DeltaResult { let lock_client = DynamoDbLockClient::try_new( - s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME), - s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME), + s3_options + .extra_opts + .get(constants::LOCK_TABLE_KEY_NAME) + .cloned(), + s3_options + .extra_opts + .get(constants::BILLING_MODE_KEY_NAME) + .cloned(), + s3_options + .extra_opts + .get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME) + .cloned(), s3_options.region.clone(), s3_options.use_web_identity, ) @@ -50,6 +60,7 @@ impl S3DynamoDbLogStore { source: err.into(), }, })?; + let table_path = super::to_uri(&location, &Path::from("")); Ok(Self { storage: object_store, diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 52d757a2eb..28430d884d 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -2,6 +2,7 @@ use crate::storage::utils::copy_table; use crate::DeltaTableBuilder; use chrono::Utc; +use deltalake_aws::constants; use fs_extra::dir::{copy, CopyOptions}; use object_store::DynObjectStore; use rand::Rng; @@ -240,8 +241,8 @@ impl StorageIntegration { Self::Onelake => Ok(()), Self::OnelakeAbfs => Ok(()), Self::Amazon => { - std::env::set_var( - "DELTA_DYNAMO_TABLE_NAME", + set_env_if_not_set( + constants::LOCK_TABLE_KEY_NAME, format!("delta_log_it_{}", rand::thread_rng().gen::()), ); s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?; @@ -409,6 +410,8 @@ pub mod az_cli { /// small wrapper around s3 cli pub mod s3_cli { + use deltalake_aws::constants; + use super::set_env_if_not_set; use crate::table::builder::s3_storage_options; use std::process::{Command, ExitStatus, Stdio}; @@ -521,7 +524,7 @@ pub mod s3_cli { pub fn create_lock_table() -> std::io::Result { let table_name = - std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); + std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into()); create_dynamodb_table( &table_name, &[ @@ -546,7 +549,7 @@ pub mod s3_cli { pub fn delete_lock_table() -> std::io::Result { let table_name = - std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); + std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into()); delete_dynamodb_table(&table_name) } } diff --git a/crates/deltalake-core/tests/integration_s3_dynamodb.rs b/crates/deltalake-core/tests/integration_s3_dynamodb.rs index 412ec223a3..6dc4349807 100644 --- a/crates/deltalake-core/tests/integration_s3_dynamodb.rs +++ b/crates/deltalake-core/tests/integration_s3_dynamodb.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use deltalake_aws::{CommitEntry, DynamoDbLockClient}; +use deltalake_aws::{constants, CommitEntry, DynamoDbConfig, DynamoDbLockClient}; use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType}; use deltalake_core::logstore::s3::{RepairLogEntryResult, S3DynamoDbLogStore}; use deltalake_core::logstore::LogStore; @@ -38,21 +38,46 @@ lazy_static! { } fn make_client() -> TestResult { + let options: S3StorageOptions = S3StorageOptions::default(); Ok(DynamoDbLockClient::try_new( None, None, - S3_OPTIONS.region.clone(), + None, + options.region.clone(), false, )?) } #[test] #[serial] -fn client_config_picks_up_lock_table_name() -> TestResult<()> { - let _context = IntegrationContext::new(StorageIntegration::Amazon)?; - assert!(make_client()? - .get_lock_table_name() - .starts_with("delta_log_it_")); +fn client_configs_via_env_variables() -> TestResult<()> { + std::env::set_var( + deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME, + "64", + ); + std::env::set_var( + deltalake_aws::constants::LOCK_TABLE_KEY_NAME, + "some_table".to_owned(), + ); + std::env::set_var( + deltalake_aws::constants::BILLING_MODE_KEY_NAME, + "PAY_PER_REQUEST".to_owned(), + ); + let client = make_client()?; + let config = client.get_dynamodb_config(); + assert_eq!( + DynamoDbConfig { + billing_mode: deltalake_aws::BillingMode::PayPerRequest, + lock_table_name: "some_table".to_owned(), + max_elapsed_request_time: Duration::from_secs(64), + use_web_identity: false, + region: config.region.clone(), + }, + *config, + ); + std::env::remove_var(deltalake_aws::constants::LOCK_TABLE_KEY_NAME); + std::env::remove_var(deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME); + std::env::remove_var(deltalake_aws::constants::BILLING_MODE_KEY_NAME); Ok(()) }