Skip to content

Commit

Permalink
Turn ChainManager's pending_blobs into proposed/locked specific pendi…
Browse files Browse the repository at this point in the history
…ng blobs
  • Loading branch information
ndr-ds committed Nov 15, 2024
1 parent c552c15 commit 3b55663
Show file tree
Hide file tree
Showing 18 changed files with 448 additions and 250 deletions.
15 changes: 10 additions & 5 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub struct BlockProposal {
pub owner: Owner,
pub signature: Signature,
#[debug(skip_if = Vec::is_empty)]
pub blobs: Vec<Blob>,
pub published_blobs: Vec<Blob>,
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
Expand Down Expand Up @@ -720,7 +720,12 @@ pub struct ProposalContent {
}

impl BlockProposal {
pub fn new_initial(round: Round, block: Block, secret: &KeyPair, blobs: Vec<Blob>) -> Self {
pub fn new_initial(
round: Round,
block: Block,
secret: &KeyPair,
published_blobs: Vec<Blob>,
) -> Self {
let content = ProposalContent {
round,
block,
Expand All @@ -731,7 +736,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: None,
}
}
Expand All @@ -740,7 +745,7 @@ impl BlockProposal {
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &KeyPair,
blobs: Vec<Blob>,
published_blobs: Vec<Blob>,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let executed_block = validated_block_certificate.into_inner().into_inner();
Expand All @@ -754,7 +759,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: Some(lite_cert),
}
}
Expand Down
141 changes: 98 additions & 43 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ use rand_distr::{Distribution, WeightedAliasIndex};
use serde::{Deserialize, Serialize};

use crate::{
data_types::{Block, BlockExecutionOutcome, BlockProposal, LiteVote, ProposalContent, Vote},
data_types::{Block, BlockProposal, ExecutedBlock, LiteVote, ProposalContent, Vote},
types::{
CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, TimeoutCertificate,
ValidatedBlockCertificate,
Certificate, CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue,
LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
},
ChainError,
};
Expand Down Expand Up @@ -120,12 +120,18 @@ pub struct ChainManager {
/// validator).
#[debug(skip_if = Option::is_none)]
pub locked: Option<ValidatedBlockCertificate>,
/// These are the published blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_published_blobs: BTreeMap<BlobId, Blob>,
/// These are the used blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_used_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<TimeoutCertificate>,
/// Latest vote we have cast, to validate or confirm.
#[debug(skip_if = Option::is_none)]
pub pending: Option<Vote>,
pub pending_vote: Option<Vote>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<Vote>,
Expand All @@ -145,9 +151,6 @@ pub struct ChainManager {
/// The owners that take over in fallback mode.
#[debug(skip_if = BTreeMap::is_empty)]
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
Expand Down Expand Up @@ -209,20 +212,21 @@ impl ChainManager {
fallback_distribution,
proposed: None,
locked: None,
locked_published_blobs: BTreeMap::new(),
locked_used_blobs: BTreeMap::new(),
timeout: None,
pending: None,
pending_vote: None,
timeout_vote: None,
fallback_vote: None,
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
})
}

/// Returns the most recent vote we cast.
pub fn pending(&self) -> Option<&Vote> {
self.pending.as_ref()
pub fn pending_vote(&self) -> Option<&Vote> {
self.pending_vote.as_ref()
}

/// Verifies the safety of a proposed block with respect to voting rules.
Expand Down Expand Up @@ -352,7 +356,7 @@ impl ChainManager {
) -> Result<Outcome, ChainError> {
let new_block = &certificate.executed_block().block;
let new_round = certificate.round;
if let Some(Vote { value, round, .. }) = &self.pending {
if let Some(Vote { value, round, .. }) = &self.pending_vote {
match value.inner() {
CertificateValue::ConfirmedBlock(confirmed) => {
if &confirmed.inner().block == new_block && *round == new_round {
Expand Down Expand Up @@ -385,36 +389,46 @@ impl ChainManager {
Ok(Outcome::Accept)
}

/// If the validated block certificate is more recent, return that certificate
/// so that the locked block can be updated.
pub fn maybe_update_locked(
&self,
validated_block_certificate: Option<LiteCertificate<'static>>,
executed_block: ExecutedBlock,
) -> Option<Certificate> {
if let Some(lite_cert) = validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block);
return lite_cert.with_value(value);
}
}

None
}

/// Signs a vote to validate the proposed block.
pub fn create_vote(
&mut self,
proposal: BlockProposal,
outcome: BlockExecutionOutcome,
executed_block: ExecutedBlock,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
used_blobs: Vec<Blob>,
maybe_update_locked: Option<Certificate>,
) {
let proposal_content = proposal.content.clone();
let published_blobs = proposal.published_blobs.clone();
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.update_proposed(proposal);
self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let executed_block = outcome.with(block);

// If the validated block certificate is more recent, update our locked block.
if let Some(lite_cert) = proposal.validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block.clone());
if let Some(certificate) = lite_cert.with_value(value) {
self.locked = Some(certificate.into());
}
}
}
let ProposalContent { round, .. } = proposal_content;

for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
if let Some(certificate) = maybe_update_locked {
self.update_locked(certificate.into(), published_blobs, used_blobs);
}

if let Some(key_pair) = key_pair {
Expand All @@ -424,14 +438,16 @@ impl ChainManager {
} else {
HashedCertificateValue::new_validated(executed_block)
};
self.pending = Some(Vote::new(value, round, key_pair));
self.pending_vote = Some(Vote::new(value, round, key_pair));
}
}

/// Signs a vote to confirm the validated block.
pub fn create_final_vote(
&mut self,
validated: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
) {
Expand All @@ -442,15 +458,15 @@ impl ChainManager {
return;
}
let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone());
self.locked = Some(validated);
self.update_locked(validated, published_blobs, used_blobs);
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
// NOTE: For backwards compatibility, we need to turn `ValidatedBlockCertificate`
// back into `Certificate` type so that the vote is cast over hash of the old type.
let vote = Vote::new(confirmed.into_inner().into(), round, key_pair);
// Ok to overwrite validation votes with confirmation votes at equal or higher round.
self.pending = Some(vote);
self.pending_vote = Some(vote);
}
}

Expand Down Expand Up @@ -561,6 +577,40 @@ impl ChainManager {
fn is_super(&self, owner: &Owner) -> bool {
self.ownership.super_owners.contains_key(owner)
}

fn update_locked(
&mut self,
new_locked: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
) {
self.locked = Some(new_locked);
self.locked_published_blobs = published_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
self.locked_used_blobs = used_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
}

fn update_proposed(&mut self, new_proposed: BlockProposal) {
self.proposed = Some(new_proposed);
}

pub fn proposed_blobs(&self) -> BTreeMap<BlobId, Blob> {
self.proposed
.as_ref()
.map(|proposed| {
proposed
.published_blobs
.iter()
.map(|blob| (blob.id(), blob.clone()))
.collect()
})
.unwrap_or_default()
}
}

/// Chain manager information that is included in `ChainInfo` sent to clients.
Expand All @@ -576,12 +626,18 @@ pub struct ChainManagerInfo {
/// validator).
#[debug(skip_if = Option::is_none)]
pub requested_locked: Option<Box<ValidatedBlockCertificate>>,
/// Published blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_published_blobs: Vec<Blob>,
/// Used blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_used_blobs: Vec<Blob>,
/// Latest timeout certificate we have seen.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<Box<TimeoutCertificate>>,
/// Latest vote we cast (either to validate or to confirm a block).
#[debug(skip_if = Option::is_none)]
pub pending: Option<LiteVote>,
pub pending_vote: Option<LiteVote>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<LiteVote>,
Expand All @@ -600,9 +656,6 @@ pub struct ChainManagerInfo {
/// The timestamp when the current round times out.
#[debug(skip_if = Option::is_none)]
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -612,15 +665,16 @@ impl From<&ChainManager> for ChainManagerInfo {
ownership: manager.ownership.clone(),
requested_proposed: None,
requested_locked: None,
locked_published_blobs: Vec::new(),
locked_used_blobs: Vec::new(),
timeout: manager.timeout.clone().map(Box::new),
pending: manager.pending.as_ref().map(|vote| vote.lite()),
pending_vote: manager.pending_vote.as_ref().map(|vote| vote.lite()),
timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite),
fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite),
requested_pending_value: None,
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
}
}
}
Expand All @@ -631,10 +685,11 @@ impl ChainManagerInfo {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.requested_pending_value = manager
.pending
.pending_vote
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
self.locked_published_blobs = manager.locked_published_blobs.values().cloned().collect();
self.locked_used_blobs = manager.locked_used_blobs.values().cloned().collect();
}

/// Gets the highest validated block.
Expand Down
Loading

0 comments on commit 3b55663

Please sign in to comment.