diff --git a/Cargo.lock b/Cargo.lock
index bc5a9b1894..0985b319db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1215,20 +1215,6 @@ dependencies = [
  "tower-service",
 ]
 
-[[package]]
-name = "backoff"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
-dependencies = [
- "futures-core",
- "getrandom 0.2.15",
- "instant",
- "pin-project-lite",
- "rand 0.8.5",
- "tokio",
-]
-
 [[package]]
 name = "backtrace"
 version = "0.3.71"
@@ -2906,6 +2892,15 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "exponential-backoff"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ffb309d235a642598183aeda8925e871e85dd5a433c2c877e69ff0a960f4c02"
+dependencies = [
+ "fastrand",
+]
+
 [[package]]
 name = "eyre"
 version = "0.6.12"
@@ -8674,10 +8669,10 @@ dependencies = [
  "aes-gcm-siv",
  "assert_fs",
  "async-trait",
- "backoff",
  "blsttc",
  "bytes",
  "custom_debug",
+ "exponential-backoff",
  "eyre",
  "futures",
  "getrandom 0.2.15",
@@ -8825,6 +8820,7 @@ dependencies = [
  "crdts",
  "custom_debug",
  "dirs-next",
+ "exponential-backoff",
  "hex 0.4.3",
  "lazy_static",
  "libp2p 0.54.1",
diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml
index 9d6a39e75a..e9d53af4dd 100644
--- a/sn_networking/Cargo.toml
+++ b/sn_networking/Cargo.toml
@@ -39,6 +39,7 @@ libp2p = { version = "0.54.1", features = [
 ] }
 async-trait = "0.1"
 bytes = { version = "1.0.1", features = ["serde"] }
+exponential-backoff = "2.0.0"
 futures = "~0.3.13"
 hex = "~0.4.3"
 hyper = { version = "0.14", features = [
@@ -71,7 +72,6 @@ tokio = { version = "1.32.0", features = [
 ] }
 tracing = { version = "~0.1.26" }
 xor_name = "5.0.0"
-backoff = { version = "0.4.0", features = ["tokio"] }
 aes-gcm-siv = "0.11.1"
 hkdf = "0.12"
 sha2 = "0.10"
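The dependency swap above replaces `backoff`'s future-combinator API with `exponential-backoff`, which exposes retries as a plain iterator of sleep intervals whose final item is `None`, signalling that the last attempt has been used. A minimal sketch of that shape, following the crate's documented usage; `try_operation` and the constants are illustrative stand-ins, not code from this repository:

```rust
use exponential_backoff::Backoff;
use std::{io, thread, time::Duration};

// Stand-in for whatever fallible operation is being retried.
fn try_operation() -> io::Result<()> {
    Ok(())
}

fn run() -> io::Result<()> {
    // 4 attempts in total; intervals start around 1s and are capped at 32s.
    let backoff = Backoff::new(4, Duration::from_secs(1), Some(Duration::from_secs(32)));

    for duration in &backoff {
        match try_operation() {
            Ok(()) => return Ok(()),
            Err(err) => match duration {
                // More attempts remain: sleep, then retry.
                Some(duration) => thread::sleep(duration),
                // That was the final attempt: give up with the last error.
                None => return Err(err),
            },
        }
    }
    unreachable!("the iterator always yields a final `None` attempt")
}
```

This iterator style is what lets the changes below drop the transient/permanent error wrappers: the caller decides at each step whether to sleep or to return the error as-is.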
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index 06699f7fe1..779207c0c2 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -274,10 +274,9 @@ impl Network {
         quorum: Quorum,
         retry_strategy: Option<RetryStrategy>,
     ) -> Result<()> {
-        let mut total_attempts = 1;
-        total_attempts += retry_strategy
-            .map(|strategy| strategy.get_count())
-            .unwrap_or(0);
+        let total_attempts = retry_strategy
+            .map(|strategy| strategy.attempts())
+            .unwrap_or(1);
 
         let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
         let expected_n_verified = get_quorum_value(&quorum);
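Note the changed counting convention here: `attempts()` includes the initial attempt, whereas the old code computed `1 + get_count()`. A small illustration against the new `RetryStrategy` variants defined in `sn_protocol` further down (hypothetical caller code):

```rust
use sn_protocol::storage::RetryStrategy;
use std::num::NonZeroUsize;

fn main() {
    // No strategy supplied now means exactly one attempt.
    let strategy: Option<RetryStrategy> = None;
    assert_eq!(strategy.map(|s| s.attempts()).unwrap_or(1), 1);

    // Balanced is 6 attempts: 1 initial try plus 5 retries.
    assert_eq!(RetryStrategy::Balanced.attempts(), 6);

    // An explicit attempt count is also available.
    assert_eq!(RetryStrategy::N(NonZeroUsize::new(7).unwrap()).attempts(), 7);
}
```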
@@ -479,30 +478,6 @@ impl Network {
         Ok(all_register_copies)
     }
 
-    /// Get a record from the network
-    /// This differs from non-wasm32 builds as no retries are applied
-    #[cfg(target_arch = "wasm32")]
-    pub async fn get_record_from_network(
-        &self,
-        key: RecordKey,
-        cfg: &GetRecordCfg,
-    ) -> Result<Record> {
-        let pretty_key = PrettyPrintRecordKey::from(&key);
-        info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
-        let (sender, receiver) = oneshot::channel();
-        self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
-            key: key.clone(),
-            sender,
-            cfg: cfg.clone(),
-        });
-        let result = receiver.await.map_err(|e| {
-            error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
-            NetworkError::InternalMsgChannelDropped
-        })?;
-
-        result.map_err(NetworkError::from)
-    }
-
     /// Get the Record from the network
     /// Carry out re-attempts if required
     /// In case a target_record is provided, only return when fetched target.
@@ -511,93 +486,92 @@ impl Network {
     /// It also handles the split record error for spends and registers.
     /// For spends, it accumulates the spends and returns an error if more than one.
    /// For registers, it merges the registers and returns the merged record.
-    #[cfg(not(target_arch = "wasm32"))]
     pub async fn get_record_from_network(
         &self,
         key: RecordKey,
         cfg: &GetRecordCfg,
     ) -> Result<Record> {
-        let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
-        backoff::future::retry(
-            backoff::ExponentialBackoff {
-                // None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
-                // be disabled.
-                max_elapsed_time: retry_duration,
-                ..Default::default()
-            },
-            || async {
-                let pretty_key = PrettyPrintRecordKey::from(&key);
-                info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
-                let (sender, receiver) = oneshot::channel();
-                self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
-                    key: key.clone(),
-                    sender,
-                    cfg: cfg.clone(),
-                });
-                let result = receiver.await.map_err(|e| {
-                    error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
-                    NetworkError::InternalMsgChannelDropped
-                }).map_err(|err| backoff::Error::Transient { err, retry_after: None })?;
-
-                // log the results
-                match &result {
-                    Ok(_) => {
-                        info!("Record returned: {pretty_key:?}.");
-                    }
-                    Err(GetRecordError::RecordDoesNotMatch(_)) => {
-                        warn!("The returned record does not match target {pretty_key:?}.");
-                    }
-                    Err(GetRecordError::NotEnoughCopies { expected, got, .. }) => {
-                        warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
-                    }
-                    // libp2p RecordNotFound does mean no holders answered.
-                    // it does not actually mean the record does not exist.
-                    // just that those asked did not have it
-                    Err(GetRecordError::RecordNotFound) => {
-                        warn!("No holder of record '{pretty_key:?}' found.");
-                    }
-                    // This is returned during SplitRecordError, we should not get this error here.
-                    Err(GetRecordError::RecordKindMismatch) => {
-                        error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
-                    }
-                    Err(GetRecordError::SplitRecord { result_map }) => {
-                        error!("Encountered a split record for {pretty_key:?}.");
-                        if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
-                            info!("Merged the split record (register) for {pretty_key:?}, into a single record");
-                            return Ok(record);
-                        }
-                    }
-                    Err(GetRecordError::QueryTimeout) => {
-                        error!("Encountered query timeout for {pretty_key:?}.");
-                    }
-                };
+        let pretty_key = PrettyPrintRecordKey::from(&key);
+        let mut backoff = cfg
+            .retry_strategy
+            .unwrap_or(RetryStrategy::None)
+            .backoff()
+            .into_iter();
+
+        loop {
+            info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
+            let (sender, receiver) = oneshot::channel();
+            self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
+                key: key.clone(),
+                sender,
+                cfg: cfg.clone(),
+            });
+            let result = match receiver.await {
+                Ok(result) => result,
+                Err(err) => {
+                    error!(
+                        "When fetching record {pretty_key:?}, encountered a channel error {err:?}"
+                    );
+                    // Do not attempt retries.
+                    return Err(NetworkError::InternalMsgChannelDropped);
+                }
+            };
 
-                // if we don't want to retry, throw permanent error
-                if cfg.retry_strategy.is_none() {
-                    if let Err(e) = result {
-                        return Err(backoff::Error::Permanent(NetworkError::from(e)));
+            let err = match result {
+                Ok(record) => {
+                    info!("Record returned: {pretty_key:?}.");
+                    return Ok(record);
+                }
+                Err(err) => err,
+            };
+
+            // log the results
+            match &err {
+                GetRecordError::RecordDoesNotMatch(_) => {
+                    warn!("The returned record does not match target {pretty_key:?}.");
+                }
+                GetRecordError::NotEnoughCopies { expected, got, .. } => {
+                    warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
+                }
+                // libp2p RecordNotFound does mean no holders answered.
+                // it does not actually mean the record does not exist.
+                // just that those asked did not have it
+                GetRecordError::RecordNotFound => {
+                    warn!("No holder of record '{pretty_key:?}' found.");
+                }
+                // This is returned during SplitRecordError, we should not get this error here.
+                GetRecordError::RecordKindMismatch => {
+                    error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
+                }
+                GetRecordError::SplitRecord { result_map } => {
+                    error!("Encountered a split record for {pretty_key:?}.");
+                    if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
+                        info!("Merged the split record (register) for {pretty_key:?}, into a single record");
+                        return Ok(record);
                     }
                 }
-                if result.is_err() {
+                GetRecordError::QueryTimeout => {
+                    error!("Encountered query timeout for {pretty_key:?}.");
+                }
+            }
+
+            match backoff.next() {
+                Some(Some(duration)) => {
+                    crate::target_arch::sleep(duration).await;
                     debug!("Getting record from network of {pretty_key:?} via backoff...");
                 }
-                result.map_err(|err| backoff::Error::Transient {
-                    err: NetworkError::from(err),
-                    retry_after: None,
-                })
-            },
-        )
-        .await
+                _ => break Err(err.into()),
+            }
+        }
     }
 
     /// Handle the split record error.
     /// Spend: Accumulate spends and return error if more than one.
     /// Register: Merge registers and return the merged record.
-    #[cfg(not(target_arch = "wasm32"))]
     fn handle_split_record_error(
         result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
         key: &RecordKey,
-    ) -> std::result::Result<Option<Record>, backoff::Error<NetworkError>> {
+    ) -> std::result::Result<Option<Record>, NetworkError> {
         let pretty_key = PrettyPrintRecordKey::from(key);
 
         // attempt to deserialise and accumulate any spends or registers
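A single cross-target implementation is possible here because the sleep goes through `crate::target_arch::sleep` rather than calling `tokio::time::sleep` directly, which does not run on wasm32. A sketch of what such a shim module typically looks like; the `wasmtimer` re-export is an assumption for illustration, not something shown in this diff:

```rust
// Native targets: re-export tokio's timer directly.
#[cfg(not(target_arch = "wasm32"))]
pub use tokio::time::sleep;

// wasm32 has no tokio timer driver; a wasm-aware timer crate stands in.
#[cfg(target_arch = "wasm32")]
pub use wasmtimer::tokio::sleep;
```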
@@ -615,9 +589,9 @@ impl Network {
             let kind = record_kind.get_or_insert(header.kind);
             if *kind != header.kind {
                 error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}", header.kind);
-                return Err(backoff::Error::Permanent(NetworkError::GetRecordError(
+                return Err(NetworkError::GetRecordError(
                     GetRecordError::RecordKindMismatch,
-                )));
+                ));
             }
 
             // Accumulate the spends
@@ -664,9 +638,7 @@ impl Network {
             info!("For record {pretty_key:?} task found split record for a spend, accumulated and sending them as a single record");
             let accumulated_spends = accumulated_spends.into_iter().collect::<Vec<SignedSpend>>();
 
-            return Err(backoff::Error::Permanent(NetworkError::DoubleSpendAttempt(
-                accumulated_spends,
-            )));
+            return Err(NetworkError::DoubleSpendAttempt(accumulated_spends));
         } else if !collected_registers.is_empty() {
             info!("For record {pretty_key:?} task found multiple registers, merging them.");
             let signed_register = collected_registers.iter().fold(collected_registers[0].clone(), |mut acc, x| {
@@ -681,7 +653,7 @@ impl Network {
                     error!(
                         "Error while serializing the merged register for {pretty_key:?}: {err:?}"
                     );
-                    backoff::Error::Permanent(NetworkError::from(err))
+                    NetworkError::from(err)
                 })?
                 .to_vec();
@@ -739,49 +711,35 @@ impl Network {
     /// Put `Record` to network
     /// Optionally verify the record is stored after putting it to network
-    /// If verify is on, retry multiple times within MAX_PUT_RETRY_DURATION duration.
-    #[cfg(target_arch = "wasm32")]
-    pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
-        let pretty_key = PrettyPrintRecordKey::from(&record.key);
-
-        info!("Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}");
-        self.put_record_once(record.clone(), cfg).await
-    }
-
-    /// Put `Record` to network
-    /// Optionally verify the record is stored after putting it to network
-    /// If verify is on, retry multiple times within MAX_PUT_RETRY_DURATION duration.
-    #[cfg(not(target_arch = "wasm32"))]
+    /// If verify is on, we retry.
     pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
         let pretty_key = PrettyPrintRecordKey::from(&record.key);
+        let mut backoff = cfg
+            .retry_strategy
+            .unwrap_or(RetryStrategy::None)
+            .backoff()
+            .into_iter();
 
-        // Here we only retry after a failed validation.
-        // So a long validation time will limit the number of PUT retries we attempt here.
-        let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
-        backoff::future::retry(
-            backoff::ExponentialBackoff {
-                // None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
-                // be disabled.
-                max_elapsed_time: retry_duration,
-                ..Default::default()
-            }, || async {
-
+        loop {
             info!(
                 "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
             );
-            self.put_record_once(record.clone(), cfg).await.map_err(|err|
-            {
-                // FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt
-                warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");
-                if cfg.retry_strategy.is_some() {
-                    backoff::Error::Transient { err, retry_after: None }
-                } else {
-                    backoff::Error::Permanent(err)
-                }
+            let err = match self.put_record_once(record.clone(), cfg).await {
+                Ok(_) => break Ok(()),
+                Err(err) => err,
+            };
 
-            })
-        }).await
+            // FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt
+            warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");
+
+            match backoff.next() {
+                Some(Some(duration)) => {
+                    crate::target_arch::sleep(duration).await;
+                }
+                _ => break Err(err),
+            }
+        }
     }
 
     async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
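`get_record_from_network` and `put_record` now share the same control flow: attempt once, and on failure take the next interval from the backoff iterator; `Some(Some(d))` means sleep and retry, while `Some(None)` or an exhausted iterator means return the last error. Distilled into a standalone sketch (a hypothetical helper, not part of the diff; `tokio::time::sleep` stands in for `crate::target_arch::sleep`):

```rust
use exponential_backoff::Backoff;

// Run `attempt` until it succeeds or the backoff intervals run out.
async fn with_retries<T, E, F, Fut>(backoff: Backoff, mut attempt: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut intervals = backoff.into_iter();
    loop {
        let err = match attempt().await {
            Ok(value) => break Ok(value),
            Err(err) => err,
        };
        match intervals.next() {
            // An interval remains: wait it out, then go around again.
            Some(Some(duration)) => tokio::time::sleep(duration).await,
            // Final attempt spent (or no retries configured): surface the error.
            _ => break Err(err),
        }
    }
}
```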
diff --git a/sn_protocol/Cargo.toml b/sn_protocol/Cargo.toml
index 58f2c45459..73aa9ba68e 100644
--- a/sn_protocol/Cargo.toml
+++ b/sn_protocol/Cargo.toml
@@ -41,6 +41,7 @@ tracing = { version = "~0.1.26" }
 prost = { version = "0.9" , optional=true }
 tonic = { version = "0.6.2", optional=true, default-features = false, features = ["prost", "tls", "codegen"]}
 xor_name = "5.0.0"
+exponential-backoff = "2.0.0"
 
 
 [build-dependencies]
diff --git a/sn_protocol/src/storage.rs b/sn_protocol/src/storage.rs
index 2935e43fce..38e685f1d7 100644
--- a/sn_protocol/src/storage.rs
+++ b/sn_protocol/src/storage.rs
@@ -11,9 +11,9 @@ mod chunks;
 mod header;
 mod scratchpad;
 
-use crate::error::Error;
 use core::fmt;
-use std::{str::FromStr, time::Duration};
+use exponential_backoff::Backoff;
+use std::{num::NonZeroUsize, time::Duration};
 
 pub use self::{
     address::{ChunkAddress, RegisterAddress, ScratchpadAddress, SpendAddress},
@@ -22,50 +22,48 @@ pub use self::{
     scratchpad::Scratchpad,
 };
 
-/// Represents the strategy for retrying operations. This encapsulates both the duration it may take for an operation to
-/// complete or the retry attempts that it may take. This allows the retry of each operation, e.g., PUT/GET of
-/// Chunk/Registers/Spend to be more flexible.
+/// A strategy that translates into a configuration for exponential backoff.
+/// The first retry is done after 2 seconds, after which the backoff is roughly doubled each time.
+/// The interval does not go beyond 32 seconds. So the intervals increase from 2 to 4, to 8, to 16, to 32 seconds and
+/// all attempts are made at most 32 seconds apart.
 ///
-/// The Duration/Attempts is chosen based on the internal logic.
+/// The exact timings depend on jitter, which is set to 0.2, meaning the intervals can deviate quite a bit
+/// from the ones listed in the docs.
 #[derive(Clone, Debug, Copy, Default)]
 pub enum RetryStrategy {
-    /// Quick: Resolves to a 15-second wait or 1 retry attempt.
+    /// Attempt once (no retries)
+    None,
+    /// Retry 3 times (waits 2s, 4s and lastly 8s; max total time ~14s)
     Quick,
-    /// Balanced: Resolves to a 60-second wait or 3 retry attempt.
+    /// Retry 5 times (waits 2s, 4s, 8s, 16s and lastly 32s; max total time ~62s)
     #[default]
     Balanced,
-    /// Persistent: Resolves to a 180-second wait or 6 retry attempt.
+    /// Retry 9 times (waits 2s, 4s, 8s, 16s, 32s, 32s, 32s, 32s and lastly 32s; max total time ~190s)
     Persistent,
+    /// Attempt a specific number of times
+    N(NonZeroUsize),
 }
 
 impl RetryStrategy {
-    pub fn get_duration(&self) -> Duration {
+    pub fn attempts(&self) -> usize {
         match self {
-            RetryStrategy::Quick => Duration::from_secs(15),
-            RetryStrategy::Balanced => Duration::from_secs(60),
-            RetryStrategy::Persistent => Duration::from_secs(180),
+            RetryStrategy::None => 1,
+            RetryStrategy::Quick => 4,
+            RetryStrategy::Balanced => 6,
+            RetryStrategy::Persistent => 10,
+            RetryStrategy::N(x) => x.get(),
         }
     }
 
-    pub fn get_count(&self) -> usize {
-        match self {
-            RetryStrategy::Quick => 1,
-            RetryStrategy::Balanced => 3,
-            RetryStrategy::Persistent => 6,
-        }
-    }
-}
-
-impl FromStr for RetryStrategy {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s.to_lowercase().as_str() {
-            "quick" => Ok(RetryStrategy::Quick),
-            "balanced" => Ok(RetryStrategy::Balanced),
-            "persistent" => Ok(RetryStrategy::Persistent),
-            _ => Err(Error::ParseRetryStrategyError),
-        }
+    pub fn backoff(&self) -> Backoff {
+        let mut backoff = Backoff::new(
+            self.attempts() as u32,
+            Duration::from_secs(1), // The first interval is double this value (see https://github.com/yoshuawuyts/exponential-backoff/issues/23)
+            Some(Duration::from_secs(32)),
+        );
+        backoff.set_factor(2); // Default.
+        backoff.set_jitter(0.2); // Default is 0.3.
+        backoff
     }
 }
 
@@ -74,3 +72,28 @@ impl fmt::Display for RetryStrategy {
         write!(f, "{self:?}")
     }
 }
+
+#[test]
+fn verify_retry_strategy_intervals() {
+    let intervals = |strategy: RetryStrategy| -> Vec<u32> {
+        let mut backoff = strategy.backoff();
+        backoff.set_jitter(0.01); // Make intervals deterministic.
+        backoff
+            .into_iter()
+            .flatten()
+            .map(|duration| duration.as_secs_f64().round() as u32)
+            .collect()
+    };
+
+    assert_eq!(intervals(RetryStrategy::None), Vec::<u32>::new());
+    assert_eq!(intervals(RetryStrategy::Quick), vec![2, 4, 8]);
+    assert_eq!(intervals(RetryStrategy::Balanced), vec![2, 4, 8, 16, 32]);
+    assert_eq!(
+        intervals(RetryStrategy::Persistent),
+        vec![2, 4, 8, 16, 32, 32, 32, 32, 32]
+    );
+    assert_eq!(
+        intervals(RetryStrategy::N(NonZeroUsize::new(12).unwrap())),
+        vec![2, 4, 8, 16, 32, 32, 32, 32, 32, 32, 32]
+    );
+}
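From the caller's side, the strategy is consumed purely as that interval iterator. A usage sketch (hypothetical caller code; actual intervals vary with jitter):

```rust
use sn_protocol::storage::RetryStrategy;

fn main() {
    for (attempt, interval) in RetryStrategy::Quick.backoff().into_iter().enumerate() {
        match interval {
            Some(duration) => println!("attempt {attempt}: sleep {duration:?}, then retry"),
            None => println!("attempt {attempt}: final attempt, no more retries"),
        }
    }
}
```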