diff --git a/linera-chain/src/certificate/generic.rs b/linera-chain/src/certificate/generic.rs index 7d5257d9200..f7d1a00146f 100644 --- a/linera-chain/src/certificate/generic.rs +++ b/linera-chain/src/certificate/generic.rs @@ -2,14 +2,17 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashSet; + use custom_debug_derive::Debug; use linera_base::{ crypto::{CryptoHash, Signature}, data_types::Round, + identifiers::BlobId, }; use linera_execution::committee::{Committee, ValidatorName}; -use super::hashed::Hashed; +use super::{hashed::Hashed, CertificateValue}; use crate::ChainError; /// Generic type representing a certificate for `value` of type `T`. @@ -95,6 +98,16 @@ impl GenericCertificate { } } +impl GenericCertificate { + pub fn required_blob_ids(&self) -> HashSet { + match self.inner() { + CertificateValue::ConfirmedBlock(confirmed) => confirmed.inner().required_blob_ids(), + CertificateValue::ValidatedBlock(validated) => validated.inner().required_blob_ids(), + CertificateValue::Timeout(_) => HashSet::new(), + } + } +} + impl Clone for GenericCertificate { fn clone(&self) -> Self { Self { diff --git a/linera-chain/src/data_types.rs b/linera-chain/src/data_types.rs index e1fa19a3126..57074ae4422 100644 --- a/linera-chain/src/data_types.rs +++ b/linera-chain/src/data_types.rs @@ -292,7 +292,7 @@ pub struct BlockProposal { pub owner: Owner, pub signature: Signature, #[debug(skip_if = Vec::is_empty)] - pub blobs: Vec, + pub published_blobs: Vec, #[debug(skip_if = Option::is_none)] pub validated_block_certificate: Option>, } @@ -736,7 +736,12 @@ pub struct ProposalContent { } impl BlockProposal { - pub fn new_initial(round: Round, block: Block, secret: &KeyPair, blobs: Vec) -> Self { + pub fn new_initial( + round: Round, + block: Block, + secret: &KeyPair, + published_blobs: Vec, + ) -> Self { let content = ProposalContent { round, block, @@ -747,7 +752,7 @@ impl BlockProposal { content, owner: secret.public().into(), signature, - blobs, + published_blobs, validated_block_certificate: None, } } @@ -756,7 +761,7 @@ impl BlockProposal { round: Round, validated_block_certificate: ValidatedBlockCertificate, secret: &KeyPair, - blobs: Vec, + published_blobs: Vec, ) -> Self { let lite_cert = validated_block_certificate.lite_certificate().cloned(); let executed_block = validated_block_certificate.into_inner().into_inner(); @@ -770,7 +775,7 @@ impl BlockProposal { content, owner: secret.public().into(), signature, - blobs, + published_blobs, validated_block_certificate: Some(lite_cert), } } diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index 54f5598d7ff..7293c90647e 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -85,10 +85,10 @@ use serde::{Deserialize, Serialize}; use crate::{ block::Timeout, - data_types::{Block, BlockExecutionOutcome, BlockProposal, LiteVote, ProposalContent, Vote}, + data_types::{Block, BlockProposal, ExecutedBlock, LiteVote, ProposalContent, Vote}, types::{ - CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, TimeoutCertificate, - ValidatedBlockCertificate, + Certificate, CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, + LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate, }, ChainError, }; @@ -121,12 +121,18 @@ pub struct ChainManager { /// validator). #[debug(skip_if = Option::is_none)] pub locked: Option, + /// These are the published blobs belonging to the locked block. + #[debug(skip_if = BTreeMap::is_empty)] + pub locked_published_blobs: BTreeMap, + /// These are the used blobs belonging to the locked block. + #[debug(skip_if = BTreeMap::is_empty)] + pub locked_used_blobs: BTreeMap, /// Latest leader timeout certificate we have received. #[debug(skip_if = Option::is_none)] pub timeout: Option, /// Latest vote we have cast, to validate or confirm. #[debug(skip_if = Option::is_none)] - pub pending: Option>, + pub pending_vote: Option>, /// Latest timeout vote we cast. #[debug(skip_if = Option::is_none)] pub timeout_vote: Option>, @@ -146,9 +152,6 @@ pub struct ChainManager { /// The owners that take over in fallback mode. #[debug(skip_if = BTreeMap::is_empty)] pub fallback_owners: BTreeMap, - /// These are blobs belonging to proposed or validated blocks that have not been confirmed yet. - #[debug(skip_if = BTreeMap::is_empty)] - pub pending_blobs: BTreeMap, } doc_scalar!( @@ -210,20 +213,21 @@ impl ChainManager { fallback_distribution, proposed: None, locked: None, + locked_published_blobs: BTreeMap::new(), + locked_used_blobs: BTreeMap::new(), timeout: None, - pending: None, + pending_vote: None, timeout_vote: None, fallback_vote: None, round_timeout, current_round, fallback_owners, - pending_blobs: BTreeMap::new(), }) } /// Returns the most recent vote we cast. - pub fn pending(&self) -> Option<&Vote> { - self.pending.as_ref() + pub fn pending_vote(&self) -> Option<&Vote> { + self.pending_vote.as_ref() } /// Verifies the safety of a proposed block with respect to voting rules. @@ -361,7 +365,7 @@ impl ChainManager { ) -> Result { let new_block = &certificate.executed_block().block; let new_round = certificate.round; - if let Some(Vote { value, round, .. }) = &self.pending { + if let Some(Vote { value, round, .. }) = &self.pending_vote { match value.inner() { CertificateValue::ConfirmedBlock(confirmed) => { if &confirmed.inner().block == new_block && *round == new_round { @@ -394,36 +398,45 @@ impl ChainManager { Ok(Outcome::Accept) } + /// If the validated block certificate is more recent, returns that certificate + /// so that the locked block can be updated. + pub fn validated_block_if_newer_than_locked( + &self, + validated_block_certificate: Option>, + executed_block: ExecutedBlock, + ) -> Option { + let lite_cert = validated_block_certificate?; + if self + .locked + .as_ref() + .map_or(true, |locked| locked.round < lite_cert.round) + { + let value = HashedCertificateValue::new_validated(executed_block); + return lite_cert.with_value(value); + } + + None + } + /// Signs a vote to validate the proposed block. pub fn create_vote( &mut self, proposal: BlockProposal, - outcome: BlockExecutionOutcome, + executed_block: ExecutedBlock, key_pair: Option<&KeyPair>, local_time: Timestamp, + used_blobs: Vec, + maybe_update_locked: Option, ) { + let proposal_content = proposal.content.clone(); + let published_blobs = proposal.published_blobs.clone(); // Record the proposed block, so it can be supplied to clients that request it. - self.proposed = Some(proposal.clone()); + self.update_proposed(proposal); self.update_current_round(local_time); - let ProposalContent { block, round, .. } = proposal.content; - let executed_block = outcome.with(block); - - // If the validated block certificate is more recent, update our locked block. - if let Some(lite_cert) = proposal.validated_block_certificate { - if self - .locked - .as_ref() - .map_or(true, |locked| locked.round < lite_cert.round) - { - let value = HashedCertificateValue::new_validated(executed_block.clone()); - if let Some(certificate) = lite_cert.with_value(value) { - self.locked = Some(certificate.into()); - } - } - } + let ProposalContent { round, .. } = proposal_content; - for blob in proposal.blobs { - self.pending_blobs.insert(blob.id(), blob); + if let Some(certificate) = maybe_update_locked { + self.update_locked(certificate.into(), published_blobs, used_blobs); } if let Some(key_pair) = key_pair { @@ -433,7 +446,7 @@ impl ChainManager { } else { HashedCertificateValue::new_validated(executed_block) }; - self.pending = Some(Vote::new(value, round, key_pair)); + self.pending_vote = Some(Vote::new(value, round, key_pair)); } } @@ -441,6 +454,8 @@ impl ChainManager { pub fn create_final_vote( &mut self, validated: ValidatedBlockCertificate, + published_blobs: Vec, + used_blobs: Vec, key_pair: Option<&KeyPair>, local_time: Timestamp, ) { @@ -451,7 +466,7 @@ impl ChainManager { return; } let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone()); - self.locked = Some(validated); + self.update_locked(validated, published_blobs, used_blobs); self.update_current_round(local_time); if let Some(key_pair) = key_pair { // Vote to confirm. @@ -459,7 +474,7 @@ impl ChainManager { // back into `Certificate` type so that the vote is cast over hash of the old type. let vote = Vote::new(confirmed.into_inner().into(), round, key_pair); // Ok to overwrite validation votes with confirmation votes at equal or higher round. - self.pending = Some(vote); + self.pending_vote = Some(vote); } } @@ -570,6 +585,40 @@ impl ChainManager { fn is_super(&self, owner: &Owner) -> bool { self.ownership.super_owners.contains_key(owner) } + + fn update_locked( + &mut self, + new_locked: ValidatedBlockCertificate, + published_blobs: Vec, + used_blobs: Vec, + ) { + self.locked = Some(new_locked); + self.locked_published_blobs = published_blobs + .into_iter() + .map(|blob| (blob.id(), blob)) + .collect(); + self.locked_used_blobs = used_blobs + .into_iter() + .map(|blob| (blob.id(), blob)) + .collect(); + } + + fn update_proposed(&mut self, new_proposed: BlockProposal) { + self.proposed = Some(new_proposed); + } + + pub fn proposed_blobs(&self) -> BTreeMap { + self.proposed + .as_ref() + .map(|proposed| { + proposed + .published_blobs + .iter() + .map(|blob| (blob.id(), blob.clone())) + .collect() + }) + .unwrap_or_default() + } } /// Chain manager information that is included in `ChainInfo` sent to clients. @@ -585,12 +634,18 @@ pub struct ChainManagerInfo { /// validator). #[debug(skip_if = Option::is_none)] pub requested_locked: Option>, + /// Published blobs belonging to the locked block. + #[debug(skip_if = Vec::is_empty)] + pub locked_published_blobs: Vec, + /// Used blobs belonging to the locked block. + #[debug(skip_if = Vec::is_empty)] + pub locked_used_blobs: Vec, /// Latest timeout certificate we have seen. #[debug(skip_if = Option::is_none)] pub timeout: Option>, /// Latest vote we cast (either to validate or to confirm a block). #[debug(skip_if = Option::is_none)] - pub pending: Option, + pub pending_vote: Option, /// Latest timeout vote we cast. #[debug(skip_if = Option::is_none)] pub timeout_vote: Option, @@ -609,9 +664,6 @@ pub struct ChainManagerInfo { /// The timestamp when the current round times out. #[debug(skip_if = Option::is_none)] pub round_timeout: Option, - /// These are blobs belonging to proposed or validated blocks that have not been confirmed yet. - #[debug(skip_if = BTreeMap::is_empty)] - pub pending_blobs: BTreeMap, } impl From<&ChainManager> for ChainManagerInfo { @@ -621,15 +673,16 @@ impl From<&ChainManager> for ChainManagerInfo { ownership: manager.ownership.clone(), requested_proposed: None, requested_locked: None, + locked_published_blobs: Vec::new(), + locked_used_blobs: Vec::new(), timeout: manager.timeout.clone().map(Box::new), - pending: manager.pending.as_ref().map(|vote| vote.lite()), + pending_vote: manager.pending_vote.as_ref().map(|vote| vote.lite()), timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite), fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite), requested_pending_value: None, current_round, leader: manager.round_leader(current_round).cloned(), round_timeout: manager.round_timeout, - pending_blobs: BTreeMap::new(), } } } @@ -640,10 +693,11 @@ impl ChainManagerInfo { self.requested_proposed = manager.proposed.clone().map(Box::new); self.requested_locked = manager.locked.clone().map(Box::new); self.requested_pending_value = manager - .pending + .pending_vote .as_ref() .map(|vote| Box::new(vote.value.clone())); - self.pending_blobs = manager.pending_blobs.clone(); + self.locked_published_blobs = manager.locked_published_blobs.values().cloned().collect(); + self.locked_used_blobs = manager.locked_used_blobs.values().cloned().collect(); } /// Gets the highest validated block. diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index 14bd0debcef..1513c4b0731 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -133,9 +133,44 @@ where ) -> Result<(), WorkerError> { // Create the vote and store it in the chain state. let manager = self.state.chain.manager.get_mut(); - manager.create_vote(proposal, outcome, self.state.config.key_pair(), local_time); + + let executed_block = outcome.with(proposal.content.block.clone()); + let maybe_update_locked = manager.validated_block_if_newer_than_locked( + proposal.validated_block_certificate.clone(), + executed_block.clone(), + ); + + let used_blobs = if maybe_update_locked.is_some() { + let used_blob_ids = executed_block + .required_blob_ids() + .difference(&proposal.content.block.published_blob_ids()) + .cloned() + .collect::>(); + let used_blobs = self.state.storage.read_blobs(&used_blob_ids).await?; + let missing_blob_ids = used_blob_ids + .iter() + .zip(&used_blobs) + .filter_map(|(blob_id, blob)| blob.is_none().then_some(*blob_id)) + .collect::>(); + if !missing_blob_ids.is_empty() { + return Err(WorkerError::BlobsNotFound(missing_blob_ids)); + } + + used_blobs.into_iter().flatten().collect() + } else { + Vec::new() + }; + + manager.create_vote( + proposal, + executed_block, + self.state.config.key_pair(), + local_time, + used_blobs, + maybe_update_locked, + ); // Cache the value we voted on, so the client doesn't have to send it again. - if let Some(vote) = manager.pending() { + if let Some(vote) = manager.pending_vote() { self.state .recent_hashed_certificate_values .insert(Cow::Borrowed(&vote.value)) @@ -201,17 +236,26 @@ where let required_blob_ids = executed_block.required_blob_ids(); // Verify that no unrelated blobs were provided. self.state - .check_for_unneeded_blobs(&required_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?; let remaining_required_blob_ids = required_blob_ids .difference(&blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect(); - self.state - .check_no_missing_blobs(&remaining_required_blob_ids) + let mut validated_blobs = self + .state + .get_proposed_blobs(&remaining_required_blob_ids) .await?; + validated_blobs.extend_from_slice(blobs); + + let published_blob_ids = block.published_blob_ids(); + let (published_blobs, used_blobs) = validated_blobs + .into_iter() + .partition(|blob| published_blob_ids.contains(&blob.id())); let old_round = self.state.chain.manager.get().current_round; self.state.chain.manager.get_mut().create_final_vote( certificate, + published_blobs, + used_blobs, self.state.config.key_pair(), self.state.storage.clock().current_time(), ); @@ -292,14 +336,14 @@ where let required_blob_ids = executed_block.required_blob_ids(); // Verify that no unrelated blobs were provided. self.state - .check_for_unneeded_blobs(&required_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?; let remaining_required_blob_ids = required_blob_ids .difference(&blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect(); let mut blobs_in_block = self .state - .get_blobs_and_checks_storage(&remaining_required_blob_ids) + .get_proposed_or_locked_blobs_and_check_storage(&remaining_required_blob_ids) .await?; blobs_in_block.extend_from_slice(blobs); diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index b627f7d1b17..af8902d34c6 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -306,39 +306,95 @@ where } /// Returns an error if the block requires a blob we don't have. - /// Looks for the blob in: chain manager's pending blobs and storage. - async fn check_no_missing_blobs( + /// Looks for the blob in: chain manager's proposed blobs and storage. + async fn get_proposed_blobs( &self, - required_blob_ids: &HashSet, - ) -> Result<(), WorkerError> { - let pending_blobs = &self.chain.manager.get().pending_blobs; - let missing_blob_ids = required_blob_ids - .iter() - .filter(|blob_id| !pending_blobs.contains_key(blob_id)) - .cloned() - .collect::>(); + blob_ids: &HashSet, + ) -> Result, WorkerError> { + let proposed_blobs = self.chain.manager.get().proposed_blobs(); + let (mut found_blobs, missing_blob_ids) = self.get_found_and_missing_blobs( + blob_ids, + &proposed_blobs, + &BTreeMap::new(), + &BTreeMap::new(), + ); - let missing_blob_ids = self - .storage - .missing_blobs(missing_blob_ids.as_slice()) - .await?; + let blobs = self.storage.read_blobs(&missing_blob_ids).await?; + let missing_blob_ids: Vec<_> = missing_blob_ids + .into_iter() + .zip(&blobs) + .filter_map(|(blob_id, blob)| blob.is_none().then_some(blob_id)) + .collect(); + found_blobs.extend(blobs.into_iter().flatten()); if missing_blob_ids.is_empty() { - return Ok(()); + Ok(found_blobs) + } else { + Err(WorkerError::BlobsNotFound(missing_blob_ids)) + } + } + + /// Returns the blobs requested by their `blob_ids` that are in the chain manager's + /// proposed or locked blobs and checks that they are otherwise in storage. + async fn get_proposed_or_locked_blobs_and_check_storage( + &self, + blob_ids: &HashSet, + ) -> Result, WorkerError> { + let manager = self.chain.manager.get(); + + let locked_published_blobs = &manager.locked_published_blobs; + let locked_used_blobs = &manager.locked_used_blobs; + let proposed_blobs = &manager.proposed_blobs(); + let (found_blobs, missing_blob_ids) = self.get_found_and_missing_blobs( + blob_ids, + proposed_blobs, + locked_published_blobs, + locked_used_blobs, + ); + + let not_found_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?; + if not_found_blob_ids.is_empty() { + Ok(found_blobs) + } else { + Err(WorkerError::BlobsNotFound(not_found_blob_ids)) } + } - Err(WorkerError::BlobsNotFound(missing_blob_ids)) + fn get_found_and_missing_blobs( + &self, + blob_ids: &HashSet, + proposed_blobs: &BTreeMap, + locked_published_blobs: &BTreeMap, + locked_used_blobs: &BTreeMap, + ) -> (Vec, Vec) { + let mut found_blobs = Vec::new(); + let mut missing_blob_ids = Vec::new(); + for blob_id in blob_ids { + match proposed_blobs + .get(blob_id) + .or_else(|| locked_published_blobs.get(blob_id)) + .or_else(|| locked_used_blobs.get(blob_id)) + { + Some(blob) => found_blobs.push(blob.clone()), + None => missing_blob_ids.push(*blob_id), + } + } + (found_blobs, missing_blob_ids) } - /// Returns an error if unrelated blobs were provided. - fn check_for_unneeded_blobs( + /// Returns an error if unrelated or duplicated blobs were provided. + fn check_for_unneeded_or_duplicated_blobs( &self, required_blob_ids: &HashSet, blobs: &[Blob], ) -> Result<(), WorkerError> { - // Find all certificates containing blobs used when executing this block. + let mut seen_blob_ids = HashSet::new(); for blob in blobs { let blob_id = blob.id(); + ensure!( + seen_blob_ids.insert(blob_id), + WorkerError::DuplicatedBlob { blob_id } + ); ensure!( required_blob_ids.contains(&blob_id), WorkerError::UnneededBlob { blob_id } @@ -348,32 +404,6 @@ where Ok(()) } - /// Returns the blobs requested by their `blob_ids` that are in the chain manager's pending blobs - /// and checks that they are otherwise in storage. - async fn get_blobs_and_checks_storage( - &self, - blob_ids: &HashSet, - ) -> Result, WorkerError> { - let pending_blobs = &self.chain.manager.get().pending_blobs; - - let mut found_blobs = Vec::new(); - let mut missing_blob_ids = Vec::new(); - for blob_id in blob_ids { - if let Some(blob) = pending_blobs.get(blob_id) { - found_blobs.push(blob.clone()); - } else { - missing_blob_ids.push(*blob_id); - } - } - let not_found_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?; - - if not_found_blob_ids.is_empty() { - Ok(found_blobs) - } else { - Err(WorkerError::BlobsNotFound(not_found_blob_ids)) - } - } - /// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is /// also tracked. /// diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index fa5e642c613..42c1b51b7fe 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -159,7 +159,7 @@ where forced_oracle_responses, }, owner, - blobs, + published_blobs, validated_block_certificate, signature: _, } = proposal; @@ -208,19 +208,19 @@ where // legitimately required. // Actual execution happens below, after other validity checks. self.0.chain.remove_bundles_from_inboxes(block).await?; - // Verify that no unrelated blobs were provided. + // Verify that no unrelated published blobs were provided. let published_blob_ids = block.published_blob_ids(); self.0 - .check_for_unneeded_blobs(&published_blob_ids, blobs)?; + .check_for_unneeded_or_duplicated_blobs(&published_blob_ids, published_blobs)?; let missing_published_blob_ids = published_blob_ids - .difference(&blobs.iter().map(|blob| blob.id()).collect()) + .difference(&published_blobs.iter().map(|blob| blob.id()).collect()) .cloned() .collect::>(); ensure!( missing_published_blob_ids.is_empty(), WorkerError::BlobsNotFound(missing_published_blob_ids) ); - for blob in blobs { + for blob in published_blobs { Self::check_blob_size(blob.content(), &policy)?; } diff --git a/linera-core/src/client/chain_client_state.rs b/linera-core/src/client/chain_client_state.rs index a8116a0c520..def6641bb90 100644 --- a/linera-core/src/client/chain_client_state.rs +++ b/linera-core/src/client/chain_client_state.rs @@ -33,15 +33,15 @@ pub struct ChainClientState { /// /// This is always at the same height as `next_block_height`. pending_block: Option, + /// This contains blobs belonging to our `pending_block` that may not even have + /// been processed by (i.e. been proposed to) our own local chain manager yet. + pending_blobs: BTreeMap, /// Known key pairs from present and past identities. known_key_pairs: BTreeMap, /// For each validator, up to which index we have synchronized their /// [`ChainStateView::received_log`]. received_certificate_trackers: HashMap, - /// This contains blobs belonging to our `pending_block` that may not even have - /// been processed by (i.e. been proposed to) our own local chain manager yet. - pending_blobs: BTreeMap, /// A mutex that is held whilst we are performing operations that should not be /// attempted by multiple clients at the same time. diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index a9edb271d41..17090a9b2b2 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -1094,7 +1094,7 @@ where .expect("The result of executing a proposal is always an executed block") .outcome .required_blob_ids(); - let proposed_blobs = proposal.blobs.clone(); + let proposed_blobs = proposal.published_blobs.clone(); let submit_action = CommunicateAction::SubmitBlock { proposal, blob_ids: required_blob_ids, @@ -1784,7 +1784,11 @@ where { if let LocalNodeError::BlobsNotFound(blob_ids) = &original_err { if let Some(new_blobs) = remote_node - .find_missing_blobs(blob_ids.clone(), chain_id) + .find_missing_blobs( + blob_ids.clone(), + &info.manager.locked_published_blobs, + &info.manager.locked_used_blobs, + ) .await? { blobs = new_blobs; @@ -1928,13 +1932,13 @@ where } } - /// Tries to read blobs from either the pending blobs or the local node's cache, or - /// storage + /// Tries to read blobs from the chain client's pending blobs. #[instrument(level = "trace")] async fn read_local_blobs( &self, blob_ids: impl IntoIterator + std::fmt::Debug, ) -> Result, LocalNodeError> { + let mut missing_blob_ids = Vec::new(); let mut blobs = Vec::new(); for blob_id in blob_ids { let maybe_blob = self.pending_blobs().get(&blob_id).cloned(); @@ -1943,26 +1947,16 @@ where continue; } - let maybe_blob = { - let chain_state_view = self.chain_state_view().await?; - chain_state_view - .manager - .get() - .pending_blobs - .get(&blob_id) - .cloned() - }; - - if let Some(blob) = maybe_blob { - blobs.push(blob); - continue; - } + missing_blob_ids.push(blob_id); + } - return Err(LocalNodeError::CannotReadLocalBlob { + if !missing_blob_ids.is_empty() { + return Err(LocalNodeError::CannotReadLocalBlobs { chain_id: self.chain_id, - blob_id, + blob_ids: missing_blob_ids, }); } + Ok(blobs) } @@ -2034,17 +2028,22 @@ where }; // Collect the hashed certificate values required for execution. let committee = self.local_committee().await?; - let blobs = self.read_local_blobs(block.published_blob_ids()).await?; // Create the final block proposal. let key_pair = self.key_pair().await?; let proposal = if let Some(cert) = manager.requested_locked { - Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs)) + Box::new(BlockProposal::new_retry( + round, + *cert, + &key_pair, + manager.locked_published_blobs, + )) } else { + let published_blobs = self.read_local_blobs(block.published_blob_ids()).await?; Box::new(BlockProposal::new_initial( round, block.clone(), &key_pair, - blobs, + published_blobs, )) }; // Check the final block proposal. This will be cheaper after #1401. diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 5418991a3f6..941b3753d78 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashSet, VecDeque}, sync::Arc, }; @@ -14,7 +14,7 @@ use linera_base::{ }; use linera_chain::{ data_types::{Block, BlockProposal, ExecutedBlock}, - types::{Certificate, ConfirmedBlockCertificate, LiteCertificate}, + types::{Certificate, CertificateValue, ConfirmedBlockCertificate, LiteCertificate}, ChainStateView, }; use linera_execution::{Query, Response}; @@ -60,8 +60,11 @@ pub enum LocalNodeError { #[error("Local node operation failed: {0}")] WorkerError(WorkerError), - #[error("Failed to read blob {blob_id:?} of chain {chain_id:?}")] - CannotReadLocalBlob { chain_id: ChainId, blob_id: BlobId }, + #[error("Failed to read blobs {blob_ids:?} of chain {chain_id:?}")] + CannotReadLocalBlobs { + chain_id: ChainId, + blob_ids: Vec, + }, #[error("The local node doesn't have an active chain {0:?}")] InactiveChain(ChainId), @@ -185,44 +188,88 @@ where /// Given a list of missing `BlobId`s and a `Certificate` for a block: /// - Searches for the blob in different places of the local node: blob cache, /// chain manager's pending blobs, and blob storage. - /// - Returns `None` if not all blobs could be found. + /// - Returns `Some` for blobs that were found and `None` for the ones still missing. pub async fn find_missing_blobs( &self, - mut missing_blob_ids: Vec, - chain_id: ChainId, - ) -> Result>, NodeError> { + missing_blob_ids: Vec, + certificate: &Certificate, + ) -> Result>, NodeError> { if missing_blob_ids.is_empty() { - return Ok(Some(Vec::new())); + return Ok(Vec::new()); } - let mut chain_manager_pending_blobs = self - .chain_state_view(chain_id) - .await? - .manager - .get() - .pending_blobs - .clone(); - let mut found_blobs = Vec::new(); - missing_blob_ids.retain(|blob_id| { - if let Some(blob) = chain_manager_pending_blobs.remove(blob_id) { - found_blobs.push(blob); - false - } else { - true + // Find the missing blobs locally and retry. + match certificate.inner() { + CertificateValue::ConfirmedBlock(confirmed) => { + self.check_missing_blob_ids( + &confirmed.inner().required_blob_ids(), + &missing_blob_ids, + )?; + let storage = self.storage_client(); + Ok(storage.read_blobs(&missing_blob_ids).await?) } - }); + CertificateValue::ValidatedBlock(validated) => { + self.check_missing_blob_ids( + &validated.inner().required_blob_ids(), + &missing_blob_ids, + )?; + let chain_id = validated.inner().block.chain_id; + let chain = self.chain_state_view(chain_id).await?; + let manager = chain.manager.get(); + let locked_blobs = manager + .locked_published_blobs + .iter() + .chain(manager.locked_used_blobs.iter()) + .map(|(k, v)| (k, v.clone())) + .collect::>(); - let storage = self.storage_client(); - let Some(read_blobs) = storage - .read_blobs(&missing_blob_ids) - .await? - .into_iter() - .collect::>>() - else { - return Ok(None); - }; - found_blobs.extend(read_blobs); - Ok(Some(found_blobs)) + Ok(missing_blob_ids + .iter() + .map(|blob_id| locked_blobs.get(blob_id).cloned()) + .collect()) + } + CertificateValue::Timeout(_) => { + warn!( + "validator requested blobs {:?} but they're not required", + missing_blob_ids + ); + Err(NodeError::UnexpectedEntriesInBlobsNotFound( + missing_blob_ids, + )) + } + } + } + + pub fn check_missing_blob_ids( + &self, + required_blob_ids: &HashSet, + missing_blob_ids: &Vec, + ) -> Result<(), NodeError> { + let mut unexpected_blob_ids = Vec::new(); + for blob_id in missing_blob_ids { + if !required_blob_ids.contains(blob_id) { + warn!( + "validator requested blob {:?} but it is not required", + blob_id + ); + + unexpected_blob_ids.push(*blob_id); + } + } + + if !unexpected_blob_ids.is_empty() { + return Err(NodeError::UnexpectedEntriesInBlobsNotFound( + unexpected_blob_ids, + )); + } + + let unique_missing_blob_ids = missing_blob_ids.iter().cloned().collect::>(); + if missing_blob_ids.len() > unique_missing_blob_ids.len() { + warn!("blobs requested by validator contain duplicates"); + return Err(NodeError::DuplicatesInBlobsNotFound); + } + + Ok(()) } /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index c06682295d7..552dd48c139 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -223,8 +223,8 @@ pub enum NodeError { InvalidCertificateForBlob(BlobId), #[error("Node returned a BlobsNotFound error with duplicates")] DuplicatesInBlobsNotFound, - #[error("Node returned a BlobsNotFound error with unexpected blob IDs")] - UnexpectedEntriesInBlobsNotFound, + #[error("Node returned a BlobsNotFound error with unexpected blob IDs: {0:?}")] + UnexpectedEntriesInBlobsNotFound(Vec), #[error("Local error handling validator response")] LocalError { error: String }, } diff --git a/linera-core/src/remote_node.rs b/linera-core/src/remote_node.rs index 68f2bcb942c..809c440fbc7 100644 --- a/linera-core/src/remote_node.rs +++ b/linera-core/src/remote_node.rs @@ -13,7 +13,7 @@ use linera_base::{ }; use linera_chain::{ data_types::BlockProposal, - types::{Certificate, CertificateValue, ConfirmedBlockCertificate, LiteCertificate}, + types::{Certificate, ConfirmedBlockCertificate, LiteCertificate}, }; use linera_execution::committee::ValidatorName; use rand::seq::SliceRandom as _; @@ -208,37 +208,29 @@ impl RemoteNode { } } - /// Downloads the blobs from the specified validator and returns them, including blobs that - /// are still pending in the validator's chain manager. Returns `None` if not all of them - /// could be found. + /// Checks that the given blobs are present in the provided lists of locked blobs from the + /// chain manager. If they're not there, tries to download the missing ones from this validator. + /// Returns `None` if not all of them could be found. pub(crate) async fn find_missing_blobs( &self, blob_ids: Vec, - chain_id: ChainId, + locked_published_blobs: &[Blob], + locked_used_blobs: &[Blob], ) -> Result>, NodeError> { - let query = ChainInfoQuery::new(chain_id).with_manager_values(); - let info = match self.handle_chain_info_query(query).await { - Ok(info) => Some(info), - Err(err) => { - warn!("Got error from validator {}: {}", self.name, err); - return Ok(None); - } - }; - - let mut missing_blobs = blob_ids; - let mut found_blobs = if let Some(info) = info { - let new_found_blobs = missing_blobs - .iter() - .filter_map(|blob_id| info.manager.pending_blobs.get(blob_id)) - .map(|blob| (blob.id(), blob.clone())) - .collect::>(); - missing_blobs.retain(|blob_id| !new_found_blobs.contains_key(blob_id)); - new_found_blobs.into_values().collect() - } else { - Vec::new() - }; + let mut missing_blob_ids = blob_ids.into_iter().collect::>(); + let new_found_blobs = locked_published_blobs + .iter() + .chain(locked_used_blobs.iter()) + .filter(|blob| missing_blob_ids.contains(&blob.id())) + .map(|blob| (blob.id(), blob.clone())) + .collect::>(); + missing_blob_ids.retain(|blob_id| !new_found_blobs.contains_key(blob_id)); + let mut found_blobs: Vec = new_found_blobs.into_values().collect(); - if let Some(blobs) = self.try_download_blobs(&missing_blobs).await { + if let Some(blobs) = self + .try_download_blobs(&missing_blob_ids.into_iter().collect::>()) + .await + { found_blobs.extend(blobs); Ok(Some(found_blobs)) } else { @@ -301,37 +293,4 @@ impl RemoteNode { } Some(blobs) } - - /// Checks that requesting these blobs when trying to handle this certificate is legitimate, - /// i.e. that there are no duplicates and the blobs are actually required. - pub fn check_blobs_not_found( - &self, - certificate: &Certificate, - blob_ids: &[BlobId], - ) -> Result<(), NodeError> { - // Find the missing blobs locally and retry. - let required = match certificate.inner() { - CertificateValue::ConfirmedBlock(confirmed) => confirmed.inner().required_blob_ids(), - CertificateValue::ValidatedBlock(validated) => validated.inner().required_blob_ids(), - CertificateValue::Timeout(_) => HashSet::new(), - }; - for blob_id in blob_ids { - if !required.contains(blob_id) { - warn!( - "validator {} requested blob {blob_id:?} but it is not required", - self.name - ); - return Err(NodeError::UnexpectedEntriesInBlobsNotFound); - } - } - let unique_missing_blob_ids = blob_ids.iter().cloned().collect::>(); - if blob_ids.len() > unique_missing_blob_ids.len() { - warn!( - "blobs requested by validator {} contain duplicates", - self.name - ); - return Err(NodeError::DuplicatesInBlobsNotFound); - } - Ok(()) - } } diff --git a/linera-core/src/unit_tests/wasm_worker_tests.rs b/linera-core/src/unit_tests/wasm_worker_tests.rs index d9bd25ba0f0..35389efff00 100644 --- a/linera-core/src/unit_tests/wasm_worker_tests.rs +++ b/linera-core/src/unit_tests/wasm_worker_tests.rs @@ -165,7 +165,7 @@ where assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Timestamp::from(1), info.timestamp); assert_eq!(Some(publish_certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); let mut creator_system_state = SystemExecutionState { committees: [(Epoch::ZERO, committee.clone())].into_iter().collect(), @@ -248,7 +248,7 @@ where assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Timestamp::from(2), info.timestamp); assert_eq!(Some(create_certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); // Execute an application operation let increment = 5_u64; @@ -301,6 +301,6 @@ where assert_eq!(BlockHeight::from(2), info.next_block_height); assert_eq!(Some(run_certificate.hash()), info.block_hash); assert_eq!(Timestamp::from(3), info.timestamp); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); Ok(()) } diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 9e3518986db..eaeee26c80a 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -446,7 +446,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -495,7 +495,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -607,7 +607,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -656,7 +656,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); drop(chain); worker @@ -664,7 +664,7 @@ where .await?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_some()); + assert!(chain.manager.get().pending_vote().is_some()); drop(chain); worker .handle_certificate(certificate0, vec![], None) @@ -672,7 +672,7 @@ where worker.handle_block_proposal(block_proposal1).await?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_some()); + assert!(chain.manager.get().pending_vote().is_some()); drop(chain); assert_matches!( worker.handle_block_proposal(block_proposal0.clone()).await, @@ -1082,7 +1082,7 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().pending().is_none()); + assert!(chain.manager.get().pending_vote().is_none()); Ok(()) } @@ -1114,9 +1114,9 @@ where chain_info_response.check(&ValidatorName(worker.public_key()))?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - let pending_value = chain.manager.get().pending().unwrap().lite(); + let pending_value = chain.manager.get().pending_vote().unwrap().lite(); assert_eq!( - chain_info_response.info.manager.pending.unwrap(), + chain_info_response.info.manager.pending_vote.unwrap(), pending_value ); Ok(()) @@ -1847,7 +1847,7 @@ where assert_eq!(Amount::ZERO, info.chain_balance); assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Some(certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); assert_eq!( worker .query_application(ChainId::root(1), Query::System(SystemQuery)) @@ -1973,7 +1973,7 @@ where assert_eq!(Amount::ZERO, info.chain_balance); assert_eq!(BlockHeight::from(1), info.next_block_height); assert_eq!(Some(certificate.hash()), info.block_hash); - assert!(info.manager.pending.is_none()); + assert!(info.manager.pending_vote.is_none()); Ok(()) } @@ -3212,12 +3212,12 @@ where let value1 = HashedCertificateValue::new_validated(executed_block1.clone()); // If we send the validated block certificate to the worker, it votes to confirm. - let vote = response.info.manager.pending.clone().unwrap(); + let vote = response.info.manager.pending_vote.clone().unwrap(); let certificate1 = vote.with_value(value1.clone()).unwrap().into_certificate(); let (response, _) = worker .handle_certificate(certificate1.clone(), vec![], None) .await?; - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); let value = HashedCertificateValue::new_confirmed(executed_block1.clone()); assert_eq!(vote.value, value.lite()); @@ -3278,7 +3278,7 @@ where response.info.manager.requested_locked, Some(Box::new(certificate2.into())) ); - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value, lite_value2); assert_eq!(vote.round, Round::SingleLeader(5)); @@ -3460,7 +3460,7 @@ where let (executed_block1, _) = worker.stage_block_execution(block1.clone()).await?; let value1 = HashedCertificateValue::new_confirmed(executed_block1); let (response, _) = worker.handle_block_proposal(proposal1).await?; - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value.value_hash, value1.hash()); // Set the clock to the end of the round. @@ -3512,7 +3512,7 @@ where response.info.manager.requested_locked, Some(Box::new(certificate2.into())) ); - let vote = response.info.manager.pending.as_ref().unwrap(); + let vote = response.info.manager.pending_vote.as_ref().unwrap(); assert_eq!(vote.value, lite_value2); assert_eq!(vote.round, Round::MultiLeader(1)); Ok(()) diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 865c4fb5be6..1926101b81c 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -26,7 +26,7 @@ use tracing::error; use crate::{ data_types::{ChainInfo, ChainInfoQuery}, - local_node::LocalNodeClient, + local_node::{LocalNodeClient, LocalNodeError}, node::{CrossChainMessageDelivery, NodeError, ValidatorNode}, remote_node::RemoteNode, }; @@ -211,21 +211,31 @@ where .handle_optimized_certificate(&certificate, delivery) .await; - match &result { - Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => { - self.remote_node - .check_blobs_not_found(&certificate, blob_ids)?; + if let Err(NodeError::BlobsNotFound(blob_ids)) = result { + self.local_node + .check_missing_blob_ids(&certificate.required_blob_ids(), &blob_ids)?; - let blobs = self - .local_node - .find_missing_blobs(blob_ids.clone(), certificate.inner().chain_id()) - .await? - .ok_or_else(|| original_err.clone())?; - self.remote_node - .handle_certificate(certificate, blobs, delivery) - .await + let blobs = self + .local_node + .find_missing_blobs(blob_ids.clone(), &certificate) + .await?; + let missing_blob_ids = blobs + .iter() + .zip(&blob_ids) + .filter_map(|(maybe_blob, blob_id)| maybe_blob.is_none().then_some(*blob_id)) + .collect::>(); + if !missing_blob_ids.is_empty() { + return Err(NodeError::LocalError { + error: LocalNodeError::BlobsNotFound(missing_blob_ids).to_string(), + }); } - _ => result, + + let blobs = blobs.into_iter().flatten().collect(); + self.remote_node + .handle_certificate(certificate, blobs, delivery) + .await + } else { + result } } @@ -236,7 +246,7 @@ where ) -> Result, NodeError> { let chain_id = proposal.content.block.chain_id; let mut sent_cross_chain_updates = false; - for blob in &proposal.blobs { + for blob in &proposal.published_blobs { blob_ids.remove(&blob.id()); // Keep only blobs we may need to resend. } loop { @@ -397,14 +407,14 @@ where let vote = match action { CommunicateAction::SubmitBlock { proposal, blob_ids } => { let info = self.send_block_proposal(proposal, blob_ids).await?; - info.manager.pending + info.manager.pending_vote } CommunicateAction::FinalizeBlock { certificate, delivery, } => { let info = self.send_certificate(certificate, delivery).await?; - info.manager.pending + info.manager.pending_vote } CommunicateAction::RequestTimeout { .. } => { let query = ChainInfoQuery::new(chain_id).with_timeout(); diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 2e3dec882e7..71c5bfbc0c5 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -201,6 +201,8 @@ pub enum WorkerError { UnneededValue { value_hash: CryptoHash }, #[error("An additional blob was provided that is not required: {blob_id}.")] UnneededBlob { blob_id: BlobId }, + #[error("A duplicated blob was provided: {blob_id}.")] + DuplicatedBlob { blob_id: BlobId }, #[error("The certificate in the block proposal is not a ValidatedBlock")] MissingExecutedBlockInProposal, #[error("Fast blocks cannot query oracles")] diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index cd06b75f385..efb25c33aca 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -194,8 +194,8 @@ message BlockProposal { // A lite certificate for a validated block that justifies the proposal in this round. optional bytes validated_block_certificate = 6; - // Required blob - bytes blobs = 7; + // Blobs being published by this block proposal. + bytes published_blobs = 7; } // A certified statement from the committee, without the value. diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index a27b58147cc..adc646eb4d5 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -189,7 +189,7 @@ impl TryFrom for api::BlockProposal { content: bincode::serialize(&block_proposal.content)?, owner: Some(block_proposal.owner.into()), signature: Some(block_proposal.signature.into()), - blobs: bincode::serialize(&block_proposal.blobs)?, + published_blobs: bincode::serialize(&block_proposal.published_blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|cert| bincode::serialize(&cert)) @@ -211,7 +211,7 @@ impl TryFrom for BlockProposal { content, owner: try_proto_convert(block_proposal.owner)?, signature: try_proto_convert(block_proposal.signature)?, - blobs: bincode::deserialize(&block_proposal.blobs)?, + published_blobs: bincode::deserialize(&block_proposal.published_blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|bytes| bincode::deserialize(&bytes)) @@ -916,7 +916,7 @@ pub mod tests { }, owner: Owner::from(KeyPair::generate().public()), signature: Signature::new(&Foo("test".into()), &KeyPair::generate()), - blobs: vec![], + published_blobs: vec![], validated_block_certificate: Some(cert), }; diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 03c2a7c0bfe..79364f01451 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -124,7 +124,7 @@ BlockProposal: TYPENAME: Owner - signature: TYPENAME: Signature - - blobs: + - published_blobs: SEQ: TYPENAME: BlobContent - validated_block_certificate: @@ -260,10 +260,16 @@ ChainManagerInfo: - requested_locked: OPTION: TYPENAME: ValidatedBlockCertificate + - locked_published_blobs: + SEQ: + TYPENAME: BlobContent + - locked_used_blobs: + SEQ: + TYPENAME: BlobContent - timeout: OPTION: TYPENAME: TimeoutCertificate - - pending: + - pending_vote: OPTION: TYPENAME: LiteVote - timeout_vote: @@ -283,12 +289,6 @@ ChainManagerInfo: - round_timeout: OPTION: TYPENAME: Timestamp - - pending_blobs: - MAP: - KEY: - TYPENAME: BlobId - VALUE: - TYPENAME: BlobContent ChainOwnership: STRUCT: - super_owners: @@ -598,7 +598,10 @@ NodeError: 21: DuplicatesInBlobsNotFound: UNIT 22: - UnexpectedEntriesInBlobsNotFound: UNIT + UnexpectedEntriesInBlobsNotFound: + NEWTYPE: + SEQ: + TYPENAME: BlobId 23: LocalError: STRUCT: diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index ef0113c071b..7b538d6ad40 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -733,7 +733,7 @@ impl Runnable for Job { .into_iter() .filter_map(|message| { let response = deserialize_response(message)?; - let vote = response.info.manager.pending?; + let vote = response.info.manager.pending_vote?; let value = values.get(&vote.value.value_hash)?.clone(); vote.clone().with_value(value) })