Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn ChainManager's pending_blobs into proposed/locked specific pending blobs #2813

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion linera-chain/src/certificate/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;

use custom_debug_derive::Debug;
use linera_base::{
crypto::{CryptoHash, Signature},
data_types::Round,
identifiers::BlobId,
};
use linera_execution::committee::{Committee, ValidatorName};

use super::hashed::Hashed;
use super::{hashed::Hashed, CertificateValue};
use crate::ChainError;

/// Generic type representing a certificate for `value` of type `T`.
Expand Down Expand Up @@ -95,6 +98,16 @@ impl<T> GenericCertificate<T> {
}
}

impl GenericCertificate<CertificateValue> {
pub fn required_blob_ids(&self) -> HashSet<BlobId> {
match self.inner() {
CertificateValue::ConfirmedBlock(confirmed) => confirmed.inner().required_blob_ids(),
CertificateValue::ValidatedBlock(validated) => validated.inner().required_blob_ids(),
CertificateValue::Timeout(_) => HashSet::new(),
}
}
}

impl<T: Clone> Clone for GenericCertificate<T> {
fn clone(&self) -> Self {
Self {
Expand Down
15 changes: 10 additions & 5 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub struct BlockProposal {
pub owner: Owner,
pub signature: Signature,
#[debug(skip_if = Vec::is_empty)]
pub blobs: Vec<Blob>,
pub published_blobs: Vec<Blob>,
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
Expand Down Expand Up @@ -736,7 +736,12 @@ pub struct ProposalContent {
}

impl BlockProposal {
pub fn new_initial(round: Round, block: Block, secret: &KeyPair, blobs: Vec<Blob>) -> Self {
pub fn new_initial(
round: Round,
block: Block,
secret: &KeyPair,
published_blobs: Vec<Blob>,
) -> Self {
let content = ProposalContent {
round,
block,
Expand All @@ -747,7 +752,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: None,
}
}
Expand All @@ -756,7 +761,7 @@ impl BlockProposal {
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &KeyPair,
blobs: Vec<Blob>,
published_blobs: Vec<Blob>,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let executed_block = validated_block_certificate.into_inner().into_inner();
Expand All @@ -770,7 +775,7 @@ impl BlockProposal {
content,
owner: secret.public().into(),
signature,
blobs,
published_blobs,
validated_block_certificate: Some(lite_cert),
}
}
Expand Down
140 changes: 97 additions & 43 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ use serde::{Deserialize, Serialize};

use crate::{
block::Timeout,
data_types::{Block, BlockExecutionOutcome, BlockProposal, LiteVote, ProposalContent, Vote},
data_types::{Block, BlockProposal, ExecutedBlock, LiteVote, ProposalContent, Vote},
types::{
CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue, TimeoutCertificate,
ValidatedBlockCertificate,
Certificate, CertificateValue, ConfirmedBlockCertificate, HashedCertificateValue,
LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
},
ChainError,
};
Expand Down Expand Up @@ -121,12 +121,18 @@ pub struct ChainManager {
/// validator).
#[debug(skip_if = Option::is_none)]
pub locked: Option<ValidatedBlockCertificate>,
/// These are the published blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_published_blobs: BTreeMap<BlobId, Blob>,
/// These are the used blobs belonging to the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_used_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<TimeoutCertificate>,
/// Latest vote we have cast, to validate or confirm.
#[debug(skip_if = Option::is_none)]
pub pending: Option<Vote<CertificateValue>>,
pub pending_vote: Option<Vote<CertificateValue>>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<Vote<Timeout>>,
Expand All @@ -146,9 +152,6 @@ pub struct ChainManager {
/// The owners that take over in fallback mode.
#[debug(skip_if = BTreeMap::is_empty)]
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
Expand Down Expand Up @@ -210,20 +213,21 @@ impl ChainManager {
fallback_distribution,
proposed: None,
locked: None,
locked_published_blobs: BTreeMap::new(),
locked_used_blobs: BTreeMap::new(),
timeout: None,
pending: None,
pending_vote: None,
timeout_vote: None,
fallback_vote: None,
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
})
}

/// Returns the most recent vote we cast.
pub fn pending(&self) -> Option<&Vote<CertificateValue>> {
self.pending.as_ref()
pub fn pending_vote(&self) -> Option<&Vote<CertificateValue>> {
self.pending_vote.as_ref()
}

/// Verifies the safety of a proposed block with respect to voting rules.
Expand Down Expand Up @@ -361,7 +365,7 @@ impl ChainManager {
) -> Result<Outcome, ChainError> {
let new_block = &certificate.executed_block().block;
let new_round = certificate.round;
if let Some(Vote { value, round, .. }) = &self.pending {
if let Some(Vote { value, round, .. }) = &self.pending_vote {
match value.inner() {
CertificateValue::ConfirmedBlock(confirmed) => {
if &confirmed.inner().block == new_block && *round == new_round {
Expand Down Expand Up @@ -394,36 +398,45 @@ impl ChainManager {
Ok(Outcome::Accept)
}

/// If the validated block certificate is more recent, returns that certificate
/// so that the locked block can be updated.
pub fn validated_block_if_newer_than_locked(
&self,
validated_block_certificate: Option<LiteCertificate<'static>>,
executed_block: ExecutedBlock,
) -> Option<Certificate> {
let lite_cert = validated_block_certificate?;
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block);
return lite_cert.with_value(value);
}

None
}

/// Signs a vote to validate the proposed block.
pub fn create_vote(
&mut self,
proposal: BlockProposal,
outcome: BlockExecutionOutcome,
executed_block: ExecutedBlock,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
used_blobs: Vec<Blob>,
ndr-ds marked this conversation as resolved.
Show resolved Hide resolved
maybe_update_locked: Option<Certificate>,
) {
let proposal_content = proposal.content.clone();
let published_blobs = proposal.published_blobs.clone();
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.update_proposed(proposal);
self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let executed_block = outcome.with(block);

// If the validated block certificate is more recent, update our locked block.
if let Some(lite_cert) = proposal.validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block.clone());
if let Some(certificate) = lite_cert.with_value(value) {
self.locked = Some(certificate.into());
}
}
}
let ProposalContent { round, .. } = proposal_content;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Now we pass in the same block twice, basically.

Copy link
Contributor Author

@ndr-ds ndr-ds Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now instead of building the ExecutedBlock here we do it outside the function. This was done to make the whole maybe_update_locked work and not pass used_blobs that won't be used here. Maybe there's a better way? This was the best I could think of.
I could also pass in the round, the content and the blobs as separate arguments, instead of the whole proposal
Nvm, can't do that 🤔 Open to suggestions on this!


for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
if let Some(certificate) = maybe_update_locked {
self.update_locked(certificate.into(), published_blobs, used_blobs);
}

if let Some(key_pair) = key_pair {
Expand All @@ -433,14 +446,16 @@ impl ChainManager {
} else {
HashedCertificateValue::new_validated(executed_block)
};
self.pending = Some(Vote::new(value, round, key_pair));
self.pending_vote = Some(Vote::new(value, round, key_pair));
}
}

/// Signs a vote to confirm the validated block.
pub fn create_final_vote(
&mut self,
validated: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
) {
Expand All @@ -451,15 +466,15 @@ impl ChainManager {
return;
}
let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone());
self.locked = Some(validated);
self.update_locked(validated, published_blobs, used_blobs);
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
// NOTE: For backwards compatibility, we need to turn `ValidatedBlockCertificate`
// back into `Certificate` type so that the vote is cast over hash of the old type.
let vote = Vote::new(confirmed.into_inner().into(), round, key_pair);
// Ok to overwrite validation votes with confirmation votes at equal or higher round.
self.pending = Some(vote);
self.pending_vote = Some(vote);
}
}

Expand Down Expand Up @@ -570,6 +585,40 @@ impl ChainManager {
fn is_super(&self, owner: &Owner) -> bool {
self.ownership.super_owners.contains_key(owner)
}

fn update_locked(
&mut self,
new_locked: ValidatedBlockCertificate,
published_blobs: Vec<Blob>,
used_blobs: Vec<Blob>,
) {
self.locked = Some(new_locked);
self.locked_published_blobs = published_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
self.locked_used_blobs = used_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
afck marked this conversation as resolved.
Show resolved Hide resolved
}

fn update_proposed(&mut self, new_proposed: BlockProposal) {
self.proposed = Some(new_proposed);
}

pub fn proposed_blobs(&self) -> BTreeMap<BlobId, Blob> {
self.proposed
.as_ref()
.map(|proposed| {
proposed
.published_blobs
.iter()
.map(|blob| (blob.id(), blob.clone()))
.collect()
})
.unwrap_or_default()
}
}

/// Chain manager information that is included in `ChainInfo` sent to clients.
Expand All @@ -585,12 +634,18 @@ pub struct ChainManagerInfo {
/// validator).
#[debug(skip_if = Option::is_none)]
pub requested_locked: Option<Box<ValidatedBlockCertificate>>,
/// Published blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_published_blobs: Vec<Blob>,
/// Used blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_used_blobs: Vec<Blob>,
/// Latest timeout certificate we have seen.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<Box<TimeoutCertificate>>,
/// Latest vote we cast (either to validate or to confirm a block).
#[debug(skip_if = Option::is_none)]
pub pending: Option<LiteVote>,
pub pending_vote: Option<LiteVote>,
/// Latest timeout vote we cast.
#[debug(skip_if = Option::is_none)]
pub timeout_vote: Option<LiteVote>,
Expand All @@ -609,9 +664,6 @@ pub struct ChainManagerInfo {
/// The timestamp when the current round times out.
#[debug(skip_if = Option::is_none)]
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -621,15 +673,16 @@ impl From<&ChainManager> for ChainManagerInfo {
ownership: manager.ownership.clone(),
requested_proposed: None,
requested_locked: None,
locked_published_blobs: Vec::new(),
locked_used_blobs: Vec::new(),
timeout: manager.timeout.clone().map(Box::new),
pending: manager.pending.as_ref().map(|vote| vote.lite()),
pending_vote: manager.pending_vote.as_ref().map(|vote| vote.lite()),
timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite),
fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite),
requested_pending_value: None,
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
}
}
}
Expand All @@ -640,10 +693,11 @@ impl ChainManagerInfo {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.requested_pending_value = manager
.pending
.pending_vote
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
self.locked_published_blobs = manager.locked_published_blobs.values().cloned().collect();
self.locked_used_blobs = manager.locked_used_blobs.values().cloned().collect();
afck marked this conversation as resolved.
Show resolved Hide resolved
}

/// Gets the highest validated block.
Expand Down
Loading
Loading