Skip to content

Commit

Permalink
Return multiple blobs not found at once
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Oct 16, 2024
1 parent 9fd99b9 commit 18622a1
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 64 deletions.
112 changes: 67 additions & 45 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use dashmap::{
DashMap,
};
use futures::{
future::{self, FusedFuture, Future},
future::{self, try_join_all, FusedFuture, Future},
stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
};
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -500,8 +500,8 @@ pub enum ChainClientError {
#[error(transparent)]
ViewError(#[from] ViewError),

#[error("Blob not found: {0}")]
BlobNotFound(BlobId),
#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),
}

impl From<Infallible> for ChainClientError {
Expand Down Expand Up @@ -1493,21 +1493,12 @@ where
.handle_block_proposal(*proposal.clone())
.await
{
if let LocalNodeError::WorkerError(WorkerError::ChainError(chain_error)) =
&original_err
{
if let ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(
blob_id,
)),
_,
) = &**chain_error
{
self.update_local_node_with_blob_from(*blob_id, remote_node)
.await?;
continue; // We found the missing blob: retry.
}
if let Some(blob_ids) = original_err.get_blobs_not_found() {
self.update_local_node_with_blobs_from(blob_ids, remote_node)
.await?;
continue; // We found the missing blobs: retry.
}

warn!(
"Skipping proposal from {} and validator {}: {}",
owner, remote_node.name, original_err
Expand Down Expand Up @@ -1543,40 +1534,76 @@ where

/// Downloads and processes from the specified validator a confirmed block certificate that
/// uses the given blob. If this succeeds, the blob will be in our storage.
async fn update_local_node_with_blob_from(
async fn update_local_node_with_blobs_from(
&self,
blob_id: BlobId,
blob_ids: Vec<BlobId>,
remote_node: &RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
// This will download all ancestors of the certificate and process all of them locally.
self.receive_certificate(certificate).await?;
try_join_all(blob_ids.into_iter().map(|blob_id| async move {
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
// This will download all ancestors of the certificate and process all of them locally.
self.receive_certificate(certificate).await
}))
.await?;

Ok(())
}

/// Downloads and processes a confirmed block certificate that uses the given blob.
/// If this succeeds, the blob will be in our storage.
async fn receive_certificate_for_blob(&self, blob_id: BlobId) -> Result<(), ChainClientError> {
pub async fn receive_certificate_for_blob(
&self,
blob_id: BlobId,
) -> Result<(), ChainClientError> {
self.receive_certificates_for_blobs(vec![blob_id]).await
}

/// Downloads and processes confirmed block certificates that use the given blobs.
/// If this succeeds, the blobs will be in our storage.
pub async fn receive_certificates_for_blobs(
&self,
blob_ids: Vec<BlobId>,
) -> Result<(), ChainClientError> {
let validators = self.validator_nodes().await?;
let mut tasks = FuturesUnordered::new();
for remote_node in validators {
let remote_node = remote_node.clone();
tasks.push(async move {
let cert = remote_node
.download_certificate_for_blob(blob_id)
.await
.ok()?;
self.receive_certificate(cert).await.ok()
});
let mut tasks = BTreeMap::new();

for blob_id in blob_ids {
if tasks.contains_key(&blob_id) {
continue;
}

tasks.insert(
blob_id,
LocalNodeClient::<S>::download_certificate_for_blob_from_validators_futures(
&validators,
blob_id,
)
.await,
);
}

while let Some(result) = tasks.next().await {
if result.is_some() {
return Ok(());
let mut missing_blobs = Vec::new();
for (blob_id, mut blob_id_tasks) in tasks {
let mut found_blob = false;
while let Some(result) = blob_id_tasks.next().await {
if let Some(cert) = result {
if self.receive_certificate(cert).await.is_ok() {
found_blob = true;
break;
}
}
}

if !found_blob {
missing_blobs.push(blob_id);
}
}

Err(ChainClientError::BlobNotFound(blob_id))
if missing_blobs.is_empty() {
Ok(())
} else {
Err(ChainClientError::BlobsNotFound(missing_blobs))
}
}

/// Attempts to execute the block locally. If any incoming message execution fails, that
Expand Down Expand Up @@ -1634,14 +1661,9 @@ where
.local_node
.stage_block_execution(block.clone())
.await;
if let Err(LocalNodeError::WorkerError(WorkerError::ChainError(chain_error))) = &result
{
if let ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(blob_id)),
_,
) = &**chain_error
{
self.receive_certificate_for_blob(*blob_id).await?;
if let Err(err) = &result {
if let Some(blob_ids) = err.get_blobs_not_found() {
self.receive_certificates_for_blobs(blob_ids).await?;
continue; // We found the missing blob: retry.
}
}
Expand Down
80 changes: 68 additions & 12 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet, VecDeque},
future::Future,
sync::Arc,
};

use futures::future;
use futures::{future, stream::FuturesUnordered};
use linera_base::{
data_types::{ArithmeticError, Blob, BlockHeight, UserApplicationDescription},
identifiers::{BlobId, ChainId, MessageId, UserApplicationId},
Expand All @@ -17,12 +18,12 @@ use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, CertificateValue, ExecutedBlock, LiteCertificate,
},
ChainStateView,
ChainError, ChainStateView,
};
use linera_execution::{Query, Response};
use linera_execution::{ExecutionError, Query, Response, SystemExecutionError};
use linera_storage::Storage;
use linera_views::views::ViewError;
use rand::seq::SliceRandom as _;
use rand::{prelude::SliceRandom, thread_rng};
use thiserror::Error;
use tokio::sync::OwnedRwLockReadGuard;
use tracing::warn;
Expand Down Expand Up @@ -86,6 +87,36 @@ pub enum LocalNodeError {
NodeError(#[from] NodeError),
}

impl LocalNodeError {
pub fn get_blobs_not_found(&self) -> Option<Vec<BlobId>> {
match self {
LocalNodeError::WorkerError(WorkerError::ChainError(chain_error)) => {
match **chain_error {
ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(
blob_id,
)),
_,
)
| ChainError::ExecutionError(
ExecutionError::ViewError(ViewError::BlobNotFoundOnRead(blob_id)),
_,
) => Some(vec![blob_id]),
_ => None,
}
}
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
Some(blob_ids.clone())
}
LocalNodeError::NodeError(NodeError::BlobNotFoundOnRead(blob_id)) => {
Some(vec![*blob_id])
}
LocalNodeError::NodeError(NodeError::BlobsNotFound(blob_ids)) => Some(blob_ids.clone()),
_ => None,
}
}
}

impl<S> LocalNodeClient<S>
where
S: Storage + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -273,13 +304,17 @@ where
.await;

result = match &result {
Err(LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids))) => {
let blobs = remote_node.try_download_blobs(blob_ids).await;
if blobs.len() != blob_ids.len() {
result
Err(err) => {
if let Some(blob_ids) = err.get_blobs_not_found() {
let blobs = remote_node.try_download_blobs(blob_ids.as_slice()).await;
if blobs.len() != blob_ids.len() {
result
} else {
self.handle_certificate(certificate, blobs, notifications)
.await
}
} else {
self.handle_certificate(certificate, blobs, notifications)
.await
result
}
}
_ => result,
Expand Down Expand Up @@ -370,7 +405,7 @@ where
) -> Result<Box<ChainInfo>, LocalNodeError> {
// Sequentially try each validator in random order.
let mut validators: Vec<_> = validators.iter().collect();
validators.shuffle(&mut rand::thread_rng());
validators.shuffle(&mut thread_rng());
for remote_node in validators {
let info = self.local_chain_info(chain_id).await?;
if target_next_block_height <= info.next_block_height {
Expand Down Expand Up @@ -460,7 +495,7 @@ where
) -> Option<Blob> {
// Sequentially try each validator in random order.
let mut validators: Vec<_> = validators.iter().collect();
validators.shuffle(&mut rand::thread_rng());
validators.shuffle(&mut thread_rng());
for remote_node in validators {
if let Some(blob) = remote_node.try_download_blob(blob_id).await {
return Some(blob);
Expand All @@ -469,6 +504,27 @@ where
None
}

#[tracing::instrument(level = "trace", skip(validators))]
pub async fn download_certificate_for_blob_from_validators_futures(
validators: &[RemoteNode<impl ValidatorNode>],
blob_id: BlobId,
) -> FuturesUnordered<impl Future<Output = Option<Certificate>> + '_> {
let futures = FuturesUnordered::new();

let mut validators: Vec<_> = validators.iter().collect();
validators.shuffle(&mut thread_rng());
for remote_node in validators {
futures.push(async move {
remote_node
.download_certificate_for_blob(blob_id)
.await
.ok()
});
}

futures
}

#[tracing::instrument(level = "trace", skip(nodes))]
pub async fn download_blobs(
blob_ids: &[BlobId],
Expand Down
4 changes: 4 additions & 0 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ impl From<ChainError> for NodeError {
ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(blob_id)),
_,
)
| ChainError::ExecutionError(
ExecutionError::ViewError(ViewError::BlobNotFoundOnRead(blob_id)),
_,
) => Self::BlobNotFoundOnRead(blob_id),
error => Self::ChainError {
error: error.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ where
.await;
assert_matches!(
result,
Err(ChainClientError::BlobNotFound(not_found_blob_id)) if not_found_blob_id == blob0_id
Err(ChainClientError::BlobsNotFound(not_found_blob_ids)) if not_found_blob_ids == [blob0_id]
);

// Take one validator down
Expand Down
3 changes: 0 additions & 3 deletions linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ pub enum ExecutionError {
local_time: Timestamp,
},

#[error("Blob not found on storage read: {0}")]
BlobNotFoundOnRead(BlobId),

#[error("Event keys can be at most {MAX_EVENT_KEY_LEN} bytes.")]
EventKeyTooLong,
#[error("Stream names can be at most {MAX_STREAM_NAME_LEN} bytes.")]
Expand Down
3 changes: 1 addition & 2 deletions linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,7 @@ where
let maybe_blob_bytes = self.store.read_value::<Vec<u8>>(&blob_key).await?;
#[cfg(with_metrics)]
READ_BLOB_COUNTER.with_label_values(&[]).inc();
let blob_bytes =
maybe_blob_bytes.ok_or_else(|| ViewError::not_found("value for blob ID", blob_id))?;
let blob_bytes = maybe_blob_bytes.ok_or_else(|| ViewError::BlobNotFoundOnRead(blob_id))?;
Ok(Blob::new_with_id_unchecked(blob_id, blob_bytes))
}

Expand Down
6 changes: 5 additions & 1 deletion linera-views/src/views/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::{fmt::Debug, io::Write};

use async_trait::async_trait;
use linera_base::{crypto::CryptoHash, data_types::ArithmeticError};
use linera_base::{crypto::CryptoHash, data_types::ArithmeticError, identifiers::BlobId};
pub use linera_views_derive::{
ClonableView, CryptoHashRootView, CryptoHashView, HashableView, RootView, View,
};
Expand Down Expand Up @@ -154,6 +154,10 @@ pub enum ViewError {
/// The value is too large for the client
#[error("The value is too large for the client")]
TooLargeValue,

/// Blob not found when trying to read it.
#[error("Blob not found on storage read: {0}")]
BlobNotFoundOnRead(BlobId),
}

impl ViewError {
Expand Down

0 comments on commit 18622a1

Please sign in to comment.