Skip to content

Commit

Permalink
Merge pull request #2387 from b-zee/feat-wasm-compatible-retry
Browse files Browse the repository at this point in the history
feat(sn_networking): use wasm compatible retry
  • Loading branch information
b-zee authored Nov 8, 2024
2 parents 3c7b3a5 + 429db18 commit 106f9be
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 186 deletions.
26 changes: 11 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ libp2p = { version = "0.54.1", features = [
] }
async-trait = "0.1"
bytes = { version = "1.0.1", features = ["serde"] }
exponential-backoff = "2.0.0"
futures = "~0.3.13"
hex = "~0.4.3"
hyper = { version = "0.14", features = [
Expand Down Expand Up @@ -71,7 +72,6 @@ tokio = { version = "1.32.0", features = [
] }
tracing = { version = "~0.1.26" }
xor_name = "5.0.0"
backoff = { version = "0.4.0", features = ["tokio"] }
aes-gcm-siv = "0.11.1"
hkdf = "0.12"
sha2 = "0.10"
Expand Down
234 changes: 96 additions & 138 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,9 @@ impl Network {
quorum: Quorum,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let mut total_attempts = 1;
total_attempts += retry_strategy
.map(|strategy| strategy.get_count())
.unwrap_or(0);
let total_attempts = retry_strategy
.map(|strategy| strategy.attempts())
.unwrap_or(1);

let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
let expected_n_verified = get_quorum_value(&quorum);
Expand Down Expand Up @@ -479,30 +478,6 @@ impl Network {
Ok(all_register_copies)
}

/// Fetch a record from the network with a single query.
/// Unlike the non-wasm32 build, this variant applies no retries.
///
/// # Errors
/// Returns `NetworkError::InternalMsgChannelDropped` when the reply channel is
/// closed before an answer arrives; otherwise any error carried in the reply is
/// converted into a `NetworkError`.
#[cfg(target_arch = "wasm32")]
pub async fn get_record_from_network(
    &self,
    key: RecordKey,
    cfg: &GetRecordCfg,
) -> Result<Record> {
    let pretty_key = PrettyPrintRecordKey::from(&key);
    info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);

    // Dispatch the `GetNetworkRecord` command; the reply comes back over the oneshot channel.
    let (sender, receiver) = oneshot::channel();
    let cmd = NetworkSwarmCmd::GetNetworkRecord {
        key: key.clone(),
        sender,
        cfg: cfg.clone(),
    };
    self.send_network_swarm_cmd(cmd);

    match receiver.await {
        Ok(Ok(record)) => Ok(record),
        Ok(Err(get_record_err)) => Err(NetworkError::from(get_record_err)),
        Err(e) => {
            error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
            Err(NetworkError::InternalMsgChannelDropped)
        }
    }
}

/// Get the Record from the network
/// Carry out re-attempts if required
/// In case a target_record is provided, only return when fetched target.
Expand All @@ -511,93 +486,92 @@ impl Network {
/// It also handles the split record error for spends and registers.
/// For spends, it accumulates the spends and returns an error if more than one.
/// For registers, it merges the registers and returns the merged record.
#[cfg(not(target_arch = "wasm32"))]
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
backoff::ExponentialBackoff {
// None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
// be disabled.
max_elapsed_time: retry_duration,
..Default::default()
},
|| async {
let pretty_key = PrettyPrintRecordKey::from(&key);
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
NetworkError::InternalMsgChannelDropped
}).map_err(|err| backoff::Error::Transient { err, retry_after: None })?;

// log the results
match &result {
Ok(_) => {
info!("Record returned: {pretty_key:?}.");
}
Err(GetRecordError::RecordDoesNotMatch(_)) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
Err(GetRecordError::NotEnoughCopies { expected, got, .. }) => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
// libp2p RecordNotFound does mean no holders answered.
// it does not actually mean the record does not exist.
// just that those asked did not have it
Err(GetRecordError::RecordNotFound) => {
warn!("No holder of record '{pretty_key:?}' found.");
}
// This is returned during SplitRecordError, we should not get this error here.
Err(GetRecordError::RecordKindMismatch) => {
error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
}
Err(GetRecordError::SplitRecord { result_map }) => {
error!("Encountered a split record for {pretty_key:?}.");
if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
info!("Merged the split record (register) for {pretty_key:?}, into a single record");
return Ok(record);
}
}
Err(GetRecordError::QueryTimeout) => {
error!("Encountered query timeout for {pretty_key:?}.");
}
};
let pretty_key = PrettyPrintRecordKey::from(&key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();

loop {
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = match receiver.await {
Ok(result) => result,
Err(err) => {
error!(
"When fetching record {pretty_key:?}, encountered a channel error {err:?}"
);
// Do not attempt retries.
return Err(NetworkError::InternalMsgChannelDropped);
}
};

// if we don't want to retry, throw permanent error
if cfg.retry_strategy.is_none() {
if let Err(e) = result {
return Err(backoff::Error::Permanent(NetworkError::from(e)));
let err = match result {
Ok(record) => {
info!("Record returned: {pretty_key:?}.");
return Ok(record);
}
Err(err) => err,
};

// log the results
match &err {
GetRecordError::RecordDoesNotMatch(_) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
GetRecordError::NotEnoughCopies { expected, got, .. } => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
// libp2p RecordNotFound does mean no holders answered.
// it does not actually mean the record does not exist.
// just that those asked did not have it
GetRecordError::RecordNotFound => {
warn!("No holder of record '{pretty_key:?}' found.");
}
// This is returned during SplitRecordError, we should not get this error here.
GetRecordError::RecordKindMismatch => {
error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
}
GetRecordError::SplitRecord { result_map } => {
error!("Encountered a split record for {pretty_key:?}.");
if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
info!("Merged the split record (register) for {pretty_key:?}, into a single record");
return Ok(record);
}
}
if result.is_err() {
GetRecordError::QueryTimeout => {
error!("Encountered query timeout for {pretty_key:?}.");
}
}

match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
debug!("Getting record from network of {pretty_key:?} via backoff...");
}
result.map_err(|err| backoff::Error::Transient {
err: NetworkError::from(err),
retry_after: None,
})
},
)
.await
_ => break Err(err.into()),
}
}
}

/// Handle the split record error.
/// Spend: Accumulate spends and return error if more than one.
/// Register: Merge registers and return the merged record.
#[cfg(not(target_arch = "wasm32"))]
fn handle_split_record_error(
result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
key: &RecordKey,
) -> std::result::Result<Option<Record>, backoff::Error<NetworkError>> {
) -> std::result::Result<Option<Record>, NetworkError> {
let pretty_key = PrettyPrintRecordKey::from(key);

// attempt to deserialise and accumulate any spends or registers
Expand All @@ -615,9 +589,9 @@ impl Network {
let kind = record_kind.get_or_insert(header.kind);
if *kind != header.kind {
error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}",header.kind);
return Err(backoff::Error::Permanent(NetworkError::GetRecordError(
return Err(NetworkError::GetRecordError(
GetRecordError::RecordKindMismatch,
)));
));
}

// Accumulate the spends
Expand Down Expand Up @@ -664,9 +638,7 @@ impl Network {
info!("For record {pretty_key:?} task found split record for a spend, accumulated and sending them as a single record");
let accumulated_spends = accumulated_spends.into_iter().collect::<Vec<SignedSpend>>();

return Err(backoff::Error::Permanent(NetworkError::DoubleSpendAttempt(
accumulated_spends,
)));
return Err(NetworkError::DoubleSpendAttempt(accumulated_spends));
} else if !collected_registers.is_empty() {
info!("For record {pretty_key:?} task found multiple registers, merging them.");
let signed_register = collected_registers.iter().fold(collected_registers[0].clone(), |mut acc, x| {
Expand All @@ -681,7 +653,7 @@ impl Network {
error!(
"Error while serializing the merged register for {pretty_key:?}: {err:?}"
);
backoff::Error::Permanent(NetworkError::from(err))
NetworkError::from(err)
})?
.to_vec();

Expand Down Expand Up @@ -739,49 +711,35 @@ impl Network {

/// Put `Record` to network (wasm32 build).
///
/// This variant makes exactly one `put_record_once` call and returns its result;
/// no retries are attempted here, regardless of `cfg`.
/// NOTE(review): any verification of the stored record is presumably handled
/// inside `put_record_once` according to `cfg` — confirm against that function.
#[cfg(target_arch = "wasm32")]
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
    // Format the key inline so the borrow of `record` ends with this statement,
    // which lets `record` be moved into `put_record_once` without a clone.
    info!(
        "Attempting to PUT record with key: {:?} to network, with cfg {cfg:?}",
        PrettyPrintRecordKey::from(&record.key)
    );
    self.put_record_once(record, cfg).await
}

/// Put `Record` to network
/// Optionally verify the record is stored after putting it to network
/// If verify is on, retry multiple times within MAX_PUT_RETRY_DURATION duration.
#[cfg(not(target_arch = "wasm32"))]
/// If verify is on, we retry.
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();

// Here we only retry after a failed validation.
// So a long validation time will limit the number of PUT retries we attempt here.
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
backoff::ExponentialBackoff {
// None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
// be disabled.
max_elapsed_time: retry_duration,
..Default::default()
}, || async {

loop {
info!(
"Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
);
self.put_record_once(record.clone(), cfg).await.map_err(|err|
{
// FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

if cfg.retry_strategy.is_some() {
backoff::Error::Transient { err, retry_after: None }
} else {
backoff::Error::Permanent(err)
}
let err = match self.put_record_once(record.clone(), cfg).await {
Ok(_) => break Ok(()),
Err(err) => err,
};

})
}).await
// FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
}
_ => break Err(err),
}
}
}

async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions sn_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tracing = { version = "~0.1.26" }
prost = { version = "0.9" , optional=true }
tonic = { version = "0.6.2", optional=true, default-features = false, features = ["prost", "tls", "codegen"]}
xor_name = "5.0.0"
exponential-backoff = "2.0.0"


[build-dependencies]
Expand Down
Loading

0 comments on commit 106f9be

Please sign in to comment.