Skip to content

Commit

Permalink
Replace certificate chain info query usage with certificate download
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Jun 18, 2024
1 parent bbf8b45 commit 340b0a1
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 98 deletions.
18 changes: 5 additions & 13 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
borrow::Cow,
collections::{HashMap, HashSet},
};
use std::{borrow::Cow, collections::HashSet};

use async_graphql::{Object, SimpleObject};
use linera_base::{
Expand Down Expand Up @@ -62,17 +59,12 @@ pub struct Block {
}

impl Block {
/// Returns all bytecode locations referred to in this block's incoming messages, with the
/// sender chain ID.
pub fn bytecode_locations(&self) -> HashMap<BytecodeLocation, ChainId> {
let mut locations = HashMap::new();
/// Returns all bytecode locations referred to in this block's incoming messages.
pub fn bytecode_locations(&self) -> HashSet<BytecodeLocation> {
let mut locations = HashSet::new();
for message in &self.incoming_messages {
if let Message::System(sys_message) = &message.event.message {
locations.extend(
sys_message
.bytecode_locations(message.event.certificate_hash)
.map(|location| (location, message.origin.sender)),
);
locations.extend(sys_message.bytecode_locations(message.event.certificate_hash));
}
}
locations
Expand Down
6 changes: 1 addition & 5 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,6 @@ where
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
info.requested_received_log = self.chain.received_log.read(start..).await?;
}
if let Some(hash) = query.request_hashed_certificate_value {
info.requested_hashed_certificate_value =
Some(self.storage.read_hashed_certificate_value(hash).await?);
}
if query.request_manager_values {
info.manager.add_values(self.chain.manager.get());
}
Expand Down Expand Up @@ -858,7 +854,7 @@ where
// Find all certificates containing bytecode used when executing this block.
let mut required_locations_left: HashMap<_, _> = block
.bytecode_locations()
.into_keys()
.into_iter()
.map(|bytecode_location| (bytecode_location.certificate_hash, bytecode_location))
.collect();
for value in hashed_certificate_values {
Expand Down
16 changes: 6 additions & 10 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,14 +767,9 @@ where
&self,
locations: &[BytecodeLocation],
nodes: &[(ValidatorName, <P as LocalValidatorNodeProvider>::Node)],
chain_id: ChainId,
) -> Vec<HashedCertificateValue> {
future::join_all(locations.iter().map(|location| {
LocalNodeClient::<S>::download_hashed_certificate_value(
nodes.to_owned(),
chain_id,
*location,
)
LocalNodeClient::<S>::download_hashed_certificate_value(nodes.to_owned(), *location)
}))
.await
.into_iter()
Expand Down Expand Up @@ -845,7 +840,7 @@ where
locations,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;

ensure!(values.len() == locations.len(), err);
Expand All @@ -864,7 +859,7 @@ where
blob_ids,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

Expand Down Expand Up @@ -1307,8 +1302,9 @@ where
};
// Collect the hashed certificate values required for execution.
let committee = self.local_committee().await?;
let nodes: Vec<(ValidatorName, P::Node)> =
self.validator_node_provider.make_nodes(&committee)?;
let nodes = self
.validator_node_provider
.make_nodes::<Vec<_>>(&committee)?;
let values = self
.client
.local_node
Expand Down
15 changes: 1 addition & 14 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, Owner},
};
use linera_chain::{
data_types::{
Certificate, ChainAndHeight, HashedCertificateValue, IncomingMessage, Medium, MessageBundle,
},
data_types::{Certificate, ChainAndHeight, IncomingMessage, Medium, MessageBundle},
manager::ChainManagerInfo,
ChainStateView,
};
Expand Down Expand Up @@ -68,8 +66,6 @@ pub struct ChainInfoQuery {
pub request_leader_timeout: bool,
/// Include a vote to switch to fallback mode, if appropriate.
pub request_fallback: bool,
/// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain.
pub request_hashed_certificate_value: Option<CryptoHash>,
}

impl ChainInfoQuery {
Expand All @@ -85,7 +81,6 @@ impl ChainInfoQuery {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: false,
request_hashed_certificate_value: None,
}
}

Expand Down Expand Up @@ -133,11 +128,6 @@ impl ChainInfoQuery {
self.request_fallback = true;
self
}

pub fn with_hashed_certificate_value(mut self, hash: CryptoHash) -> Self {
self.request_hashed_certificate_value = Some(hash);
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -173,8 +163,6 @@ pub struct ChainInfo {
pub count_received_log: usize,
/// The response to `request_received_certificates_excluding_first_nth`
pub requested_received_log: Vec<ChainAndHeight>,
/// The requested hashed certificate value, if any.
pub requested_hashed_certificate_value: Option<HashedCertificateValue>,
}

/// The response to an `ChainInfoQuery`
Expand Down Expand Up @@ -253,7 +241,6 @@ where
requested_sent_certificates: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
requested_hashed_certificate_value: None,
}
}
}
Expand Down
53 changes: 21 additions & 32 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ where

async fn find_missing_application_bytecodes<A>(
&self,
chain_id: ChainId,
locations: &[BytecodeLocation],
node: &mut A,
name: ValidatorName,
Expand All @@ -186,10 +185,7 @@ where
future::join_all(locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, *location,
)
.await
Self::try_download_hashed_certificate_value_from(&mut node, name, *location).await
}
}))
.await
Expand Down Expand Up @@ -245,12 +241,7 @@ where
locations,
))) => {
let values = self
.find_missing_application_bytecodes(
certificate.value().chain_id(),
locations,
node,
name,
)
.find_missing_application_bytecodes(locations, node, name)
.await;

if values.len() != locations.len() {
Expand All @@ -272,9 +263,8 @@ where
Err(LocalNodeError::WorkerError(
WorkerError::ApplicationBytecodesAndBlobsNotFound(locations, blob_ids),
)) => {
let chain_id = certificate.value().chain_id();
let values = self
.find_missing_application_bytecodes(chain_id, locations, node, name)
.find_missing_application_bytecodes(locations, node, name)
.await;
let blobs = self.find_missing_blobs(blob_ids, node, name).await;
if values.len() != locations.len() || blobs.len() != blob_ids.len() {
Expand Down Expand Up @@ -405,15 +395,15 @@ where
pub async fn read_or_download_hashed_certificate_values<A>(
&self,
validators: Vec<(ValidatorName, A)>,
hashed_certificate_value_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
hashed_certificate_value_locations: impl IntoIterator<Item = BytecodeLocation>,
) -> Result<Vec<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let mut values = vec![];
let mut tasks = vec![];
let mut node = self.node.lock().await;
for (location, chain_id) in hashed_certificate_value_locations {
for location in hashed_certificate_value_locations {
if let Some(value) = node
.state
.recent_hashed_certificate_value(&location.certificate_hash)
Expand All @@ -424,7 +414,7 @@ where
let validators = validators.clone();
let storage = node.state.storage_client().clone();
tasks.push(Self::read_or_download_hashed_certificate_value(
storage, validators, chain_id, location,
storage, validators, location,
));
}
}
Expand All @@ -448,7 +438,6 @@ where
pub async fn read_or_download_hashed_certificate_value<A>(
storage: S,
validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
Expand All @@ -462,7 +451,7 @@ where
Err(ViewError::NotFound(..)) => {}
Err(err) => Err(err)?,
}
match Self::download_hashed_certificate_value(validators, chain_id, location).await {
match Self::download_hashed_certificate_value(validators, location).await {
Some(hashed_certificate_value) => {
storage
.write_hashed_certificate_value(&hashed_certificate_value)
Expand Down Expand Up @@ -661,19 +650,16 @@ where

pub async fn download_hashed_certificate_value<A>(
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
// Sequentially try each validator in random order, to improve efficiency.
validators.shuffle(&mut rand::thread_rng());
for (name, mut node) in validators {
if let Some(value) = Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, location,
)
.await
if let Some(value) =
Self::try_download_hashed_certificate_value_from(&mut node, name, location).await
{
return Some(value);
}
Expand Down Expand Up @@ -720,21 +706,24 @@ where
}

async fn try_download_hashed_certificate_value_from<A>(
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
name: ValidatorName,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
let query =
ChainInfoQuery::new(chain_id).with_hashed_certificate_value(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
if response.check(name).is_ok() {
return response.info.requested_hashed_certificate_value;
match node
.download_certificate_value(location.certificate_hash)
.await
{
Ok(hashed_certificate_value) => Some(hashed_certificate_value),
Err(error) => {
tracing::debug!(
"Failed to fetch certificate value {location:?} from validator {name}: {error}"
);
None
}
}
None
}
}
1 change: 0 additions & 1 deletion linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ where
.storage_client()
.read_hashed_certificate_value(hash)
.await
.map(Into::into)
.map_err(Into::into);
sender.send(certificate_value)
}
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ where
| CertificateValue::ValidatedBlock { executed_block, .. } => {
executed_block.block.bytecode_locations()
}
CertificateValue::Timeout { .. } => HashMap::new(),
CertificateValue::Timeout { .. } => HashSet::new(),
};
for location in locations {
if !required.contains_key(location) {
if !required.contains(location) {
let hash = location.certificate_hash;
warn!("validator requested {:?} but it is not required", hash);
return Err(NodeError::InvalidChainInfoResponse);
Expand Down
3 changes: 0 additions & 3 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ message ChainInfoQuery {
// Request a signed vote for a leader timeout.
bool request_leader_timeout = 8;

// Query a value that contains a binary hashed certificate value (e.g. bytecode) required by this chain.
optional bytes request_hashed_certificate_value = 9;

// Query the balance of a given owner.
optional Owner request_owner_balance = 10;

Expand Down
12 changes: 0 additions & 12 deletions linera-rpc/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,6 @@ impl TryFrom<api::ChainInfoQuery> for ChainInfoQuery {
.request_sent_certificates_in_range
.map(|range| bincode::deserialize(&range))
.transpose()?;
let request_hashed_certificate_value = chain_info_query
.request_hashed_certificate_value
.map(|bytes| bincode::deserialize(&bytes))
.transpose()?;

Ok(Self {
request_committees: chain_info_query.request_committees,
Expand All @@ -376,7 +372,6 @@ impl TryFrom<api::ChainInfoQuery> for ChainInfoQuery {
request_manager_values: chain_info_query.request_manager_values,
request_leader_timeout: chain_info_query.request_leader_timeout,
request_fallback: chain_info_query.request_fallback,
request_hashed_certificate_value,
})
}
}
Expand All @@ -389,10 +384,6 @@ impl TryFrom<ChainInfoQuery> for api::ChainInfoQuery {
.request_sent_certificates_in_range
.map(|range| bincode::serialize(&range))
.transpose()?;
let request_hashed_certificate_value = chain_info_query
.request_hashed_certificate_value
.map(|hash| bincode::serialize(&hash))
.transpose()?;

Ok(Self {
chain_id: Some(chain_info_query.chain_id.into()),
Expand All @@ -406,7 +397,6 @@ impl TryFrom<ChainInfoQuery> for api::ChainInfoQuery {
request_manager_values: chain_info_query.request_manager_values,
request_leader_timeout: chain_info_query.request_leader_timeout,
request_fallback: chain_info_query.request_fallback,
request_hashed_certificate_value,
})
}
}
Expand Down Expand Up @@ -696,7 +686,6 @@ pub mod tests {
requested_sent_certificates: vec![],
count_received_log: 0,
requested_received_log: vec![],
requested_hashed_certificate_value: None,
});

let chain_info_response_none = ChainInfoResponse {
Expand Down Expand Up @@ -733,7 +722,6 @@ pub mod tests {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: true,
request_hashed_certificate_value: None,
};
round_trip_check::<_, api::ChainInfoQuery>(chain_info_query_some);
}
Expand Down
Loading

0 comments on commit 340b0a1

Please sign in to comment.