Skip to content

Commit

Permalink
Turn ChainManager's pending_blobs into proposed/locked specific pendi…
Browse files Browse the repository at this point in the history
…ng blobs
  • Loading branch information
ndr-ds committed Nov 14, 2024
1 parent 258c3f4 commit 8e30f57
Show file tree
Hide file tree
Showing 17 changed files with 315 additions and 189 deletions.
15 changes: 10 additions & 5 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub struct BlockProposal {
pub owner: Owner,
pub signature: Signature,
#[debug(skip_if = Vec::is_empty)]
pub blobs: Vec<Blob>,
pub published_blobs: Vec<Blob>,
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
Expand Down Expand Up @@ -720,7 +720,12 @@ pub struct ProposalContent {
}

impl BlockProposal {
pub fn new_initial(round: Round, block: Block, secret: &KeyPair, blobs: Vec<Blob>) -> Self {
pub fn new_initial(
round: Round,
block: Block,
secret: &KeyPair,
published_blobs: Vec<Blob>,
) -> Self {
let content = ProposalContent {
round,
block,
Expand All @@ -731,7 +736,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: None,
}
}
Expand All @@ -740,7 +745,7 @@ impl BlockProposal {
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &KeyPair,
blobs: Vec<Blob>,
published_blobs: Vec<Blob>,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let executed_block = validated_block_certificate.into_inner().into_inner();
Expand All @@ -754,7 +759,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: Some(lite_cert),
}
}
Expand Down
99 changes: 72 additions & 27 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,18 @@ pub struct ChainManager {
/// validator).
#[debug(skip_if = Option::is_none)]
pub locked: Option<ValidatedBlockCertificate>,
/// These are the published blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_published_blobs: BTreeMap<BlobId, Blob>,
/// These are the used blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_used_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<TimeoutCertificate>,
/// Latest vote we have cast, to validate or confirm.
#[debug(skip_if = Option::is_none)]
pub pending: Option<Vote>,
pub pending_vote: Option<Vote>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<Vote>,
Expand All @@ -145,9 +151,6 @@ pub struct ChainManager {
/// The owners that take over in fallback mode.
#[debug(skip_if = BTreeMap::is_empty)]
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
Expand Down Expand Up @@ -209,20 +212,21 @@ impl ChainManager {
fallback_distribution,
proposed: None,
locked: None,
locked_published_blobs: BTreeMap::new(),
locked_used_blobs: BTreeMap::new(),
timeout: None,
pending: None,
pending_vote: None,
timeout_vote: None,
fallback_vote: None,
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
})
}

/// Returns the most recent vote we cast.
pub fn pending(&self) -> Option<&Vote> {
self.pending.as_ref()
self.pending_vote.as_ref()
}

/// Verifies the safety of a proposed block with respect to voting rules.
Expand Down Expand Up @@ -352,7 +356,7 @@ impl ChainManager {
) -> Result<Outcome, ChainError> {
let new_block = &certificate.executed_block().block;
let new_round = certificate.round;
if let Some(Vote { value, round, .. }) = &self.pending {
if let Some(Vote { value, round, .. }) = &self.pending_vote {
match value.inner() {
CertificateValue::ConfirmedBlock(confirmed) => {
if &confirmed.inner().block == new_block && *round == new_round {
Expand Down Expand Up @@ -392,46 +396,48 @@ impl ChainManager {
outcome: BlockExecutionOutcome,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
used_blobs: Vec<Blob>,
) {
let validated_block_certificate = proposal.validated_block_certificate.clone();
let proposal_content = proposal.content.clone();
let published_blobs = proposal.published_blobs.clone();
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.update_proposed(proposal);
self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let ProposalContent { block, round, .. } = proposal_content;
let executed_block = outcome.with(block);

// If the validated block certificate is more recent, update our locked block.
if let Some(lite_cert) = proposal.validated_block_certificate {
if let Some(lite_cert) = validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block.clone());
if let Some(certificate) = lite_cert.with_value(value) {
self.locked = Some(certificate.into());
self.update_locked(certificate.into(), published_blobs, used_blobs);
}
}
}

for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
}

if let Some(key_pair) = key_pair {
// If this is a fast block, vote to confirm. Otherwise vote to validate.
let value = if round.is_fast() {
HashedCertificateValue::new_confirmed(executed_block)
} else {
HashedCertificateValue::new_validated(executed_block)
};
self.pending = Some(Vote::new(value, round, key_pair));
self.pending_vote = Some(Vote::new(value, round, key_pair));
}
}

/// Signs a vote to confirm the validated block.
pub fn create_final_vote(
&mut self,
validated: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
) {
Expand All @@ -442,15 +448,15 @@ impl ChainManager {
return;
}
let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone());
self.locked = Some(validated);
self.update_locked(validated, published_blobs, used_blobs);
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
// NOTE: For backwards compatibility, we need to turn `ValidatedBlockCertificate`
// back into `Certificate` type so that the vote is cast over hash of the old type.
let vote = Vote::new(confirmed.into_inner().into(), round, key_pair);
// Ok to overwrite validation votes with confirmation votes at equal or higher round.
self.pending = Some(vote);
self.pending_vote = Some(vote);
}
}

Expand Down Expand Up @@ -561,6 +567,40 @@ impl ChainManager {
fn is_super(&self, owner: &Owner) -> bool {
self.ownership.super_owners.contains_key(owner)
}

fn update_locked(
&mut self,
new_locked: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
) {
self.locked = Some(new_locked);
self.locked_published_blobs = published_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
self.locked_used_blobs = used_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
}

fn update_proposed(&mut self, new_proposed: BlockProposal) {
self.proposed = Some(new_proposed);
}

pub fn proposed_blobs(&self) -> BTreeMap<BlobId, Blob> {
self.proposed
.as_ref()
.map(|proposed| {
proposed
.published_blobs
.iter()
.map(|blob| (blob.id(), blob.clone()))
.collect()
})
.unwrap_or_default()
}
}

/// Chain manager information that is included in `ChainInfo` sent to clients.
Expand All @@ -576,12 +616,18 @@ pub struct ChainManagerInfo {
/// validator).
#[debug(skip_if = Option::is_none)]
pub requested_locked: Option<Box<ValidatedBlockCertificate>>,
/// These are the published blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_published_blobs: Vec<Blob>,
/// These are the used blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_used_blobs: Vec<Blob>,
/// Latest timeout certificate we have seen.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<Box<TimeoutCertificate>>,
/// Latest vote we cast (either to validate or to confirm a block).
#[debug(skip_if = Option::is_none)]
pub pending: Option<LiteVote>,
pub pending_vote: Option<LiteVote>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<LiteVote>,
Expand All @@ -600,9 +646,6 @@ pub struct ChainManagerInfo {
/// The timestamp when the current round times out.
#[debug(skip_if = Option::is_none)]
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -612,15 +655,16 @@ impl From<&ChainManager> for ChainManagerInfo {
ownership: manager.ownership.clone(),
requested_proposed: None,
requested_locked: None,
locked_published_blobs: Vec::new(),
locked_used_blobs: Vec::new(),
timeout: manager.timeout.clone().map(Box::new),
pending: manager.pending.as_ref().map(|vote| vote.lite()),
pending_vote: manager.pending_vote.as_ref().map(|vote| vote.lite()),
timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite),
fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite),
requested_pending_value: None,
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
}
}
}
Expand All @@ -631,10 +675,11 @@ impl ChainManagerInfo {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.requested_pending_value = manager
.pending
.pending_vote
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
self.locked_published_blobs = manager.locked_published_blobs.values().cloned().collect();
self.locked_used_blobs = manager.locked_used_blobs.values().cloned().collect();
}

/// Gets the highest validated block.
Expand Down
55 changes: 48 additions & 7 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

//! Operations that persist changes to the chain state when they are successful.

use std::{borrow::Cow, collections::BTreeMap};
use std::{
borrow::Cow,
collections::{BTreeMap, HashSet},
};

use linera_base::{
data_types::{Blob, BlockHeight, Timestamp},
Expand Down Expand Up @@ -133,7 +136,28 @@ where
) -> Result<(), WorkerError> {
// Create the vote and store it in the chain state.
let manager = self.state.chain.manager.get_mut();
manager.create_vote(proposal, outcome, self.state.config.key_pair(), local_time);
let used_blob_ids = outcome
.required_blob_ids()
.difference(&proposal.content.block.published_blob_ids())
.cloned()
.collect::<Vec<_>>();
let used_blobs = self.state.storage.read_blobs(&used_blob_ids).await?;
let missing_blob_ids: Vec<_> = used_blob_ids
.iter()
.zip(&used_blobs)
.filter_map(|(blob_id, blob)| blob.is_none().then_some(*blob_id))
.collect();
if !missing_blob_ids.is_empty() {
return Err(WorkerError::BlobsNotFound(missing_blob_ids));
}

manager.create_vote(
proposal,
outcome,
self.state.config.key_pair(),
local_time,
used_blobs.into_iter().flatten().collect(),
);
// Cache the value we voted on, so the client doesn't have to send it again.
if let Some(vote) = manager.pending() {
self.state
Expand Down Expand Up @@ -199,19 +223,36 @@ where
.insert(Cow::Borrowed(cert.value()))
.await;
let required_blob_ids = executed_block.required_blob_ids();
let published_blob_ids = block.published_blob_ids();
let used_blob_ids = required_blob_ids
.difference(&published_blob_ids)
.collect::<HashSet<_>>();
// Verify that no unrelated blobs were provided.
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
.check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?;
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
self.state
.check_no_missing_blobs(&remaining_required_blob_ids)
let mut validated_blobs = self
.state
.get_proposed_blobs_and_check_storage(&remaining_required_blob_ids)
.await?;
validated_blobs.extend_from_slice(blobs);
let published_blobs = validated_blobs
.iter()
.filter(|blob| published_blob_ids.contains(&blob.id()))
.cloned()
.collect();
let used_blobs = validated_blobs
.into_iter()
.filter(|blob| used_blob_ids.contains(&blob.id()))
.collect();
let old_round = self.state.chain.manager.get().current_round;
self.state.chain.manager.get_mut().create_final_vote(
certificate,
published_blobs,
used_blobs,
self.state.config.key_pair(),
self.state.storage.clock().current_time(),
);
Expand Down Expand Up @@ -292,14 +333,14 @@ where
let required_blob_ids = executed_block.required_blob_ids();
// Verify that no unrelated blobs were provided.
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
.check_for_unneeded_or_duplicated_blobs(&required_blob_ids, blobs)?;
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
let mut blobs_in_block = self
.state
.get_blobs_and_checks_storage(&remaining_required_blob_ids)
.get_locked_blobs_and_check_storage(&remaining_required_blob_ids)
.await?;
blobs_in_block.extend_from_slice(blobs);

Expand Down
Loading

0 comments on commit 8e30f57

Please sign in to comment.