Commit

Turn ChainManager's pending_blobs into proposed/locked specific pending blobs
ndr-ds committed Nov 12, 2024
1 parent 8a48bfb commit dc5272b
Showing 8 changed files with 154 additions and 117 deletions.
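
Before the per-file hunks, here is a minimal compilable sketch of the overall shape of the change. The field names are taken from the diff below; the types are stand-ins, not the real linera-base/linera-chain types.

use std::collections::BTreeMap;

// Stand-in types so the sketch compiles on its own.
type BlobId = u64;
struct Blob;
struct Vote;
struct BlockProposal { blobs: Vec<Blob> }

// Before this commit: a single map held blobs for both proposed and validated blocks.
struct ChainManagerBefore {
    pending: Option<Vote>,                 // latest vote cast, to validate or confirm
    pending_blobs: BTreeMap<BlobId, Blob>, // blobs of proposed *or* validated blocks
}

// After this commit: blobs are tracked per role, and `pending` is renamed `pending_vote`.
struct ChainManagerAfter {
    proposed: Option<BlockProposal>,       // proposed blobs are read from the proposal itself
    locked_blobs: BTreeMap<BlobId, Blob>,  // blobs of the locked (validated) block
    pending_vote: Option<Vote>,
}
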
67 changes: 43 additions & 24 deletions linera-chain/src/manager.rs
@@ -115,10 +115,12 @@ pub struct ChainManager {
/// Latest validated proposal that we have voted to confirm (or would have, if we are not a
/// validator).
pub locked: Option<ValidatedBlockCertificate>,
/// These are blobs belonging to the validated block above.
pub locked_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
pub timeout: Option<TimeoutCertificate>,
/// Latest vote we have cast, to validate or confirm.
pub pending: Option<Vote>,
pub pending_vote: Option<Vote>,
/// Latest timeout vote we cast.
pub timeout_vote: Option<Vote>,
/// Fallback vote we cast.
@@ -134,8 +136,6 @@ pub struct ChainManager {
pub current_round: Round,
/// The owners that take over in fallback mode.
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
@@ -197,20 +197,20 @@ impl ChainManager {
fallback_distribution,
proposed: None,
locked: None,
locked_blobs: BTreeMap::new(),
timeout: None,
pending: None,
pending_vote: None,
timeout_vote: None,
fallback_vote: None,
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
})
}

/// Returns the most recent vote we cast.
pub fn pending(&self) -> Option<&Vote> {
self.pending.as_ref()
self.pending_vote.as_ref()
}

/// Verifies the safety of a proposed block with respect to voting rules.
@@ -340,7 +340,7 @@ impl ChainManager {
) -> Result<Outcome, ChainError> {
let new_block = &certificate.executed_block().block;
let new_round = certificate.round;
if let Some(Vote { value, round, .. }) = &self.pending {
if let Some(Vote { value, round, .. }) = &self.pending_vote {
match value.inner() {
CertificateValue::ConfirmedBlock { executed_block } => {
if &executed_block.block == new_block && *round == new_round {
@@ -381,45 +381,45 @@ impl ChainManager {
key_pair: Option<&KeyPair>,
local_time: Timestamp,
) {
let validated_block_certificate = proposal.validated_block_certificate.clone();
let proposal_content = proposal.content.clone();
let proposal_blobs = proposal.blobs.clone();
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.update_proposed(proposal);
self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let ProposalContent { block, round, .. } = proposal_content;
let executed_block = outcome.with(block);

// If the validated block certificate is more recent, update our locked block.
if let Some(lite_cert) = proposal.validated_block_certificate {
if let Some(lite_cert) = validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = HashedCertificateValue::new_validated(executed_block.clone());
if let Some(certificate) = lite_cert.with_value(value) {
self.locked = Some(certificate.into());
self.update_locked(certificate.into(), proposal_blobs);
}
}
}

for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
}

if let Some(key_pair) = key_pair {
// If this is a fast block, vote to confirm. Otherwise vote to validate.
let value = if round.is_fast() {
HashedCertificateValue::new_confirmed(executed_block)
} else {
HashedCertificateValue::new_validated(executed_block)
};
self.pending = Some(Vote::new(value, round, key_pair));
self.pending_vote = Some(Vote::new(value, round, key_pair));
}
}

/// Signs a vote to confirm the validated block.
pub fn create_final_vote(
&mut self,
validated: ValidatedBlockCertificate,
validated_blobs: Vec<Blob>,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
) {
@@ -430,15 +430,15 @@ impl ChainManager {
return;
}
let confirmed = ConfirmedBlockCertificate::from_validated(validated.clone());
self.locked = Some(validated);
self.update_locked(validated, validated_blobs);
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
// NOTE: For backwards compatibility, we need to turn `ValidatedBlockCertificate`
// back into the `Certificate` type so that the vote is cast over the hash of the old type.
let vote = Vote::new(confirmed.into_inner().into(), round, key_pair);
// Ok to overwrite validation votes with confirmation votes at equal or higher round.
self.pending = Some(vote);
self.pending_vote = Some(vote);
}
}

@@ -549,6 +549,25 @@ impl ChainManager {
fn is_super(&self, owner: &Owner) -> bool {
self.ownership.super_owners.contains_key(owner)
}

fn update_locked(&mut self, new_locked: ValidatedBlockCertificate, validated_blobs: Vec<Blob>) {
self.locked = Some(new_locked);
self.locked_blobs = validated_blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();
}

fn update_proposed(&mut self, new_proposed: BlockProposal) {
self.proposed = Some(new_proposed);
}

pub fn proposed_blobs(&self) -> &[Blob] {
self.proposed
.as_ref()
.map(|proposed| proposed.blobs.as_ref())
.unwrap_or_default()
}
}

/// Chain manager information that is included in `ChainInfo` sent to clients.
@@ -562,6 +581,8 @@ pub struct ChainManagerInfo {
/// Latest validated proposal that we have voted to confirm (or would have, if we are not a
/// validator).
pub requested_locked: Option<Box<ValidatedBlockCertificate>>,
/// These are blobs belonging to validated blocks that have not been confirmed yet.
pub locked_blobs: Vec<Blob>,
/// Latest timeout certificate we have seen.
pub timeout: Option<Box<TimeoutCertificate>>,
/// Latest vote we cast (either to validate or to confirm a block).
@@ -579,8 +600,6 @@ pub struct ChainManagerInfo {
pub leader: Option<Owner>,
/// The timestamp when the current round times out.
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
@@ -590,15 +609,15 @@ impl From<&ChainManager> for ChainManagerInfo {
ownership: manager.ownership.clone(),
requested_proposed: None,
requested_locked: None,
locked_blobs: Vec::new(),
timeout: manager.timeout.clone().map(Box::new),
pending: manager.pending.as_ref().map(|vote| vote.lite()),
pending: manager.pending_vote.as_ref().map(|vote| vote.lite()),
timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite),
fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite),
requested_pending_value: None,
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
}
}
}
@@ -609,10 +628,10 @@ impl ChainManagerInfo {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.requested_pending_value = manager
.pending
.pending_vote
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
self.locked_blobs = manager.locked_blobs.values().cloned().collect();
}

/// Gets the highest validated block.
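
The new write paths added above (`update_proposed`, called when recording a proposal, and `update_locked`, called when adopting a validated certificate) reduce to roughly the following sketch; all types are simplified stand-ins for the real ones.

use std::collections::BTreeMap;

type BlobId = u64;
#[derive(Clone)]
struct Blob(BlobId);
impl Blob {
    fn id(&self) -> BlobId { self.0 }
}
struct BlockProposal { blobs: Vec<Blob> }
struct ValidatedBlockCertificate;

struct Manager {
    proposed: Option<BlockProposal>,
    locked: Option<ValidatedBlockCertificate>,
    locked_blobs: BTreeMap<BlobId, Blob>,
}

impl Manager {
    // Store the proposal as-is; its blobs stay inside it and are exposed via `proposed_blobs`.
    fn update_proposed(&mut self, new_proposed: BlockProposal) {
        self.proposed = Some(new_proposed);
    }

    // Replace the locked certificate and its blobs together, keyed by blob ID.
    fn update_locked(&mut self, new_locked: ValidatedBlockCertificate, blobs: Vec<Blob>) {
        self.locked = Some(new_locked);
        self.locked_blobs = blobs.into_iter().map(|blob| (blob.id(), blob)).collect();
    }

    // Read proposed blobs straight from the stored proposal, or an empty slice if none.
    fn proposed_blobs(&self) -> &[Blob] {
        self.proposed
            .as_ref()
            .map(|proposal| proposal.blobs.as_slice())
            .unwrap_or_default()
    }
}

Proposed blobs are no longer copied into a separate map: only the blobs of the locked (validated) block need dedicated storage in the manager.
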
8 changes: 4 additions & 4 deletions linera-core/src/chain_worker/state/attempted_changes.rs
@@ -206,12 +206,12 @@ where
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
self.state
.check_no_missing_blobs(&remaining_required_blob_ids)
.await?;
let mut validated_blobs = self.state.get_blobs(&remaining_required_blob_ids).await?;
validated_blobs.extend_from_slice(blobs);
let old_round = self.state.chain.manager.get().current_round;
self.state.chain.manager.get_mut().create_final_vote(
certificate,
validated_blobs,
self.state.config.key_pair(),
self.state.storage.clock().current_time(),
);
@@ -299,7 +299,7 @@ where
.collect();
let mut blobs_in_block = self
.state
.get_blobs_and_checks_storage(&remaining_required_blob_ids)
.get_blobs_and_check_storage(&remaining_required_blob_ids)
.await?;
blobs_in_block.extend_from_slice(blobs);

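
The blob bookkeeping in the hunk above boils down to a set difference followed by a merge. A small self-contained illustration, with integer IDs standing in for `BlobId` and `Blob`:

use std::collections::HashSet;

fn main() {
    // IDs required by the validated block, and blobs shipped alongside the certificate.
    let required_blob_ids: HashSet<u64> = HashSet::from([1, 2, 3]);
    let certificate_blobs: Vec<u64> = vec![2];

    // `remaining_required_blob_ids`: required IDs not covered by the certificate's blobs.
    let provided: HashSet<u64> = certificate_blobs.iter().copied().collect();
    let remaining: HashSet<u64> = required_blob_ids.difference(&provided).copied().collect();

    // In the real code, `get_blobs(&remaining)` resolves these from the proposed blobs
    // and storage; here we pretend they were all found.
    let mut validated_blobs: Vec<u64> = remaining.into_iter().collect();
    validated_blobs.extend_from_slice(&certificate_blobs);

    // The merged set is what `create_final_vote` now receives next to the certificate.
    assert_eq!(validated_blobs.len(), 3);
}
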
42 changes: 24 additions & 18 deletions linera-core/src/chain_worker/state/mod.rs
@@ -306,28 +306,34 @@ where
}

/// Returns an error if the block requires a blob we don't have.
/// Looks for the blob in: chain manager's pending blobs and storage.
async fn check_no_missing_blobs(
/// Looks for the blob in: chain manager's proposed blobs and storage.
async fn get_blobs(
&self,
required_blob_ids: &HashSet<BlobId>,
) -> Result<(), WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let missing_blob_ids = required_blob_ids
) -> Result<Vec<Blob>, WorkerError> {
let (found_blob_ids, found_blobs) = self
.chain
.manager
.get()
.proposed_blobs()
.iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.filter(|&blob| required_blob_ids.contains(&blob.id()))
.map(|blob| (blob.id(), blob.clone()))
.unzip::<_, _, HashSet<_>, Vec<_>>();

let missing_blob_ids = required_blob_ids
.difference(&found_blob_ids)
.cloned()
.collect::<Vec<_>>();

let missing_blob_ids = self
.storage
.missing_blobs(missing_blob_ids.as_slice())
.await?;

if missing_blob_ids.is_empty() {
return Ok(());
// Since the blobs returned from this will be set as pending, we don't need to return
// the blobs we've found in storage, as they're not pending.
let missing_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?;
if !missing_blob_ids.is_empty() {
return Err(WorkerError::BlobsNotFound(missing_blob_ids));
}

Err(WorkerError::BlobsNotFound(missing_blob_ids))
Ok(found_blobs)
}

/// Returns an error if unrelated blobs were provided.
@@ -348,13 +354,13 @@ where
Ok(())
}

/// Returns the blobs requested by their `blob_ids` that are in the chain manager's pending blobs
/// and checks that they are otherwise in storage.
async fn get_blobs_and_checks_storage(
/// Returns the blobs requested by their `blob_ids` that are in the chain manager's locked blobs
/// and checks that they are otherwise in storage.
async fn get_blobs_and_check_storage(
&self,
blob_ids: &HashSet<BlobId>,
) -> Result<Vec<Blob>, WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let pending_blobs = &self.chain.manager.get().locked_blobs;

let mut found_blobs = Vec::new();
let mut missing_blob_ids = Vec::new();
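
The lookup order implemented by the new `get_blobs` (serve required blobs from the proposal first, only check storage for the remainder, and return just the blobs taken from the proposal) can be sketched as follows; `storage_has` and the integer/string types are stand-ins, not the real APIs.

use std::collections::{HashMap, HashSet};

fn get_blobs(
    required: &HashSet<u64>,
    proposed_blobs: &HashMap<u64, String>,
    storage_has: impl Fn(u64) -> bool,
) -> Result<Vec<String>, Vec<u64>> {
    // 1. Serve as much of the required set as possible from the proposed blobs.
    let found: HashMap<u64, String> = proposed_blobs
        .iter()
        .filter(|(id, _)| required.contains(*id))
        .map(|(id, blob)| (*id, blob.clone()))
        .collect();

    // 2. Everything else must already be in storage; it is only checked, not returned,
    //    since blobs already in storage don't need to be tracked as pending again.
    let missing: Vec<u64> = required
        .iter()
        .filter(|id| !found.contains_key(*id) && !storage_has(**id))
        .copied()
        .collect();

    if missing.is_empty() {
        Ok(found.into_values().collect())
    } else {
        Err(missing)
    }
}

The real method is async, consults `storage.missing_blobs`, and wraps failures in `WorkerError::BlobsNotFound`; the sketch keeps only the set logic.
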
6 changes: 3 additions & 3 deletions linera-core/src/client/chain_client_state.rs
@@ -33,15 +33,15 @@ pub struct ChainClientState {
///
/// This is always at the same height as `next_block_height`.
pending_block: Option<Block>,
/// This contains blobs belonging to our `pending_block` that may not even have
/// been processed by (i.e. been proposed to) our own local chain manager yet.
pending_blobs: BTreeMap<BlobId, Blob>,
/// Known key pairs from present and past identities.
known_key_pairs: BTreeMap<Owner, KeyPair>,

/// For each validator, up to which index we have synchronized their
/// [`ChainStateView::received_log`].
received_certificate_trackers: HashMap<ValidatorName, u64>,
/// This contains blobs belonging to our `pending_block` that may not even have
/// been processed by (i.e. been proposed to) our own local chain manager yet.
pending_blobs: BTreeMap<BlobId, Blob>,

/// A mutex that is held whilst we are performing operations that should not be
/// attempted by multiple clients at the same time.
15 changes: 10 additions & 5 deletions linera-core/src/client/mod.rs
@@ -1745,7 +1745,7 @@ where
&original_err
{
if let Some(new_blobs) = remote_node
.find_missing_blobs(blob_ids.clone(), chain_id)
.find_missing_blobs(blob_ids.clone(), &info.manager.locked_blobs)
.await?
{
blobs = new_blobs;
@@ -1890,8 +1890,8 @@ where
}
}

/// Tries to read blobs from either the pending blobs or the local node's cache, or
/// storage
/// Tries to read blobs from either the locked blobs or the
/// local node's cache, or storage
#[instrument(level = "trace")]
async fn read_local_blobs(
&self,
@@ -1910,7 +1910,7 @@ where
chain_state_view
.manager
.get()
.pending_blobs
.locked_blobs
.get(&blob_id)
.cloned()
};
@@ -2000,7 +2000,12 @@ where
// Create the final block proposal.
let key_pair = self.key_pair().await?;
let proposal = if let Some(cert) = manager.requested_locked {
Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs))
Box::new(BlockProposal::new_retry(
round,
*cert,
&key_pair,
manager.locked_blobs,
))
} else {
Box::new(BlockProposal::new_initial(
round,
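
The proposal choice in the last hunk (reuse the validator-reported locked blobs when retrying an already validated block, otherwise send an initial proposal with the client's own blobs) condenses to roughly the following sketch, with stand-in types in place of the real certificate, key pair, and blob types.

struct Cert;
#[derive(Clone)]
struct Blob;

enum Proposal {
    // Re-proposing an already validated block: blobs come from `manager.locked_blobs`,
    // as reported by the validator in `ChainManagerInfo`.
    Retry { cert: Cert, blobs: Vec<Blob> },
    // First proposal of a new block: blobs come from the client's own pending set.
    Initial { blobs: Vec<Blob> },
}

fn choose_proposal(
    requested_locked: Option<Cert>,
    locked_blobs: Vec<Blob>,
    client_blobs: Vec<Blob>,
) -> Proposal {
    match requested_locked {
        Some(cert) => Proposal::Retry { cert, blobs: locked_blobs },
        None => Proposal::Initial { blobs: client_blobs },
    }
}
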
