Skip to content

Commit

Permalink
BlobNotFoundOnRead -> BlobsNotFound
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Nov 7, 2024
1 parent d169df6 commit ca23cb5
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 125 deletions.
15 changes: 13 additions & 2 deletions linera-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use data_types::{MessageBundle, Origin, PostedMessage};
use linera_base::{
crypto::{CryptoError, CryptoHash},
data_types::{ArithmeticError, BlockHeight, Round, Timestamp},
identifiers::{ApplicationId, ChainId},
identifiers::{ApplicationId, BlobId, ChainId},
};
use linera_execution::ExecutionError;
use linera_views::views::ViewError;
Expand All @@ -32,7 +32,7 @@ pub enum ChainError {
#[error("Arithmetic error: {0}")]
ArithmeticError(#[from] ArithmeticError),
#[error("Error in view operation: {0}")]
ViewError(#[from] ViewError),
ViewError(ViewError),
#[error("Execution error: {0} during {1:?}")]
ExecutionError(ExecutionError, ChainExecutionContext),

Expand Down Expand Up @@ -150,6 +150,17 @@ pub enum ChainError {
expected: CryptoHash,
actual: CryptoHash,
},
#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),
}

impl From<ViewError> for ChainError {
fn from(error: ViewError) -> Self {
match error {
ViewError::BlobsNotFound(blob_ids) => ChainError::BlobsNotFound(blob_ids),
error => ChainError::ViewError(error),
}
}
}

#[derive(Copy, Clone, Debug)]
Expand Down
66 changes: 51 additions & 15 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,10 @@ where
#[derive(Debug, Error)]
pub enum ChainClientError {
#[error("Local node operation failed: {0}")]
LocalNodeError(#[from] LocalNodeError),
LocalNodeError(LocalNodeError),

#[error("Remote node operation failed: {0}")]
RemoteNodeError(#[from] NodeError),
RemoteNodeError(NodeError),

#[error(transparent)]
ArithmeticError(#[from] ArithmeticError),
Expand All @@ -458,7 +458,7 @@ pub enum ChainClientError {
JsonError(#[from] serde_json::Error),

#[error("Chain operation failed: {0}")]
ChainError(#[from] ChainError),
ChainError(ChainError),

#[error(transparent)]
CommunicationError(#[from] CommunicationError<NodeError>),
Expand Down Expand Up @@ -494,12 +494,51 @@ pub enum ChainClientError {
FoundMultipleKeysForChain(ChainId),

#[error(transparent)]
ViewError(#[from] ViewError),
ViewError(ViewError),

#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),
}

impl From<NodeError> for ChainClientError {
fn from(error: NodeError) -> Self {
match error {
NodeError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
error => Self::RemoteNodeError(error),
}
}
}

impl From<ViewError> for ChainClientError {
fn from(error: ViewError) -> Self {
match error {
ViewError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
error => Self::ViewError(error),
}
}
}

impl From<LocalNodeError> for ChainClientError {
fn from(error: LocalNodeError) -> Self {
match error {
LocalNodeError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
error => Self::LocalNodeError(error),
}
}
}

impl From<ChainError> for ChainClientError {
fn from(error: ChainError) -> Self {
match error {
ChainError::BlobsNotFound(blob_ids)
| ChainError::ExecutionError(ExecutionError::BlobsNotFound(blob_ids), _) => {
Self::BlobsNotFound(blob_ids)
}
error => Self::ChainError(error),
}
}
}

impl From<Infallible> for ChainClientError {
fn from(infallible: Infallible) -> Self {
match infallible {}
Expand Down Expand Up @@ -1122,7 +1161,7 @@ where
// necessary.
if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await {
match &err {
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
LocalNodeError::BlobsNotFound(blob_ids) => {
let blobs = LocalNodeClient::<S>::download_blobs(blob_ids, &nodes).await;

ensure!(blobs.len() == blob_ids.len(), err);
Expand Down Expand Up @@ -1623,8 +1662,8 @@ where
.handle_block_proposal(*proposal.clone())
.await
{
if let Some(blob_ids) = original_err.get_blobs_not_found() {
self.update_local_node_with_blobs_from(blob_ids, remote_node)
if let LocalNodeError::BlobsNotFound(blob_ids) = &original_err {
self.update_local_node_with_blobs_from(blob_ids.clone(), remote_node)
.await?;
continue; // We found the missing blobs: retry.
}
Expand All @@ -1641,9 +1680,7 @@ where
let mut blobs = vec![];
while let Err(original_err) = self.client.handle_certificate(*cert.clone(), blobs).await
{
if let LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) =
&original_err
{
if let LocalNodeError::BlobsNotFound(blob_ids) = &original_err {
blobs = remote_node
.find_missing_blobs(blob_ids.clone(), chain_id)
.await?;
Expand Down Expand Up @@ -1792,11 +1829,10 @@ where
.local_node
.stage_block_execution(block.clone())
.await;
if let Err(err) = &result {
if let Some(blob_ids) = err.get_blobs_not_found() {
self.receive_certificates_for_blobs(blob_ids).await?;
continue; // We found the missing blob: retry.
}
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
self.receive_certificates_for_blobs(blob_ids.clone())
.await?;
continue; // We found the missing blob: retry.
}
return Ok(result?);
}
Expand Down
64 changes: 32 additions & 32 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, CertificateValue, ExecutedBlock, LiteCertificate,
},
ChainError, ChainStateView,
ChainStateView,
};
use linera_execution::{ExecutionError, Query, Response, SystemExecutionError};
use linera_execution::{Query, Response};
use linera_storage::Storage;
use linera_views::views::ViewError;
use rand::{prelude::SliceRandom, thread_rng};
Expand Down Expand Up @@ -59,10 +59,10 @@ pub enum LocalNodeError {
ArithmeticError(#[from] ArithmeticError),

#[error(transparent)]
ViewError(#[from] linera_views::views::ViewError),
ViewError(ViewError),

#[error("Local node operation failed: {0}")]
WorkerError(#[from] WorkerError),
WorkerError(WorkerError),

#[error(
"Failed to download certificates and update local node to the next height \
Expand All @@ -83,35 +83,35 @@ pub enum LocalNodeError {
InvalidChainInfoResponse,

#[error(transparent)]
NodeError(#[from] NodeError),
NodeError(NodeError),

#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),
}

impl LocalNodeError {
pub fn get_blobs_not_found(&self) -> Option<Vec<BlobId>> {
match self {
LocalNodeError::WorkerError(WorkerError::ChainError(chain_error)) => {
match **chain_error {
ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(
blob_id,
)),
_,
)
| ChainError::ExecutionError(
ExecutionError::ViewError(ViewError::BlobNotFoundOnRead(blob_id)),
_,
) => Some(vec![blob_id]),
_ => None,
}
}
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
Some(blob_ids.clone())
}
LocalNodeError::NodeError(NodeError::BlobNotFoundOnRead(blob_id)) => {
Some(vec![*blob_id])
}
LocalNodeError::NodeError(NodeError::BlobsNotFound(blob_ids)) => Some(blob_ids.clone()),
_ => None,
impl From<WorkerError> for LocalNodeError {
fn from(error: WorkerError) -> Self {
match error {
WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
error => LocalNodeError::WorkerError(error),
}
}
}

impl From<NodeError> for LocalNodeError {
fn from(error: NodeError) -> Self {
match error {
NodeError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
error => LocalNodeError::NodeError(error),
}
}
}

impl From<ViewError> for LocalNodeError {
fn from(error: ViewError) -> Self {
match error {
ViewError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
error => LocalNodeError::ViewError(error),
}
}
}
Expand Down Expand Up @@ -293,7 +293,7 @@ where

result = match &result {
Err(err) => {
if let Some(blob_ids) = err.get_blobs_not_found() {
if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
let blobs = remote_node.try_download_blobs(blob_ids.as_slice()).await;
if blobs.len() != blob_ids.len() {
result
Expand Down
27 changes: 12 additions & 15 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use linera_chain::{
};
use linera_execution::{
committee::{Committee, ValidatorName},
ExecutionError, SystemExecutionError,
ExecutionError,
};
use linera_version::VersionInfo;
use linera_views::views::ViewError;
Expand Down Expand Up @@ -178,7 +178,7 @@ pub enum NodeError {
height: BlockHeight,
},

#[error("The following blobs are missing: {0:?}.")]
#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),

// This error must be normalized during conversions.
Expand Down Expand Up @@ -216,8 +216,6 @@ pub enum NodeError {
#[error("Failed to make a chain info query on the local node: {error}")]
LocalNodeQuery { error: String },

#[error("Blob not found on storage read: {0}")]
BlobNotFoundOnRead(BlobId),
#[error("Node failed to provide a 'last used by' certificate for the blob")]
InvalidCertificateForBlob(BlobId),
#[error("Local error handling validator response")]
Expand Down Expand Up @@ -252,8 +250,11 @@ impl CrossChainMessageDelivery {

impl From<ViewError> for NodeError {
fn from(error: ViewError) -> Self {
Self::ViewError {
error: error.to_string(),
match error {
ViewError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
error => Self::ViewError {
error: error.to_string(),
},
}
}
}
Expand Down Expand Up @@ -287,14 +288,10 @@ impl From<ChainError> for NodeError {
height,
},
ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
ChainError::ExecutionError(
ExecutionError::SystemError(SystemExecutionError::BlobNotFoundOnRead(blob_id)),
_,
)
| ChainError::ExecutionError(
ExecutionError::ViewError(ViewError::BlobNotFoundOnRead(blob_id)),
_,
) => Self::BlobNotFoundOnRead(blob_id),
ChainError::BlobsNotFound(blob_ids)
| ChainError::ExecutionError(ExecutionError::BlobsNotFound(blob_ids), _) => {
Self::BlobsNotFound(blob_ids)
}
error => Self::ChainError {
error: error.to_string(),
},
Expand All @@ -307,7 +304,7 @@ impl From<WorkerError> for NodeError {
match error {
WorkerError::ChainError(error) => (*error).into(),
WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
WorkerError::BlobsNotFound(blob_ids) => NodeError::BlobsNotFound(blob_ids),
WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
error => Self::WorkerError {
error: error.to_string(),
},
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ where
let handle_block_proposal_result =
Self::handle_block_proposal(proposal, &mut validator).await;
let result = match handle_block_proposal_result {
Some(Err(NodeError::BlobsNotFound(_) | NodeError::BlobNotFoundOnRead(_))) => {
Some(Err(NodeError::BlobsNotFound(_))) => {
handle_block_proposal_result.expect("handle_block_proposal_result should be Some")
}
_ => match validator.fault_type {
Expand Down Expand Up @@ -354,7 +354,7 @@ where
let handle_certificate_result =
Self::handle_certificate(certificate, validator, blobs).await;
match handle_certificate_result {
Some(Err(NodeError::BlobsNotFound(_) | NodeError::BlobNotFoundOnRead(_))) => {
Some(Err(NodeError::BlobsNotFound(_))) => {
handle_certificate_result.expect("handle_certificate_result should be Some")
}
_ => match validator.fault_type {
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ where
// synchronize them now and retry.
self.send_chain_information_for_senders(chain_id).await?;
}
Err(NodeError::BlobNotFoundOnRead(_)) if !blob_ids.is_empty() => {
// For `BlobNotFoundOnRead`, we assume that the local node should already be
Err(NodeError::BlobsNotFound(_)) if !blob_ids.is_empty() => {
// For `BlobsNotFound`, we assume that the local node should already be
// updated with the needed blobs, so sending the chain information about the
// certificates that last used the blobs to the validator node should be enough.
let missing_blob_ids = stream::iter(mem::take(&mut blob_ids))
Expand Down
Loading

0 comments on commit ca23cb5

Please sign in to comment.