Skip to content

Commit

Permalink
Turn ChainManager's pending_blobs into proposed/locked specific pendi…
Browse files Browse the repository at this point in the history
…ng blobs
  • Loading branch information
ndr-ds committed Nov 5, 2024
1 parent f1966ea commit 5cbadfd
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 29 deletions.
33 changes: 22 additions & 11 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ pub struct ChainManager {
/// Highest-round authenticated block that we have received and checked. If there are multiple
/// proposals in the same round, this contains only the first one.
pub proposed: Option<BlockProposal>,
/// These are blobs belonging to proposed blocks that have not been confirmed yet.
pub proposed_pending_blobs: BTreeMap<BlobId, Blob>,
/// Latest validated proposal that we have voted to confirm (or would have, if we are not a
/// validator).
pub locked: Option<Certificate>,
/// These are blobs belonging to validated blocks that have not been confirmed yet.
pub locked_pending_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
pub timeout: Option<Certificate>,
/// Latest vote we have cast, to validate or confirm.
Expand All @@ -134,8 +138,6 @@ pub struct ChainManager {
pub current_round: Round,
/// The owners that take over in fallback mode.
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
Expand Down Expand Up @@ -196,15 +198,16 @@ impl ChainManager {
distribution,
fallback_distribution,
proposed: None,
proposed_pending_blobs: BTreeMap::new(),
locked: None,
locked_pending_blobs: BTreeMap::new(),
timeout: None,
pending: None,
timeout_vote: None,
fallback_vote: None,
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
})
}

Expand Down Expand Up @@ -380,6 +383,12 @@ impl ChainManager {
) {
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.proposed_pending_blobs = proposal
.blobs
.into_iter()
.map(|blob| (blob.id(), blob))
.collect();

self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let executed_block = outcome.with(block);
Expand All @@ -394,14 +403,11 @@ impl ChainManager {
let value = HashedCertificateValue::new_validated(executed_block.clone());
if let Some(certificate) = lite_cert.with_value(value) {
self.locked = Some(certificate);
self.locked_pending_blobs = self.proposed_pending_blobs.clone();
}
}
}

for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
}

if let Some(key_pair) = key_pair {
// If this is a fast block, vote to confirm. Otherwise vote to validate.
let value = if round.is_fast() {
Expand Down Expand Up @@ -432,6 +438,7 @@ impl ChainManager {
return;
};
self.locked = Some(certificate);
self.locked_pending_blobs = self.proposed_pending_blobs.clone();
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
Expand Down Expand Up @@ -559,9 +566,13 @@ pub struct ChainManagerInfo {
pub ownership: ChainOwnership,
/// Latest authenticated block that we have received, if requested.
pub requested_proposed: Option<Box<BlockProposal>>,
/// These are blobs belonging to proposed blocks that have not been confirmed yet.
pub proposed_pending_blobs: BTreeMap<BlobId, Blob>,
/// Latest validated proposal that we have voted to confirm (or would have, if we are not a
/// validator).
pub requested_locked: Option<Box<Certificate>>,
/// These are blobs belonging to validated blocks that have not been confirmed yet.
pub locked_pending_blobs: BTreeMap<BlobId, Blob>,
/// Latest timeout certificate we have seen.
pub timeout: Option<Box<Certificate>>,
/// Latest vote we cast (either to validate or to confirm a block).
Expand All @@ -579,8 +590,6 @@ pub struct ChainManagerInfo {
pub leader: Option<Owner>,
/// The timestamp when the current round times out.
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -589,7 +598,9 @@ impl From<&ChainManager> for ChainManagerInfo {
ChainManagerInfo {
ownership: manager.ownership.clone(),
requested_proposed: None,
proposed_pending_blobs: BTreeMap::new(),
requested_locked: None,
locked_pending_blobs: BTreeMap::new(),
timeout: manager.timeout.clone().map(Box::new),
pending: manager.pending.as_ref().map(|vote| vote.lite()),
timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite),
Expand All @@ -598,7 +609,6 @@ impl From<&ChainManager> for ChainManagerInfo {
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
}
}
}
Expand All @@ -612,7 +622,8 @@ impl ChainManagerInfo {
.pending
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
self.locked_pending_blobs = manager.locked_pending_blobs.clone();
self.proposed_pending_blobs = manager.proposed_pending_blobs.clone();
}

/// Gets the highest validated block.
Expand Down
8 changes: 4 additions & 4 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ where
}

/// Returns an error if the block requires a blob we don't have.
/// Looks for the blob in: chain manager's pending blobs and storage.
/// Looks for the blob in: chain manager's proposed pending blobs and storage.
async fn check_no_missing_blobs(
&self,
required_blob_ids: &HashSet<BlobId>,
) -> Result<(), WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let pending_blobs = &self.chain.manager.get().proposed_pending_blobs;
let missing_blob_ids = required_blob_ids
.iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
Expand Down Expand Up @@ -347,13 +347,13 @@ where
Ok(())
}

/// Returns the blobs requested by their `blob_ids` that are in the chain manager's pending blobs
/// Returns the blobs requested by their `blob_ids` that are in the chain manager's locked pending blobs
/// and checks that they are otherwise in storage.
async fn get_blobs_and_checks_storage(
&self,
blob_ids: &HashSet<BlobId>,
) -> Result<Vec<Blob>, WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let pending_blobs = &self.chain.manager.get().locked_pending_blobs;

let mut found_blobs = Vec::new();
let mut missing_blob_ids = Vec::new();
Expand Down
6 changes: 3 additions & 3 deletions linera-core/src/client/chain_client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ pub struct ChainClientState {
///
/// This is always at the same height as `next_block_height`.
pending_block: Option<Block>,
/// This contains blobs belonging to our `pending_block` that may not even have
/// been processed by (i.e. been proposed to) our own local chain manager yet.
pending_blobs: BTreeMap<BlobId, Blob>,
/// Known key pairs from present and past identities.
known_key_pairs: BTreeMap<Owner, KeyPair>,

/// For each validator, up to which index we have synchronized their
/// [`ChainStateView::received_log`].
received_certificate_trackers: HashMap<ValidatorName, u64>,
/// This contains blobs belonging to our `pending_block` that may not even have
/// been processed by (i.e. been proposed to) our own local chain manager yet.
pending_blobs: BTreeMap<BlobId, Blob>,

/// A mutex that is held whilst we are performing operations that should not be
/// attempted by multiple clients at the same time.
Expand Down
6 changes: 3 additions & 3 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1802,8 +1802,8 @@ where
}
}

/// Tries to read blobs from either the pending blobs or the local node's cache, or
/// storage
/// Tries to read blobs from either the proposed pending blobs or the
/// local node's cache, or storage
#[instrument(level = "trace")]
async fn read_local_blobs(
&self,
Expand All @@ -1822,7 +1822,7 @@ where
chain_state_view
.manager
.get()
.pending_blobs
.proposed_pending_blobs
.get(&blob_id)
.cloned()
};
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ where
.await?
.manager
.get()
.pending_blobs
.locked_pending_blobs
.clone();
let mut found_blobs = Vec::new();
for blob_id in missing_blob_ids {
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl<N: ValidatorNode> RemoteNode<N> {
let mut found_blobs = if let Some(info) = info {
let new_found_blobs = missing_blobs
.iter()
.filter_map(|blob_id| info.manager.pending_blobs.get(blob_id))
.filter_map(|blob_id| info.manager.locked_pending_blobs.get(blob_id))
.map(|blob| (blob.id(), blob.clone()))
.collect::<HashMap<_, _>>();
missing_blobs.retain(|blob_id| !new_found_blobs.contains_key(blob_id));
Expand Down
18 changes: 12 additions & 6 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,21 @@ ChainManagerInfo:
- requested_proposed:
OPTION:
TYPENAME: BlockProposal
- proposed_pending_blobs:
MAP:
KEY:
TYPENAME: BlobId
VALUE:
TYPENAME: BlobContent
- requested_locked:
OPTION:
TYPENAME: Certificate
- locked_pending_blobs:
MAP:
KEY:
TYPENAME: BlobId
VALUE:
TYPENAME: BlobContent
- timeout:
OPTION:
TYPENAME: Certificate
Expand All @@ -290,12 +302,6 @@ ChainManagerInfo:
- round_timeout:
OPTION:
TYPENAME: Timestamp
- pending_blobs:
MAP:
KEY:
TYPENAME: BlobId
VALUE:
TYPENAME: BlobContent
ChainOwnership:
STRUCT:
- super_owners:
Expand Down

0 comments on commit 5cbadfd

Please sign in to comment.