Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace certificate chain info query usage with certificate download #2081

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
borrow::Cow,
collections::{HashMap, HashSet},
};
use std::{borrow::Cow, collections::HashSet};

use async_graphql::{Object, SimpleObject};
use linera_base::{
Expand Down Expand Up @@ -62,17 +59,12 @@ pub struct Block {
}

impl Block {
/// Returns all bytecode locations referred to in this block's incoming messages, with the
/// sender chain ID.
pub fn bytecode_locations(&self) -> HashMap<BytecodeLocation, ChainId> {
let mut locations = HashMap::new();
/// Returns all bytecode locations referred to in this block's incoming messages.
pub fn bytecode_locations(&self) -> HashSet<BytecodeLocation> {
let mut locations = HashSet::new();
for message in &self.incoming_messages {
if let Message::System(sys_message) = &message.event.message {
locations.extend(
sys_message
.bytecode_locations(message.event.certificate_hash)
.map(|location| (location, message.origin.sender)),
);
locations.extend(sys_message.bytecode_locations(message.event.certificate_hash));
}
}
locations
Expand Down
6 changes: 1 addition & 5 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,6 @@ where
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
info.requested_received_log = self.chain.received_log.read(start..).await?;
}
if let Some(hash) = query.request_hashed_certificate_value {
info.requested_hashed_certificate_value =
Some(self.storage.read_hashed_certificate_value(hash).await?);
}
if query.request_manager_values {
info.manager.add_values(self.chain.manager.get());
}
Expand Down Expand Up @@ -858,7 +854,7 @@ where
// Find all certificates containing bytecode used when executing this block.
let mut required_locations_left: HashMap<_, _> = block
.bytecode_locations()
.into_keys()
.into_iter()
.map(|bytecode_location| (bytecode_location.certificate_hash, bytecode_location))
.collect();
for value in hashed_certificate_values {
Expand Down
16 changes: 6 additions & 10 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,14 +767,9 @@ where
&self,
locations: &[BytecodeLocation],
nodes: &[(ValidatorName, <P as LocalValidatorNodeProvider>::Node)],
chain_id: ChainId,
) -> Vec<HashedCertificateValue> {
future::join_all(locations.iter().map(|location| {
LocalNodeClient::<S>::download_hashed_certificate_value(
nodes.to_owned(),
chain_id,
*location,
)
LocalNodeClient::<S>::download_hashed_certificate_value(nodes.to_owned(), *location)
}))
.await
.into_iter()
Expand Down Expand Up @@ -845,7 +840,7 @@ where
locations,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;

ensure!(values.len() == locations.len(), err);
Expand All @@ -864,7 +859,7 @@ where
blob_ids,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

Expand Down Expand Up @@ -1307,8 +1302,9 @@ where
};
// Collect the hashed certificate values required for execution.
let committee = self.local_committee().await?;
let nodes: Vec<(ValidatorName, P::Node)> =
self.validator_node_provider.make_nodes(&committee)?;
let nodes = self
.validator_node_provider
.make_nodes::<Vec<_>>(&committee)?;
let values = self
.client
.local_node
Expand Down
15 changes: 1 addition & 14 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, Owner},
};
use linera_chain::{
data_types::{
Certificate, ChainAndHeight, HashedCertificateValue, IncomingMessage, Medium, MessageBundle,
},
data_types::{Certificate, ChainAndHeight, IncomingMessage, Medium, MessageBundle},
manager::ChainManagerInfo,
ChainStateView,
};
Expand Down Expand Up @@ -68,8 +66,6 @@ pub struct ChainInfoQuery {
pub request_leader_timeout: bool,
/// Include a vote to switch to fallback mode, if appropriate.
pub request_fallback: bool,
/// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain.
pub request_hashed_certificate_value: Option<CryptoHash>,
}

impl ChainInfoQuery {
Expand All @@ -85,7 +81,6 @@ impl ChainInfoQuery {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: false,
request_hashed_certificate_value: None,
}
}

Expand Down Expand Up @@ -133,11 +128,6 @@ impl ChainInfoQuery {
self.request_fallback = true;
self
}

pub fn with_hashed_certificate_value(mut self, hash: CryptoHash) -> Self {
self.request_hashed_certificate_value = Some(hash);
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -173,8 +163,6 @@ pub struct ChainInfo {
pub count_received_log: usize,
/// The response to `request_received_certificates_excluding_first_nth`
pub requested_received_log: Vec<ChainAndHeight>,
/// The requested hashed certificate value, if any.
pub requested_hashed_certificate_value: Option<HashedCertificateValue>,
}

/// The response to an `ChainInfoQuery`
Expand Down Expand Up @@ -253,7 +241,6 @@ where
requested_sent_certificates: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
requested_hashed_certificate_value: None,
}
}
}
Expand Down
53 changes: 21 additions & 32 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ where

async fn find_missing_application_bytecodes<A>(
&self,
chain_id: ChainId,
locations: &[BytecodeLocation],
node: &mut A,
name: ValidatorName,
Expand All @@ -186,10 +185,7 @@ where
future::join_all(locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, *location,
)
.await
Self::try_download_hashed_certificate_value_from(&mut node, name, *location).await
}
}))
.await
Expand Down Expand Up @@ -245,12 +241,7 @@ where
locations,
))) => {
let values = self
.find_missing_application_bytecodes(
certificate.value().chain_id(),
locations,
node,
name,
)
.find_missing_application_bytecodes(locations, node, name)
.await;

if values.len() != locations.len() {
Expand All @@ -272,9 +263,8 @@ where
Err(LocalNodeError::WorkerError(
WorkerError::ApplicationBytecodesAndBlobsNotFound(locations, blob_ids),
)) => {
let chain_id = certificate.value().chain_id();
let values = self
.find_missing_application_bytecodes(chain_id, locations, node, name)
.find_missing_application_bytecodes(locations, node, name)
.await;
let blobs = self.find_missing_blobs(blob_ids, node, name).await;
if values.len() != locations.len() || blobs.len() != blob_ids.len() {
Expand Down Expand Up @@ -405,15 +395,15 @@ where
pub async fn read_or_download_hashed_certificate_values<A>(
&self,
validators: Vec<(ValidatorName, A)>,
hashed_certificate_value_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
hashed_certificate_value_locations: impl IntoIterator<Item = BytecodeLocation>,
) -> Result<Vec<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let mut values = vec![];
let mut tasks = vec![];
let mut node = self.node.lock().await;
for (location, chain_id) in hashed_certificate_value_locations {
for location in hashed_certificate_value_locations {
if let Some(value) = node
.state
.recent_hashed_certificate_value(&location.certificate_hash)
Expand All @@ -424,7 +414,7 @@ where
let validators = validators.clone();
let storage = node.state.storage_client().clone();
tasks.push(Self::read_or_download_hashed_certificate_value(
storage, validators, chain_id, location,
storage, validators, location,
));
}
}
Expand All @@ -448,7 +438,6 @@ where
pub async fn read_or_download_hashed_certificate_value<A>(
storage: S,
validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
Expand All @@ -462,7 +451,7 @@ where
Err(ViewError::NotFound(..)) => {}
Err(err) => Err(err)?,
}
match Self::download_hashed_certificate_value(validators, chain_id, location).await {
match Self::download_hashed_certificate_value(validators, location).await {
Some(hashed_certificate_value) => {
storage
.write_hashed_certificate_value(&hashed_certificate_value)
Expand Down Expand Up @@ -661,19 +650,16 @@ where

pub async fn download_hashed_certificate_value<A>(
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
// Sequentially try each validator in random order, to improve efficiency.
validators.shuffle(&mut rand::thread_rng());
ndr-ds marked this conversation as resolved.
Show resolved Hide resolved
for (name, mut node) in validators {
if let Some(value) = Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, location,
)
.await
if let Some(value) =
Self::try_download_hashed_certificate_value_from(&mut node, name, location).await
{
return Some(value);
}
Expand Down Expand Up @@ -720,21 +706,24 @@ where
}

async fn try_download_hashed_certificate_value_from<A>(
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
name: ValidatorName,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
where
A: LocalValidatorNode + Clone + 'static,
{
let query =
ChainInfoQuery::new(chain_id).with_hashed_certificate_value(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
if response.check(name).is_ok() {
return response.info.requested_hashed_certificate_value;
match node
.download_certificate_value(location.certificate_hash)
.await
{
Ok(hashed_certificate_value) => Some(hashed_certificate_value),
Err(error) => {
tracing::debug!(
"Failed to fetch certificate value {location:?} from validator {name}: {error}"
);
None
}
}
None
}
}
1 change: 0 additions & 1 deletion linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ where
.storage_client()
.read_hashed_certificate_value(hash)
.await
.map(Into::into)
.map_err(Into::into);
sender.send(certificate_value)
}
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ where
| CertificateValue::ValidatedBlock { executed_block, .. } => {
executed_block.block.bytecode_locations()
}
CertificateValue::Timeout { .. } => HashMap::new(),
CertificateValue::Timeout { .. } => HashSet::new(),
};
for location in locations {
if !required.contains_key(location) {
if !required.contains(location) {
let hash = location.certificate_hash;
warn!("validator requested {:?} but it is not required", hash);
return Err(NodeError::InvalidChainInfoResponse);
Expand Down
3 changes: 0 additions & 3 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ message ChainInfoQuery {
// Request a signed vote for a leader timeout.
bool request_leader_timeout = 8;

// Query a value that contains a binary hashed certificate value (e.g. bytecode) required by this chain.
optional bytes request_hashed_certificate_value = 9;

// Query the balance of a given owner.
optional Owner request_owner_balance = 10;

Expand Down
12 changes: 0 additions & 12 deletions linera-rpc/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,6 @@ impl TryFrom<api::ChainInfoQuery> for ChainInfoQuery {
.request_sent_certificates_in_range
.map(|range| bincode::deserialize(&range))
.transpose()?;
let request_hashed_certificate_value = chain_info_query
.request_hashed_certificate_value
.map(|bytes| bincode::deserialize(&bytes))
.transpose()?;

Ok(Self {
request_committees: chain_info_query.request_committees,
Expand All @@ -376,7 +372,6 @@ impl TryFrom<api::ChainInfoQuery> for ChainInfoQuery {
request_manager_values: chain_info_query.request_manager_values,
request_leader_timeout: chain_info_query.request_leader_timeout,
request_fallback: chain_info_query.request_fallback,
request_hashed_certificate_value,
})
}
}
Expand All @@ -389,10 +384,6 @@ impl TryFrom<ChainInfoQuery> for api::ChainInfoQuery {
.request_sent_certificates_in_range
.map(|range| bincode::serialize(&range))
.transpose()?;
let request_hashed_certificate_value = chain_info_query
.request_hashed_certificate_value
.map(|hash| bincode::serialize(&hash))
.transpose()?;

Ok(Self {
chain_id: Some(chain_info_query.chain_id.into()),
Expand All @@ -406,7 +397,6 @@ impl TryFrom<ChainInfoQuery> for api::ChainInfoQuery {
request_manager_values: chain_info_query.request_manager_values,
request_leader_timeout: chain_info_query.request_leader_timeout,
request_fallback: chain_info_query.request_fallback,
request_hashed_certificate_value,
})
}
}
Expand Down Expand Up @@ -696,7 +686,6 @@ pub mod tests {
requested_sent_certificates: vec![],
count_received_log: 0,
requested_received_log: vec![],
requested_hashed_certificate_value: None,
});

let chain_info_response_none = ChainInfoResponse {
Expand Down Expand Up @@ -733,7 +722,6 @@ pub mod tests {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: true,
request_hashed_certificate_value: None,
};
round_trip_check::<_, api::ChainInfoQuery>(chain_info_query_some);
}
Expand Down
Loading
Loading