Skip to content

Commit

Permalink
Replace certificate chain info query usage with certificate download
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Jun 1, 2024
1 parent 4e2ecb0 commit 22df812
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 123 deletions.
4 changes: 0 additions & 4 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,6 @@ where
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
info.requested_received_log = self.chain.received_log.read(start..).await?;
}
if let Some(hash) = query.request_hashed_certificate_value {
info.requested_hashed_certificate_value =
Some(self.storage.read_hashed_certificate_value(hash).await?);
}
if query.request_manager_values {
info.manager.add_values(self.chain.manager.get());
}
Expand Down
52 changes: 22 additions & 30 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,19 +729,15 @@ where
async fn find_missing_application_bytecodes(
&self,
locations: &[BytecodeLocation],
nodes: &[(ValidatorName, <P as LocalValidatorNodeProvider>::Node)],
chain_id: ChainId,
nodes: &[<P as LocalValidatorNodeProvider>::Node],
) -> Vec<HashedCertificateValue> {
future::join_all(locations.iter().map(|location| {
LocalNodeClient::<S>::download_hashed_certificate_value(
nodes.to_owned(),
chain_id,
*location,
)
LocalNodeClient::<S>::download_hashed_certificate_value(nodes.to_owned(), *location)
}))
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>()
}

Expand Down Expand Up @@ -796,28 +792,25 @@ where
.process_certificate(certificate.clone(), vec![], vec![])
.await
{
let nodes = nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>();

match &err {
LocalNodeError::WorkerError(WorkerError::ApplicationBytecodesNotFound(
locations,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;

ensure!(values.len() == locations.len(), err);
self.process_certificate(certificate.clone(), values.clone(), vec![])
.await?;
}
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
let blobs = self
.find_missing_blobs(
blob_ids,
&nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>(),
)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

ensure!(blobs.len() == blob_ids.len(), err);
self.process_certificate(certificate.clone(), vec![], blobs)
Expand All @@ -828,17 +821,9 @@ where
blob_ids,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.await;
let blobs = self
.find_missing_blobs(
blob_ids,
&nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>(),
)
.find_missing_application_bytecodes(locations, &nodes)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

ensure!(
blobs.len() == blob_ids.len() && values.len() == locations.len(),
Expand Down Expand Up @@ -1255,11 +1240,18 @@ where
};
// Collect the hashed certificate values required for execution.
let committee = self.local_committee().await?;
let nodes: Vec<(ValidatorName, P::Node)> =
self.validator_node_provider.make_nodes(&committee)?;
let nodes = self
.validator_node_provider
.make_nodes::<Vec<_>>(&committee)?
.into_iter()
.map(|(_name, node)| node)
.collect();
let values = self
.node_client
.read_or_download_hashed_certificate_values(nodes.clone(), block.bytecode_locations())
.read_or_download_hashed_certificate_values(
nodes,
block.bytecode_locations().into_keys(),
)
.await?;
let hashed_blobs = self.read_local_blobs(block.blob_ids()).await?;
// Create the final block proposal.
Expand Down
15 changes: 1 addition & 14 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, Owner},
};
use linera_chain::{
data_types::{
Certificate, ChainAndHeight, HashedCertificateValue, IncomingMessage, Medium, MessageBundle,
},
data_types::{Certificate, ChainAndHeight, IncomingMessage, Medium, MessageBundle},
manager::ChainManagerInfo,
ChainStateView,
};
Expand Down Expand Up @@ -68,8 +66,6 @@ pub struct ChainInfoQuery {
pub request_leader_timeout: bool,
/// Include a vote to switch to fallback mode, if appropriate.
pub request_fallback: bool,
/// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain.
pub request_hashed_certificate_value: Option<CryptoHash>,
}

impl ChainInfoQuery {
Expand All @@ -85,7 +81,6 @@ impl ChainInfoQuery {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: false,
request_hashed_certificate_value: None,
}
}

Expand Down Expand Up @@ -133,11 +128,6 @@ impl ChainInfoQuery {
self.request_fallback = true;
self
}

pub fn with_hashed_certificate_value(mut self, hash: CryptoHash) -> Self {
self.request_hashed_certificate_value = Some(hash);
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -173,8 +163,6 @@ pub struct ChainInfo {
pub count_received_log: usize,
/// The response to `request_received_certificates_excluding_first_nth`
pub requested_received_log: Vec<ChainAndHeight>,
/// The requested hashed certificate value, if any.
pub requested_hashed_certificate_value: Option<HashedCertificateValue>,
}

/// The response to an `ChainInfoQuery`
Expand Down Expand Up @@ -253,7 +241,6 @@ where
requested_sent_certificates: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
requested_hashed_certificate_value: None,
}
}
}
Expand Down
100 changes: 46 additions & 54 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{borrow::Cow, sync::Arc};

use futures::{future, lock::Mutex};
use linera_base::{
crypto::CryptoHash,
data_types::{ArithmeticError, BlockHeight, HashedBlob},
ensure,
identifiers::{BlobId, ChainId, MessageId},
Expand Down Expand Up @@ -75,6 +76,11 @@ pub enum LocalNodeError {

#[error("Got different blob id from downloaded blob when trying to download {blob_id:?}")]
BlobIdMismatch { blob_id: BlobId },

#[error(
"Got different hash from downloaded certificate value when trying to download {hash:?}"
)]
CertificateValueHashMismatch { hash: CryptoHash },
}

impl<S> LocalNodeClient<S>
Expand Down Expand Up @@ -189,26 +195,24 @@ where

async fn find_missing_application_bytecodes<A>(
&self,
chain_id: ChainId,
locations: &[BytecodeLocation],
node: &mut A,
name: ValidatorName,
) -> Vec<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
future::join_all(locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, *location,
)
.await
}
}))
future::join_all(
locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(&mut node, *location).await
}
}),
)
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>()
}

Expand All @@ -229,7 +233,6 @@ where

async fn try_process_certificates<A>(
&mut self,
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
certificates: Vec<Certificate>,
Expand All @@ -254,12 +257,7 @@ where
locations,
))) => {
let values = self
.find_missing_application_bytecodes(
certificate.value().chain_id(),
locations,
node,
name,
)
.find_missing_application_bytecodes(locations, node)
.await;

if values.len() != locations.len() {
Expand All @@ -279,9 +277,8 @@ where
Err(LocalNodeError::WorkerError(
WorkerError::ApplicationBytecodesAndBlobsNotFound(locations, blob_ids),
)) => {
let chain_id = certificate.value().chain_id();
let values = self
.find_missing_application_bytecodes(chain_id, locations, node, name)
.find_missing_application_bytecodes(locations, node)
.await;
let blobs = self.find_missing_blobs(blob_ids, node).await;
if values.len() != locations.len() || blobs.len() != blob_ids.len() {
Expand Down Expand Up @@ -395,16 +392,16 @@ where
/// Does not fail if a hashed certificate value can't be downloaded; it just gets omitted from the result.
pub async fn read_or_download_hashed_certificate_values<A>(
&mut self,
validators: Vec<(ValidatorName, A)>,
hashed_certificate_value_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
validators: Vec<A>,
hashed_certificate_value_locations: impl IntoIterator<Item = BytecodeLocation>,
) -> Result<Vec<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let mut values = vec![];
let mut tasks = vec![];
let mut node = self.node.lock().await;
for (location, chain_id) in hashed_certificate_value_locations {
for location in hashed_certificate_value_locations {
if let Some(value) = node
.state
.recent_hashed_certificate_value(&location.certificate_hash)
Expand All @@ -415,7 +412,7 @@ where
let validators = validators.clone();
let storage = node.state.storage_client().clone();
tasks.push(Self::read_or_download_hashed_certificate_value(
storage, validators, chain_id, location,
storage, validators, location,
));
}
}
Expand All @@ -438,8 +435,7 @@ where

pub async fn read_or_download_hashed_certificate_value<A>(
storage: S,
validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
validators: Vec<A>,
location: BytecodeLocation,
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
Expand All @@ -453,7 +449,7 @@ where
Err(ViewError::NotFound(..)) => {}
Err(err) => Err(err)?,
}
match Self::download_hashed_certificate_value(validators, chain_id, location).await {
match Self::download_hashed_certificate_value(validators, location).await? {
Some(hashed_certificate_value) => {
storage
.write_hashed_certificate_value(&hashed_certificate_value)
Expand Down Expand Up @@ -506,7 +502,7 @@ where
break;
};
let Some(info) = self
.try_process_certificates(name, &mut node, chain_id, certificates)
.try_process_certificates(&mut node, chain_id, certificates)
.await
else {
break;
Expand Down Expand Up @@ -603,12 +599,7 @@ where
};
if !info.requested_sent_certificates.is_empty()
&& self
.try_process_certificates(
name,
&mut node,
chain_id,
info.requested_sent_certificates,
)
.try_process_certificates(&mut node, chain_id, info.requested_sent_certificates)
.await
.is_none()
{
Expand All @@ -634,25 +625,22 @@ where
}

pub async fn download_hashed_certificate_value<A>(
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
mut validators: Vec<A>,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
validators.shuffle(&mut rand::thread_rng());
for (name, mut node) in validators {
if let Some(value) = Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, location,
)
.await
for mut node in validators {
if let Some(value) =
Self::try_download_hashed_certificate_value_from(&mut node, location).await?
{
return Some(value);
return Ok(Some(value));
}
}
None
Ok(None)
}

pub async fn try_download_blob<A>(
Expand Down Expand Up @@ -691,21 +679,25 @@ where
}

async fn try_download_hashed_certificate_value_from<A>(
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let query =
ChainInfoQuery::new(chain_id).with_hashed_certificate_value(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
if response.check(name).is_ok() {
return response.info.requested_hashed_certificate_value;
}
if let Ok(certificate) = node
.download_certificate_value(location.certificate_hash)
.await
{
let hashed_certificate_value: HashedCertificateValue = certificate.into();
ensure!(
hashed_certificate_value.hash() == location.certificate_hash,
LocalNodeError::CertificateValueHashMismatch {
hash: location.certificate_hash
}
);
return Ok(Some(hashed_certificate_value));
}
None
Ok(None)
}
}
Loading

0 comments on commit 22df812

Please sign in to comment.