Skip to content

Commit

Permalink
Replace certificate chain info query usage with certificate download
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed May 30, 2024
1 parent 715ccd6 commit ab42c36
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 119 deletions.
4 changes: 0 additions & 4 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,6 @@ where
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
info.requested_received_log = self.chain.received_log.read(start..).await?;
}
if let Some(hash) = query.request_hashed_certificate_value {
info.requested_hashed_certificate_value =
Some(self.storage.read_hashed_certificate_value(hash).await?);
}
if query.request_manager_values {
info.manager.add_values(self.chain.manager.get());
}
Expand Down
51 changes: 21 additions & 30 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,15 +729,10 @@ where
async fn find_missing_application_bytecodes(
&self,
locations: &[BytecodeLocation],
nodes: &[(ValidatorName, <P as LocalValidatorNodeProvider>::Node)],
chain_id: ChainId,
nodes: &[<P as LocalValidatorNodeProvider>::Node],
) -> Vec<HashedCertificateValue> {
future::join_all(locations.iter().map(|location| {
LocalNodeClient::<S>::download_hashed_certificate_value(
nodes.to_owned(),
chain_id,
*location,
)
LocalNodeClient::<S>::download_hashed_certificate_value(nodes.to_owned(), *location)
}))
.await
.into_iter()
Expand Down Expand Up @@ -795,28 +790,25 @@ where
.process_certificate(certificate.clone(), vec![], vec![])
.await
{
let nodes = nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>();

match &err {
LocalNodeError::WorkerError(WorkerError::ApplicationBytecodesNotFound(
locations,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;

ensure!(values.len() == locations.len(), err);
self.process_certificate(certificate.clone(), values.clone(), vec![])
.await?;
}
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
let blobs = self
.find_missing_blobs(
blob_ids,
&nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>(),
)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

ensure!(blobs.len() == blob_ids.len(), err);
self.process_certificate(certificate.clone(), vec![], blobs)
Expand All @@ -827,17 +819,9 @@ where
blob_ids,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.await;
let blobs = self
.find_missing_blobs(
blob_ids,
&nodes
.into_iter()
.map(|(_name, node)| node)
.collect::<Vec<_>>(),
)
.find_missing_application_bytecodes(locations, &nodes)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

ensure!(
blobs.len() == blob_ids.len() && values.len() == locations.len(),
Expand Down Expand Up @@ -1254,11 +1238,18 @@ where
};
// Collect the hashed certificate values required for execution.
let committee = self.local_committee().await?;
let nodes: Vec<(ValidatorName, P::Node)> =
self.validator_node_provider.make_nodes(&committee)?;
let nodes = self
.validator_node_provider
.make_nodes::<Vec<_>>(&committee)?
.into_iter()
.map(|(_name, node)| node)
.collect();
let values = self
.node_client
.read_or_download_hashed_certificate_values(nodes.clone(), block.bytecode_locations())
.read_or_download_hashed_certificate_values(
nodes,
block.bytecode_locations().into_keys(),
)
.await?;
let hashed_blobs = self.read_local_blobs(block.blob_ids()).await?;
// Create the final block proposal.
Expand Down
15 changes: 1 addition & 14 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, Owner},
};
use linera_chain::{
data_types::{
Certificate, ChainAndHeight, HashedCertificateValue, IncomingMessage, Medium, MessageBundle,
},
data_types::{Certificate, ChainAndHeight, IncomingMessage, Medium, MessageBundle},
manager::ChainManagerInfo,
ChainStateView,
};
Expand Down Expand Up @@ -68,8 +66,6 @@ pub struct ChainInfoQuery {
pub request_leader_timeout: bool,
/// Include a vote to switch to fallback mode, if appropriate.
pub request_fallback: bool,
/// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain.
pub request_hashed_certificate_value: Option<CryptoHash>,
}

impl ChainInfoQuery {
Expand All @@ -85,7 +81,6 @@ impl ChainInfoQuery {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: false,
request_hashed_certificate_value: None,
}
}

Expand Down Expand Up @@ -133,11 +128,6 @@ impl ChainInfoQuery {
self.request_fallback = true;
self
}

pub fn with_hashed_certificate_value(mut self, hash: CryptoHash) -> Self {
self.request_hashed_certificate_value = Some(hash);
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -173,8 +163,6 @@ pub struct ChainInfo {
pub count_received_log: usize,
/// The response to `request_received_certificates_excluding_first_nth`
pub requested_received_log: Vec<ChainAndHeight>,
/// The requested hashed certificate value, if any.
pub requested_hashed_certificate_value: Option<HashedCertificateValue>,
}

/// The response to an `ChainInfoQuery`
Expand Down Expand Up @@ -253,7 +241,6 @@ where
requested_sent_certificates: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
requested_hashed_certificate_value: None,
}
}
}
Expand Down
73 changes: 24 additions & 49 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,23 +185,20 @@ where

async fn find_missing_application_bytecodes<A>(
&self,
chain_id: ChainId,
locations: &[BytecodeLocation],
node: &mut A,
name: ValidatorName,
) -> Vec<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
future::join_all(locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, *location,
)
.await
}
}))
future::join_all(
locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(&mut node, *location).await
}
}),
)
.await
.into_iter()
.flatten()
Expand All @@ -224,7 +221,6 @@ where

async fn try_process_certificates<A>(
&mut self,
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
certificates: Vec<Certificate>,
Expand All @@ -249,12 +245,7 @@ where
locations,
))) => {
let values = self
.find_missing_application_bytecodes(
certificate.value().chain_id(),
locations,
node,
name,
)
.find_missing_application_bytecodes(locations, node)
.await;

if values.len() != locations.len() {
Expand All @@ -275,9 +266,8 @@ where
Err(LocalNodeError::WorkerError(
WorkerError::ApplicationBytecodesAndBlobsNotFound(locations, blob_ids),
)) => {
let chain_id = certificate.value().chain_id();
let values = self
.find_missing_application_bytecodes(chain_id, locations, node, name)
.find_missing_application_bytecodes(locations, node)
.await;
let blobs = self.find_missing_blobs(blob_ids, node).await;

Expand Down Expand Up @@ -392,16 +382,16 @@ where
/// Does not fail if a hashed certificate value can't be downloaded; it just gets omitted from the result.
pub async fn read_or_download_hashed_certificate_values<A>(
&mut self,
validators: Vec<(ValidatorName, A)>,
hashed_certificate_value_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
validators: Vec<A>,
hashed_certificate_value_locations: impl IntoIterator<Item = BytecodeLocation>,
) -> Result<Vec<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let mut values = vec![];
let mut tasks = vec![];
let mut node = self.node.lock().await;
for (location, chain_id) in hashed_certificate_value_locations {
for location in hashed_certificate_value_locations {
if let Some(value) = node
.state
.recent_hashed_certificate_value(&location.certificate_hash)
Expand All @@ -412,7 +402,7 @@ where
let validators = validators.clone();
let storage = node.state.storage_client().clone();
tasks.push(Self::read_or_download_hashed_certificate_value(
storage, validators, chain_id, location,
storage, validators, location,
));
}
}
Expand All @@ -435,8 +425,7 @@ where

pub async fn read_or_download_hashed_certificate_value<A>(
storage: S,
validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
validators: Vec<A>,
location: BytecodeLocation,
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
Expand All @@ -450,7 +439,7 @@ where
Err(ViewError::NotFound(..)) => {}
Err(err) => Err(err)?,
}
match Self::download_hashed_certificate_value(validators, chain_id, location).await {
match Self::download_hashed_certificate_value(validators, location).await {
Some(hashed_certificate_value) => {
storage
.write_hashed_certificate_value(&hashed_certificate_value)
Expand Down Expand Up @@ -503,7 +492,7 @@ where
break;
};
let Some(info) = self
.try_process_certificates(name, &mut node, chain_id, certificates)
.try_process_certificates(&mut node, chain_id, certificates)
.await
else {
break;
Expand Down Expand Up @@ -600,12 +589,7 @@ where
};
if !info.requested_sent_certificates.is_empty()
&& self
.try_process_certificates(
name,
&mut node,
chain_id,
info.requested_sent_certificates,
)
.try_process_certificates(&mut node, chain_id, info.requested_sent_certificates)
.await
.is_none()
{
Expand All @@ -631,20 +615,17 @@ where
}

pub async fn download_hashed_certificate_value<A>(
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
mut validators: Vec<A>,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
validators.shuffle(&mut rand::thread_rng());
for (name, mut node) in validators {
if let Some(value) = Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, location,
)
.await
for mut node in validators {
if let Some(value) =
Self::try_download_hashed_certificate_value_from(&mut node, location).await
{
return Some(value);
}
Expand Down Expand Up @@ -677,20 +658,14 @@ where
}

async fn try_download_hashed_certificate_value_from<A>(
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
let query =
ChainInfoQuery::new(chain_id).with_hashed_certificate_value(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
if response.check(name).is_ok() {
return response.info.requested_hashed_certificate_value;
}
if let Ok(certificate) = node.download_certificate(location.certificate_hash).await {
return Some(certificate.into());
}
None
}
Expand Down
21 changes: 21 additions & 0 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ pub trait LocalValidatorNodeProvider {

fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;

// fn make_nodes<I>(&self, committee: &Committee) -> Result<I, NodeError>
// where
// I: FromIterator<Self::Node>,
// {
// self.make_nodes_from_list(committee.validator_addresses())
// }

// fn make_nodes_from_list<I, A>(
// &self,
// validators: impl IntoIterator<Item = (ValidatorName, A)>,
// ) -> Result<I, NodeError>
// where
// I: FromIterator<Self::Node>,
// A: AsRef<str>,
// {
// validators
// .into_iter()
// .map(|(_name, address)| self.make_node(address.as_ref()))
// .collect()
// }

fn make_nodes<I>(&self, committee: &Committee) -> Result<I, NodeError>
where
I: FromIterator<(ValidatorName, Self::Node)>,
Expand Down
3 changes: 0 additions & 3 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ message ChainInfoQuery {
// Request a signed vote for a leader timeout.
bool request_leader_timeout = 8;

// Query a value that contains a binary hashed certificate value (e.g. bytecode) required by this chain.
optional bytes request_hashed_certificate_value = 9;

// Query the balance of a given owner.
optional Owner request_owner_balance = 10;

Expand Down
Loading

0 comments on commit ab42c36

Please sign in to comment.