diff --git a/linera-chain/src/data_types.rs b/linera-chain/src/data_types.rs index 39919bfb1a5..52fbff8f1fe 100644 --- a/linera-chain/src/data_types.rs +++ b/linera-chain/src/data_types.rs @@ -291,7 +291,7 @@ pub struct BlockProposal { pub owner: Owner, pub signature: Signature, #[debug(skip_if = Vec::is_empty)] - pub blobs: Vec, + pub published_blobs: Vec, #[debug(skip_if = Option::is_none)] pub validated_block_certificate: Option>, } @@ -720,7 +720,12 @@ pub struct ProposalContent { } impl BlockProposal { - pub fn new_initial(round: Round, block: Block, secret: &KeyPair, blobs: Vec) -> Self { + pub fn new_initial( + round: Round, + block: Block, + secret: &KeyPair, + published_blobs: Vec, + ) -> Self { let content = ProposalContent { round, block, @@ -731,7 +736,7 @@ impl BlockProposal { content, owner: secret.public().into(), signature, - blobs, + published_blobs, validated_block_certificate: None, } } @@ -740,7 +745,7 @@ impl BlockProposal { round: Round, validated_block_certificate: ValidatedBlockCertificate, secret: &KeyPair, - blobs: Vec, + published_blobs: Vec, ) -> Self { let lite_cert = validated_block_certificate.lite_certificate().cloned(); let executed_block = validated_block_certificate.into_inner().into_inner(); @@ -754,7 +759,7 @@ impl BlockProposal { content, owner: secret.public().into(), signature, - blobs, + published_blobs, validated_block_certificate: Some(lite_cert), } } diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index 3a9ef40fece..2748d615195 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -84,10 +84,10 @@ use rand_distr::{Distribution, WeightedAliasIndex}; use serde::{Deserialize, Serialize}; use crate::{ - data_types::{Block, BlockExecutionOutcome, BlockProposal, LiteVote, ProposalContent, Vote}, + data_types::{Block, BlockProposal, ExecutedBlock, LiteVote, ProposalContent, Vote}, types::{ - CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, TimeoutCertificate, - ValidatedBlockCertificate, + Certificate, CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, + LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate, }, ChainError, }; @@ -120,12 +120,18 @@ pub struct ChainManager { /// validator). #[debug(skip_if = Option::is_none)] pub locked: Option, + /// These are the published blobs belonging to the locked block. + #[debug(skip_if = BTreeMap::is_empty)] + pub locked_published_blobs: BTreeMap, + /// These are the used blobs belonging to the locked block. + #[debug(skip_if = BTreeMap::is_empty)] + pub locked_used_blobs: BTreeMap, /// Latest leader timeout certificate we have received. #[debug(skip_if = Option::is_none)] pub timeout: Option, /// Latest vote we have cast, to validate or confirm. #[debug(skip_if = Option::is_none)] - pub pending: Option, + pub pending_vote: Option, /// Latest timeout vote we cast. #[debug(skip_if = Option::is_none)] pub timeout_vote: Option, @@ -145,9 +151,6 @@ pub struct ChainManager { /// The owners that take over in fallback mode. #[debug(skip_if = BTreeMap::is_empty)] pub fallback_owners: BTreeMap, - /// These are blobs belonging to proposed or validated blocks that have not been confirmed yet. - #[debug(skip_if = BTreeMap::is_empty)] - pub pending_blobs: BTreeMap, } doc_scalar!( @@ -209,20 +212,21 @@ impl ChainManager { fallback_distribution, proposed: None, locked: None, + locked_published_blobs: BTreeMap::new(), + locked_used_blobs: BTreeMap::new(), timeout: None, - pending: None, + pending_vote: None, timeout_vote: None, fallback_vote: None, round_timeout, current_round, fallback_owners, - pending_blobs: BTreeMap::new(), }) } /// Returns the most recent vote we cast. - pub fn pending(&self) -> Option<&Vote> { - self.pending.as_ref() + pub fn pending_vote(&self) -> Option<&Vote> { + self.pending_vote.as_ref() } /// Verifies the safety of a proposed block with respect to voting rules. @@ -352,7 +356,7 @@ impl ChainManager { ) -> Result { let new_block = &certificate.executed_block().block; let new_round = certificate.round; - if let Some(Vote { value, round, .. }) = &self.pending { + if let Some(Vote { value, round, .. }) = &self.pending_vote { match value.inner() { CertificateValue::ConfirmedBlock(confirmed) => { if &confirmed.inner().block == new_block && *round == new_round { @@ -385,36 +389,46 @@ impl ChainManager { Ok(Outcome::Accept) } + /// If the validated block certificate is more recent, return that certificate + /// so that the locked block can be updated. + pub fn maybe_update_locked( + &self, + validated_block_certificate: Option>, + executed_block: ExecutedBlock, + ) -> Option { + if let Some(lite_cert) = validated_block_certificate { + if self + .locked + .as_ref() + .map_or(true, |locked| locked.round < lite_cert.round) + { + let value = HashedCertificateValue::new_validated(executed_block); + return lite_cert.with_value(value); + } + } + + None + } + /// Signs a vote to validate the proposed block. pub fn create_vote( &mut self, proposal: BlockProposal, - outcome: BlockExecutionOutcome, + executed_block: ExecutedBlock, key_pair: Option<&KeyPair>, local_time: Timestamp, + used_blobs: Vec, + maybe_update_locked: Option, ) { + let proposal_content = proposal.content.clone(); + let published_blobs = proposal.published_blobs.clone(); // Record the proposed block, so it can be supplied to clients that request it. - self.proposed = Some(proposal.clone()); + self.update_proposed(proposal); self.update_current_round(local_time); - let ProposalContent { block, round, .. } = proposal.content; - let executed_block = outcome.with(block); - - // If the validated block certificate is more recent, update our locked block. - if let Some(lite_cert) = proposal.validated_block_certificate { - if self - .locked - .as_ref() - .map_or(true, |locked| locked.round < lite_cert.round) - { - let value = HashedCertificateValue::new_validated(executed_block.clone()); - if let Some(certificate) = lite_cert.with_value(value) { - self.locked = Some(certificate.into()); - } - } - } + let ProposalContent { round, .. } = proposal_content; - for blob in proposal.blobs { - self.pending_blobs.insert(blob.id(), blob); + if let Some(certificate) = maybe_update_locked { + self.update_locked(certificate.into(), published_blobs, used_blobs); } if let Some(key_pair) = key_pair { @@ -424,7 +438,7 @@ impl ChainManager { } else { HashedCertificateValue::new_validated(executed_block) }; - self.pending = Some(Vote::new(value, round, key_pair)); + self.pending_vote = Some(Vote::new(value, round, key_pair)); } } @@ -432,6 +446,8 @@ impl ChainManager { pub fn create_final_vote( &mut self, validated: ValidatedBlockCertificate, + published_blobs: Vec, + used_blobs: Vec, key_pair: Option<&KeyPair>, local_time: Timestamp, ) { @@ -442,7 +458,7 @@ impl ChainManager { return; } let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone()); - self.locked = Some(validated); + self.update_locked(validated, published_blobs, used_blobs); self.update_current_round(local_time); if let Some(key_pair) = key_pair { // Vote to confirm. @@ -450,7 +466,7 @@ impl ChainManager { // back into `Certificate` type so that the vote is cast over hash of the old type. let vote = Vote::new(confirmed.into_inner().into(), round, key_pair); // Ok to overwrite validation votes with confirmation votes at equal or higher round. - self.pending = Some(vote); + self.pending_vote = Some(vote); } } @@ -561,6 +577,40 @@ impl ChainManager { fn is_super(&self, owner: &Owner) -> bool { self.ownership.super_owners.contains_key(owner) } + + fn update_locked( + &mut self, + new_locked: ValidatedBlockCertificate, + published_blobs: Vec, + used_blobs: Vec, + ) { + self.locked = Some(new_locked); + self.locked_published_blobs = published_blobs + .into_iter() + .map(|blob| (blob.id(), blob)) + .collect(); + self.locked_used_blobs = used_blobs + .into_iter() + .map(|blob| (blob.id(), blob)) + .collect(); + } + + fn update_proposed(&mut self, new_proposed: BlockProposal) { + self.proposed = Some(new_proposed); + } + + pub fn proposed_blobs(&self) -> BTreeMap { + self.proposed + .as_ref() + .map(|proposed| { + proposed + .published_blobs + .iter() + .map(|blob| (blob.id(), blob.clone())) + .collect() + }) + .unwrap_or_default() + } } /// Chain manager information that is included in `ChainInfo` sent to clients. @@ -576,12 +626,18 @@ pub struct ChainManagerInfo { /// validator). #[debug(skip_if = Option::is_none)] pub requested_locked: Option>, + /// Published blobs belonging to the locked block. + #[debug(skip_if = Vec::is_empty)] + pub locked_published_blobs: Vec, + /// Used blobs belonging to the locked block. + #[debug(skip_if = Vec::is_empty)] + pub locked_used_blobs: Vec, /// Latest timeout certificate we have seen. #[debug(skip_if = Option::is_none)] pub timeout: Option>, /// Latest vote we cast (either to validate or to confirm a block). #[debug(skip_if = Option::is_none)] - pub pending: Option, + pub pending_vote: Option, /// Latest timeout vote we cast. #[debug(skip_if = Option::is_none)] pub timeout_vote: Option, @@ -600,9 +656,6 @@ pub struct ChainManagerInfo { /// The timestamp when the current round times out. #[debug(skip_if = Option::is_none)] pub round_timeout: Option, - /// These are blobs belonging to proposed or validated blocks that have not been confirmed yet. - #[debug(skip_if = BTreeMap::is_empty)] - pub pending_blobs: BTreeMap, } impl From<&ChainManager> for ChainManagerInfo { @@ -612,15 +665,16 @@ impl From<&ChainManager> for ChainManagerInfo { ownership: manager.ownership.clone(), requested_proposed: None, requested_locked: None, + locked_published_blobs: Vec::new(), + locked_used_blobs: Vec::new(), timeout: manager.timeout.clone().map(Box::new), - pending: manager.pending.as_ref().map(|vote| vote.lite()), + pending_vote: manager.pending_vote.as_ref().map(|vote| vote.lite()), timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite), fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite), requested_pending_value: None, current_round, leader: manager.round_leader(current_round).cloned(), round_timeout: manager.round_timeout, - pending_blobs: BTreeMap::new(), } } } @@ -631,10 +685,11 @@ impl ChainManagerInfo { self.requested_proposed = manager.proposed.clone().map(Box::new); self.requested_locked = manager.locked.clone().map(Box::new); self.requested_pending_value = manager - .pending + .pending_vote .as_ref() .map(|vote| Box::new(vote.value.clone())); - self.pending_blobs = manager.pending_blobs.clone(); + self.locked_published_blobs = manager.locked_published_blobs.values().cloned().collect(); + self.locked_used_blobs = manager.locked_used_blobs.values().cloned().collect(); } /// Gets the highest validated block. diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index 14bd0debcef..acb22e8d0b8 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -133,9 +133,44 @@ where ) -> Result<(), WorkerError> { // Create the vote and store it in the chain state. let manager = self.state.chain.manager.get_mut(); - manager.create_vote(proposal, outcome, self.state.config.key_pair(), local_time); + + let executed_block = outcome.with(proposal.content.block.clone()); + let maybe_update_locked = manager.maybe_update_locked( + proposal.validated_block_certificate.clone(), + executed_block.clone(), + ); + + let used_blobs = if maybe_update_locked.is_some() { + let used_blob_ids = executed_block + .required_blob_ids() + .difference(&proposal.content.block.published_blob_ids()) + .cloned() + .collect::>(); + let used_blobs = self.state.storage.read_blobs(&used_blob_ids).await?; + let missing_blob_ids = used_blob_ids + .iter() + .zip(&used_blobs) + .filter_map(|(blob_id, blob)| blob.is_none().then_some(*blob_id)) + .collect::>(); + if !missing_blob_ids.is_empty() { + return Err(WorkerError::BlobsNotFound(missing_blob_ids)); + } + + used_blobs.into_iter().flatten().collect() + } else { + Vec::new() + }; + + manager.create_vote( + proposal, + executed_block, + self.state.config.key_pair(), + local_time, + used_blobs, + maybe_update_locked, + ); // Cache the value we voted on, so the client doesn't have to send it again. - if let Some(vote) = manager.pending() { + if let Some(vote) = manager.pending_vote() { self.state .recent_hashed_certificate_values .insert(Cow::Borrowed(&vote.value)) @@ -201,17 +236,26 @@ where let required_blob_ids = executed_block.required_blob_ids(); // Verify that no unrelated blobs were provided. self.state - .check_for_unneeded_blobs(&required_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?; let remaining_required_blob_ids = required_blob_ids .difference(&blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect(); - self.state - .check_no_missing_blobs(&remaining_required_blob_ids) + let mut validated_blobs = self + .state + .get_proposed_blobs(&remaining_required_blob_ids) .await?; + validated_blobs.extend_from_slice(blobs); + + let published_blob_ids = block.published_blob_ids(); + let (published_blobs, used_blobs) = validated_blobs + .into_iter() + .partition(|blob| published_blob_ids.contains(&blob.id())); let old_round = self.state.chain.manager.get().current_round; self.state.chain.manager.get_mut().create_final_vote( certificate, + published_blobs, + used_blobs, self.state.config.key_pair(), self.state.storage.clock().current_time(), ); @@ -292,14 +336,14 @@ where let required_blob_ids = executed_block.required_blob_ids(); // Verify that no unrelated blobs were provided. self.state - .check_for_unneeded_blobs(&required_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?; let remaining_required_blob_ids = required_blob_ids .difference(&blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect(); let mut blobs_in_block = self .state - .get_blobs_and_checks_storage(&remaining_required_blob_ids) + .get_proposed_or_locked_blobs_and_check_storage(&remaining_required_blob_ids) .await?; blobs_in_block.extend_from_slice(blobs); diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index b627f7d1b17..af8902d34c6 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -306,39 +306,95 @@ where } /// Returns an error if the block requires a blob we don't have. - /// Looks for the blob in: chain manager's pending blobs and storage. - async fn check_no_missing_blobs( + /// Looks for the blob in: chain manager's proposed blobs and storage. + async fn get_proposed_blobs( &self, - required_blob_ids: &HashSet, - ) -> Result<(), WorkerError> { - let pending_blobs = &self.chain.manager.get().pending_blobs; - let missing_blob_ids = required_blob_ids - .iter() - .filter(|blob_id| !pending_blobs.contains_key(blob_id)) - .cloned() - .collect::>(); + blob_ids: &HashSet, + ) -> Result, WorkerError> { + let proposed_blobs = self.chain.manager.get().proposed_blobs(); + let (mut found_blobs, missing_blob_ids) = self.get_found_and_missing_blobs( + blob_ids, + &proposed_blobs, + &BTreeMap::new(), + &BTreeMap::new(), + ); - let missing_blob_ids = self - .storage - .missing_blobs(missing_blob_ids.as_slice()) - .await?; + let blobs = self.storage.read_blobs(&missing_blob_ids).await?; + let missing_blob_ids: Vec<_> = missing_blob_ids + .into_iter() + .zip(&blobs) + .filter_map(|(blob_id, blob)| blob.is_none().then_some(blob_id)) + .collect(); + found_blobs.extend(blobs.into_iter().flatten()); if missing_blob_ids.is_empty() { - return Ok(()); + Ok(found_blobs) + } else { + Err(WorkerError::BlobsNotFound(missing_blob_ids)) + } + } + + /// Returns the blobs requested by their `blob_ids` that are in the chain manager's + /// proposed or locked blobs and checks that they are otherwise in storage. + async fn get_proposed_or_locked_blobs_and_check_storage( + &self, + blob_ids: &HashSet, + ) -> Result, WorkerError> { + let manager = self.chain.manager.get(); + + let locked_published_blobs = &manager.locked_published_blobs; + let locked_used_blobs = &manager.locked_used_blobs; + let proposed_blobs = &manager.proposed_blobs(); + let (found_blobs, missing_blob_ids) = self.get_found_and_missing_blobs( + blob_ids, + proposed_blobs, + locked_published_blobs, + locked_used_blobs, + ); + + let not_found_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?; + if not_found_blob_ids.is_empty() { + Ok(found_blobs) + } else { + Err(WorkerError::BlobsNotFound(not_found_blob_ids)) } + } - Err(WorkerError::BlobsNotFound(missing_blob_ids)) + fn get_found_and_missing_blobs( + &self, + blob_ids: &HashSet, + proposed_blobs: &BTreeMap, + locked_published_blobs: &BTreeMap, + locked_used_blobs: &BTreeMap, + ) -> (Vec, Vec) { + let mut found_blobs = Vec::new(); + let mut missing_blob_ids = Vec::new(); + for blob_id in blob_ids { + match proposed_blobs + .get(blob_id) + .or_else(|| locked_published_blobs.get(blob_id)) + .or_else(|| locked_used_blobs.get(blob_id)) + { + Some(blob) => found_blobs.push(blob.clone()), + None => missing_blob_ids.push(*blob_id), + } + } + (found_blobs, missing_blob_ids) } - /// Returns an error if unrelated blobs were provided. - fn check_for_unneeded_blobs( + /// Returns an error if unrelated or duplicated blobs were provided. + fn check_for_unneeded_or_duplicated_blobs( &self, required_blob_ids: &HashSet, blobs: &[Blob], ) -> Result<(), WorkerError> { - // Find all certificates containing blobs used when executing this block. + let mut seen_blob_ids = HashSet::new(); for blob in blobs { let blob_id = blob.id(); + ensure!( + seen_blob_ids.insert(blob_id), + WorkerError::DuplicatedBlob { blob_id } + ); ensure!( required_blob_ids.contains(&blob_id), WorkerError::UnneededBlob { blob_id } @@ -348,32 +404,6 @@ where Ok(()) } - /// Returns the blobs requested by their `blob_ids` that are in the chain manager's pending blobs - /// and checks that they are otherwise in storage. - async fn get_blobs_and_checks_storage( - &self, - blob_ids: &HashSet, - ) -> Result, WorkerError> { - let pending_blobs = &self.chain.manager.get().pending_blobs; - - let mut found_blobs = Vec::new(); - let mut missing_blob_ids = Vec::new(); - for blob_id in blob_ids { - if let Some(blob) = pending_blobs.get(blob_id) { - found_blobs.push(blob.clone()); - } else { - missing_blob_ids.push(*blob_id); - } - } - let not_found_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?; - - if not_found_blob_ids.is_empty() { - Ok(found_blobs) - } else { - Err(WorkerError::BlobsNotFound(not_found_blob_ids)) - } - } - /// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is /// also tracked. /// diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index fa5e642c613..42c1b51b7fe 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -159,7 +159,7 @@ where forced_oracle_responses, }, owner, - blobs, + published_blobs, validated_block_certificate, signature: _, } = proposal; @@ -208,19 +208,19 @@ where // legitimately required. // Actual execution happens below, after other validity checks. self.0.chain.remove_bundles_from_inboxes(block).await?; - // Verify that no unrelated blobs were provided. + // Verify that no unrelated published blobs were provided. let published_blob_ids = block.published_blob_ids(); self.0 - .check_for_unneeded_blobs(&published_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&published_blob_ids, published_blobs)?; let missing_published_blob_ids = published_blob_ids - .difference(&blobs.iter().map(|blob| blob.id()).collect()) + .difference(&published_blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect::>(); ensure!( missing_published_blob_ids.is_empty(), WorkerError::BlobsNotFound(missing_published_blob_ids) ); - for blob in blobs { + for blob in published_blobs { Self::check_blob_size(blob.content(), &policy)?; } diff --git a/linera-core/src/client/chain_client_state.rs b/linera-core/src/client/chain_client_state.rs index a8116a0c520..def6641bb90 100644 --- a/linera-core/src/client/chain_client_state.rs +++ b/linera-core/src/client/chain_client_state.rs @@ -33,15 +33,15 @@ pub struct ChainClientState { /// /// This is always at the same height as `next_block_height`. pending_block: Option, + /// This contains blobs belonging to our `pending_block` that may not even have + /// been processed by (i.e. been proposed to) our own local chain manager yet. + pending_blobs: BTreeMap, /// Known key pairs from present and past identities. known_key_pairs: BTreeMap, /// For each validator, up to which index we have synchronized their /// [`ChainStateView::received_log`]. received_certificate_trackers: HashMap, - /// This contains blobs belonging to our `pending_block` that may not even have - /// been processed by (i.e. been proposed to) our own local chain manager yet. - pending_blobs: BTreeMap, /// A mutex that is held whilst we are performing operations that should not be /// attempted by multiple clients at the same time. diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index a9edb271d41..8203f08ea1f 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -1094,7 +1094,7 @@ where .expect("The result of executing a proposal is always an executed block") .outcome .required_blob_ids(); - let proposed_blobs = proposal.blobs.clone(); + let proposed_blobs = proposal.published_blobs.clone(); let submit_action = CommunicateAction::SubmitBlock { proposal, blob_ids: required_blob_ids, @@ -1306,10 +1306,7 @@ where .client .local_node .next_block_heights(remote_max_heights.keys(), chain_worker_limit) - .await - .map_err(|error| NodeError::LocalError { - error: error.to_string(), - })?; + .await?; // We keep track of the height we've successfully downloaded and checked, per chain. let mut downloaded_heights = BTreeMap::new(); @@ -1699,9 +1696,7 @@ where .await .map_err(|error| match error { ChainClientError::RemoteNodeError(error) => error, - _ => NodeError::LocalError { - error: error.to_string(), - }, + _ => error.into(), }) } }, @@ -1784,7 +1779,11 @@ where { if let LocalNodeError::BlobsNotFound(blob_ids) = &original_err { if let Some(new_blobs) = remote_node - .find_missing_blobs(blob_ids.clone(), chain_id) + .find_missing_blobs( + blob_ids.clone(), + &info.manager.locked_published_blobs, + &info.manager.locked_used_blobs, + ) .await? { blobs = new_blobs; @@ -1928,13 +1927,13 @@ where } } - /// Tries to read blobs from either the pending blobs or the local node's cache, or - /// storage + /// Tries to read blobs from the chain client's pending blobs. #[instrument(level = "trace")] async fn read_local_blobs( &self, blob_ids: impl IntoIterator + std::fmt::Debug, ) -> Result, LocalNodeError> { + let mut missing_blob_ids = Vec::new(); let mut blobs = Vec::new(); for blob_id in blob_ids { let maybe_blob = self.pending_blobs().get(&blob_id).cloned(); @@ -1943,26 +1942,16 @@ where continue; } - let maybe_blob = { - let chain_state_view = self.chain_state_view().await?; - chain_state_view - .manager - .get() - .pending_blobs - .get(&blob_id) - .cloned() - }; - - if let Some(blob) = maybe_blob { - blobs.push(blob); - continue; - } + missing_blob_ids.push(blob_id); + } - return Err(LocalNodeError::CannotReadLocalBlob { + if !missing_blob_ids.is_empty() { + return Err(LocalNodeError::CannotReadLocalBlobs { chain_id: self.chain_id, - blob_id, + blob_ids: missing_blob_ids, }); } + Ok(blobs) } @@ -2034,17 +2023,22 @@ where }; // Collect the hashed certificate values required for execution. let committee = self.local_committee().await?; - let blobs = self.read_local_blobs(block.published_blob_ids()).await?; // Create the final block proposal. let key_pair = self.key_pair().await?; let proposal = if let Some(cert) = manager.requested_locked { - Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs)) + Box::new(BlockProposal::new_retry( + round, + *cert, + &key_pair, + manager.locked_published_blobs, + )) } else { + let published_blobs = self.read_local_blobs(block.published_blob_ids()).await?; Box::new(BlockProposal::new_initial( round, block.clone(), &key_pair, - blobs, + published_blobs, )) }; // Check the final block proposal. This will be cheaper after #1401. diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 5418991a3f6..bda06b57bfd 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashSet, VecDeque}, sync::Arc, }; @@ -14,7 +14,7 @@ use linera_base::{ }; use linera_chain::{ data_types::{Block, BlockProposal, ExecutedBlock}, - types::{Certificate, ConfirmedBlockCertificate, LiteCertificate}, + types::{Certificate, CertificateValue, ConfirmedBlockCertificate, LiteCertificate}, ChainStateView, }; use linera_execution::{Query, Response}; @@ -26,7 +26,6 @@ use tracing::{instrument, warn}; use crate::{ data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse}, - node::NodeError, notifier::Notifier, worker::{WorkerError, WorkerState}, }; @@ -60,8 +59,11 @@ pub enum LocalNodeError { #[error("Local node operation failed: {0}")] WorkerError(WorkerError), - #[error("Failed to read blob {blob_id:?} of chain {chain_id:?}")] - CannotReadLocalBlob { chain_id: ChainId, blob_id: BlobId }, + #[error("Failed to read blobs {blob_ids:?} of chain {chain_id:?}")] + CannotReadLocalBlobs { + chain_id: ChainId, + blob_ids: Vec, + }, #[error("The local node doesn't have an active chain {0:?}")] InactiveChain(ChainId), @@ -71,6 +73,12 @@ pub enum LocalNodeError { #[error("Blobs not found: {0:?}")] BlobsNotFound(Vec), + + #[error("A validator requested blobs that are not required: {0:?}")] + UnrequiredBlobsRequested(Vec), + + #[error("Blobs requested by validator contain duplicates")] + DuplicateBlobsRequested, } impl From for LocalNodeError { @@ -185,44 +193,86 @@ where /// Given a list of missing `BlobId`s and a `Certificate` for a block: /// - Searches for the blob in different places of the local node: blob cache, /// chain manager's pending blobs, and blob storage. - /// - Returns `None` if not all blobs could be found. + /// - Returns `Some` for blobs that were found and `None` for the ones still missing. pub async fn find_missing_blobs( &self, - mut missing_blob_ids: Vec, - chain_id: ChainId, - ) -> Result>, NodeError> { + missing_blob_ids: Vec, + certificate: &Certificate, + ) -> Result>, LocalNodeError> { if missing_blob_ids.is_empty() { - return Ok(Some(Vec::new())); + return Ok(Vec::new()); } - let mut chain_manager_pending_blobs = self - .chain_state_view(chain_id) - .await? - .manager - .get() - .pending_blobs - .clone(); - let mut found_blobs = Vec::new(); - missing_blob_ids.retain(|blob_id| { - if let Some(blob) = chain_manager_pending_blobs.remove(blob_id) { - found_blobs.push(blob); - false - } else { - true + // Find the missing blobs locally and retry. + match certificate.inner() { + CertificateValue::ConfirmedBlock(confirmed) => { + self.check_missing_blob_ids( + &confirmed.inner().required_blob_ids(), + &missing_blob_ids, + )?; + let storage = self.storage_client(); + Ok(storage.read_blobs(&missing_blob_ids).await?) } - }); + CertificateValue::ValidatedBlock(validated) => { + let unique_missing_blob_ids = self.check_missing_blob_ids( + &validated.inner().required_blob_ids(), + &missing_blob_ids, + )?; + let chain_id = validated.inner().block.chain_id; + let chain = self.chain_state_view(chain_id).await?; + let manager = chain.manager.get(); + let locked_blobs = manager + .locked_published_blobs + .iter() + .chain(manager.locked_used_blobs.iter()) + .map(|(k, v)| (k, v.clone())) + .collect::>(); - let storage = self.storage_client(); - let Some(read_blobs) = storage - .read_blobs(&missing_blob_ids) - .await? - .into_iter() - .collect::>>() - else { - return Ok(None); - }; - found_blobs.extend(read_blobs); - Ok(Some(found_blobs)) + Ok(unique_missing_blob_ids + .iter() + .map(|blob_id| locked_blobs.get(blob_id).cloned()) + .collect()) + } + CertificateValue::Timeout(_) => { + warn!( + "validator requested blobs {:?} but they're not required", + missing_blob_ids + ); + Err(LocalNodeError::UnrequiredBlobsRequested(missing_blob_ids)) + } + } + } + + fn check_missing_blob_ids( + &self, + required_blob_ids: &HashSet, + missing_blob_ids: &Vec, + ) -> Result, LocalNodeError> { + let mut unrequired_blob_ids = Vec::new(); + for blob_id in missing_blob_ids { + if !required_blob_ids.contains(blob_id) { + warn!( + "validator requested blob {:?} but it is not required", + blob_id + ); + + unrequired_blob_ids.push(*blob_id); + } + } + + if !unrequired_blob_ids.is_empty() { + return Err(LocalNodeError::UnrequiredBlobsRequested( + unrequired_blob_ids, + )); + } + + let unique_missing_blob_ids = missing_blob_ids.iter().cloned().collect::>(); + if missing_blob_ids.len() > unique_missing_blob_ids.len() { + warn!("blobs requested by validator contain duplicates"); + return Err(LocalNodeError::DuplicateBlobsRequested); + } + + Ok(unique_missing_blob_ids) } /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index 7b7f4e19e42..3535d455235 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -27,7 +27,9 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ + client::ChainClientError, data_types::{ChainInfoQuery, ChainInfoResponse}, + local_node::LocalNodeError, worker::{Notification, WorkerError}, }; @@ -238,6 +240,22 @@ impl From for NodeError { } } +impl From for NodeError { + fn from(error: ChainClientError) -> Self { + Self::LocalError { + error: error.to_string(), + } + } +} + +impl From for NodeError { + fn from(error: LocalNodeError) -> Self { + Self::LocalError { + error: error.to_string(), + } + } +} + impl CrossChainMessageDelivery { pub fn new(wait_for_outgoing_messages: bool) -> Self { if wait_for_outgoing_messages { diff --git a/linera-core/src/remote_node.rs b/linera-core/src/remote_node.rs index 68f2bcb942c..722e96739a7 100644 --- a/linera-core/src/remote_node.rs +++ b/linera-core/src/remote_node.rs @@ -208,37 +208,29 @@ impl RemoteNode { } } - /// Downloads the blobs from the specified validator and returns them, including blobs that - /// are still pending in the validator's chain manager. Returns `None` if not all of them - /// could be found. + /// Checks that the given blobs are present in the provided lists of locked blobs from the + /// chain manager. If they're not there, try to download the missing ones from this validator. + /// Returns `None` if not all of them could be found. pub(crate) async fn find_missing_blobs( &self, blob_ids: Vec, - chain_id: ChainId, + locked_published_blobs: &[Blob], + locked_used_blobs: &[Blob], ) -> Result>, NodeError> { - let query = ChainInfoQuery::new(chain_id).with_manager_values(); - let info = match self.handle_chain_info_query(query).await { - Ok(info) => Some(info), - Err(err) => { - warn!("Got error from validator {}: {}", self.name, err); - return Ok(None); - } - }; - - let mut missing_blobs = blob_ids; - let mut found_blobs = if let Some(info) = info { - let new_found_blobs = missing_blobs - .iter() - .filter_map(|blob_id| info.manager.pending_blobs.get(blob_id)) - .map(|blob| (blob.id(), blob.clone())) - .collect::>(); - missing_blobs.retain(|blob_id| !new_found_blobs.contains_key(blob_id)); - new_found_blobs.into_values().collect() - } else { - Vec::new() - }; + let mut missing_blob_ids = blob_ids.into_iter().collect::>(); + let new_found_blobs = locked_published_blobs + .iter() + .chain(locked_used_blobs.iter()) + .filter(|blob| missing_blob_ids.contains(&blob.id())) + .map(|blob| (blob.id(), blob.clone())) + .collect::>(); + missing_blob_ids.retain(|blob_id| !new_found_blobs.contains_key(blob_id)); + let mut found_blobs: Vec = new_found_blobs.into_values().collect(); - if let Some(blobs) = self.try_download_blobs(&missing_blobs).await { + if let Some(blobs) = self + .try_download_blobs(&missing_blob_ids.into_iter().collect::>()) + .await + { found_blobs.extend(blobs); Ok(Some(found_blobs)) } else { diff --git a/linera-core/src/unit_tests/wasm_worker_tests.rs b/linera-core/src/unit_tests/wasm_worker_tests.rs index d9bd25ba0f0..35389efff00 100644 --- a/linera-core/src/unit_tests/wasm_worker_tests.rs +++ b/linera-core/src/unit_tests/wasm_worker_tests.rs @@ -165,7 +165,7 @@ where assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Timestamp::from(1), info.timestamp); assert_eq!(Some(publish_certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); let mut creator_system_state = SystemExecutionState { committees: [(Epoch::ZERO, committee.clone())].into_iter().collect(), @@ -248,7 +248,7 @@ where assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Timestamp::from(2), info.timestamp); assert_eq!(Some(create_certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); // Execute an application operation let increment = 5_u64; @@ -301,6 +301,6 @@ where assert_eq!(BlockHeight::from(2), info.next_block_height); assert_eq!(Some(run_certificate.hash()), info.block_hash); assert_eq!(Timestamp::from(3), info.timestamp); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); Ok(()) } diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 9e3518986db..eaeee26c80a 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -446,7 +446,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -495,7 +495,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -607,7 +607,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -656,7 +656,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); drop(chain); worker @@ -664,7 +664,7 @@ where .await?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_some()); + assert!(chain.manager.get().pending_vote().is_some()); drop(chain); worker .handle_certificate(certificate0, vec![], None) @@ -672,7 +672,7 @@ where worker.handle_block_proposal(block_proposal1).await?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_some()); + assert!(chain.manager.get().pending_vote().is_some()); drop(chain); assert_matches!( worker.handle_block_proposal(block_proposal0.clone()).await, @@ -1082,7 +1082,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -1114,9 +1114,9 @@ where chain_info_response.check(&ValidatorName(worker.public_key()))?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - let pending_value = chain.manager.get().pending().unwrap().lite(); + let pending_value = chain.manager.get().pending_vote().unwrap().lite(); assert_eq!( - chain_info_response.info.manager.pending.unwrap(), + chain_info_response.info.manager.pending_vote.unwrap(), pending_value ); Ok(()) @@ -1847,7 +1847,7 @@ where assert_eq!(Amount::ZERO, info.chain_balance); assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Some(certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); assert_eq!( worker .query_application(ChainId::root(1), Query::System(SystemQuery)) @@ -1973,7 +1973,7 @@ where assert_eq!(Amount::ZERO, info.chain_balance); assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Some(certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); Ok(()) } @@ -3212,12 +3212,12 @@ where let value1 = HashedCertificateValue::new_validated(executed_block1.clone()); // If we send the validated block certificate to the worker, it votes to confirm. - let vote = response.info.manager.pending.clone().unwrap(); + let vote = response.info.manager.pending_vote.clone().unwrap(); let certificate1 = vote.with_value(value1.clone()).unwrap().into_certificate(); let (response, _) = worker .handle_certificate(certificate1.clone(), vec![], None) .await?; - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); let value = HashedCertificateValue::new_confirmed(executed_block1.clone()); assert_eq!(vote.value, value.lite()); @@ -3278,7 +3278,7 @@ where response.info.manager.requested_locked, Some(Box::new(certificate2.into())) ); - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value, lite_value2); assert_eq!(vote.round, Round::SingleLeader(5)); @@ -3460,7 +3460,7 @@ where let (executed_block1, _) = worker.stage_block_execution(block1.clone()).await?; let value1 = HashedCertificateValue::new_confirmed(executed_block1); let (response, _) = worker.handle_block_proposal(proposal1).await?; - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value.value_hash, value1.hash()); // Set the clock to the end of the round. @@ -3512,7 +3512,7 @@ where response.info.manager.requested_locked, Some(Box::new(certificate2.into())) ); - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value, lite_value2); assert_eq!(vote.round, Round::MultiLeader(1)); Ok(()) diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 1f7a857b986..9800caab4d1 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -27,7 +27,7 @@ use tracing::error; use crate::{ data_types::{ChainInfo, ChainInfoQuery}, - local_node::LocalNodeClient, + local_node::{LocalNodeClient, LocalNodeError}, node::{CrossChainMessageDelivery, NodeError, ValidatorNode}, remote_node::RemoteNode, }; @@ -212,21 +212,29 @@ where .handle_optimized_certificate(&certificate, delivery) .await; - match &result { - Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => { - self.remote_node - .check_blobs_not_found(&certificate, blob_ids)?; + if let Err(NodeError::BlobsNotFound(blob_ids)) = result { + self.remote_node + .check_blobs_not_found(&certificate, &blob_ids)?; - let blobs = self - .local_node - .find_missing_blobs(blob_ids.clone(), certificate.inner().chain_id()) - .await? - .ok_or_else(|| original_err.clone())?; - self.remote_node - .handle_certificate(certificate, blobs, delivery) - .await + let blobs = self + .local_node + .find_missing_blobs(blob_ids.clone(), &certificate) + .await?; + let missing_blob_ids = blobs + .iter() + .zip(&blob_ids) + .filter_map(|(maybe_blob, blob_id)| maybe_blob.is_none().then_some(*blob_id)) + .collect::>(); + if !missing_blob_ids.is_empty() { + return Err(LocalNodeError::BlobsNotFound(missing_blob_ids).into()); } - _ => result, + + let blobs = blobs.into_iter().flatten().collect(); + self.remote_node + .handle_certificate(certificate, blobs, delivery) + .await + } else { + result } } @@ -237,7 +245,7 @@ where ) -> Result, NodeError> { let chain_id = proposal.content.block.chain_id; let mut sent_cross_chain_updates = false; - for blob in &proposal.blobs { + for blob in &proposal.published_blobs { blob_ids.remove(&blob.id()); // Keep only blobs we may need to resend. } loop { @@ -404,14 +412,14 @@ where let vote = match action { CommunicateAction::SubmitBlock { proposal, blob_ids } => { let info = self.send_block_proposal(proposal, blob_ids).await?; - info.manager.pending + info.manager.pending_vote } CommunicateAction::FinalizeBlock { certificate, delivery, } => { let info = self.send_certificate(certificate, delivery).await?; - info.manager.pending + info.manager.pending_vote } CommunicateAction::RequestTimeout { .. } => { let query = ChainInfoQuery::new(chain_id).with_timeout(); diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 2e3dec882e7..71c5bfbc0c5 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -201,6 +201,8 @@ pub enum WorkerError { UnneededValue { value_hash: CryptoHash }, #[error("An additional blob was provided that is not required: {blob_id}.")] UnneededBlob { blob_id: BlobId }, + #[error("A duplicated blob was provided: {blob_id}.")] + DuplicatedBlob { blob_id: BlobId }, #[error("The certificate in the block proposal is not a ValidatedBlock")] MissingExecutedBlockInProposal, #[error("Fast blocks cannot query oracles")] diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index b5dc521daf4..28020908d94 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -194,8 +194,8 @@ message BlockProposal { // A lite certificate for a validated block that justifies the proposal in this round. optional bytes validated_block_certificate = 6; - // Required blob - bytes blobs = 7; + // Blobs being published by this block proposal. + bytes published_blobs = 7; } // A certified statement from the committee, without the value. diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index e30faf9d83a..7f764036fea 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -189,7 +189,7 @@ impl TryFrom for api::BlockProposal { content: bincode::serialize(&block_proposal.content)?, owner: Some(block_proposal.owner.into()), signature: Some(block_proposal.signature.into()), - blobs: bincode::serialize(&block_proposal.blobs)?, + published_blobs: bincode::serialize(&block_proposal.published_blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|cert| bincode::serialize(&cert)) @@ -211,7 +211,7 @@ impl TryFrom for BlockProposal { content, owner: try_proto_convert(block_proposal.owner)?, signature: try_proto_convert(block_proposal.signature)?, - blobs: bincode::deserialize(&block_proposal.blobs)?, + published_blobs: bincode::deserialize(&block_proposal.published_blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|bytes| bincode::deserialize(&bytes)) @@ -938,7 +938,7 @@ pub mod tests { }, owner: Owner::from(KeyPair::generate().public()), signature: Signature::new(&Foo("test".into()), &KeyPair::generate()), - blobs: vec![], + published_blobs: vec![], validated_block_certificate: Some(cert), }; diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 0dddf4081ba..a932268eb42 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -124,7 +124,7 @@ BlockProposal: TYPENAME: Owner - signature: TYPENAME: Signature - - blobs: + - published_blobs: SEQ: TYPENAME: BlobContent - validated_block_certificate: @@ -260,10 +260,16 @@ ChainManagerInfo: - requested_locked: OPTION: TYPENAME: ValidatedBlockCertificate + - locked_published_blobs: + SEQ: + TYPENAME: BlobContent + - locked_used_blobs: + SEQ: + TYPENAME: BlobContent - timeout: OPTION: TYPENAME: TimeoutCertificate - - pending: + - pending_vote: OPTION: TYPENAME: LiteVote - timeout_vote: @@ -283,12 +289,6 @@ ChainManagerInfo: - round_timeout: OPTION: TYPENAME: Timestamp - - pending_blobs: - MAP: - KEY: - TYPENAME: BlobId - VALUE: - TYPENAME: BlobContent ChainOwnership: STRUCT: - super_owners: diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index ef0113c071b..7b538d6ad40 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -733,7 +733,7 @@ impl Runnable for Job { .into_iter() .filter_map(|message| { let response = deserialize_response(message)?; - let vote = response.info.manager.pending?; + let vote = response.info.manager.pending_vote?; let value = values.get(&vote.value.value_hash)?.clone(); vote.clone().with_value(value) })