Skip to content

Commit

Permalink
Replace certificate chain info query usage with certificate download
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Jun 17, 2024
1 parent c1b9285 commit d60e1f5
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 93 deletions.
4 changes: 0 additions & 4 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,6 @@ where
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
info.requested_received_log = self.chain.received_log.read(start..).await?;
}
if let Some(hash) = query.request_hashed_certificate_value {
info.requested_hashed_certificate_value =
Some(self.storage.read_hashed_certificate_value(hash).await?);
}
if query.request_manager_values {
info.manager.add_values(self.chain.manager.get());
}
Expand Down
22 changes: 11 additions & 11 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,18 +736,14 @@ where
&self,
locations: &[BytecodeLocation],
nodes: &[(ValidatorName, <P as LocalValidatorNodeProvider>::Node)],
chain_id: ChainId,
) -> Vec<HashedCertificateValue> {
future::join_all(locations.iter().map(|location| {
LocalNodeClient::<S>::download_hashed_certificate_value(
nodes.to_owned(),
chain_id,
*location,
)
LocalNodeClient::<S>::download_hashed_certificate_value(nodes.to_owned(), *location)
}))
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>()
}

Expand Down Expand Up @@ -813,7 +809,7 @@ where
locations,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;

ensure!(values.len() == locations.len(), err);
Expand All @@ -832,7 +828,7 @@ where
blob_ids,
)) => {
let values = self
.find_missing_application_bytecodes(locations, &nodes, block.chain_id)
.find_missing_application_bytecodes(locations, &nodes)
.await;
let blobs = self.find_missing_blobs(blob_ids, &nodes).await;

Expand Down Expand Up @@ -1264,11 +1260,15 @@ where
};
// Collect the hashed certificate values required for execution.
let committee = self.local_committee().await?;
let nodes: Vec<(ValidatorName, P::Node)> =
self.validator_node_provider.make_nodes(&committee)?;
let nodes = self
.validator_node_provider
.make_nodes::<Vec<_>>(&committee)?;
let values = self
.node_client
.read_or_download_hashed_certificate_values(nodes.clone(), block.bytecode_locations())
.read_or_download_hashed_certificate_values(
nodes,
block.bytecode_locations().into_keys(),
)
.await?;
let hashed_blobs = self.read_local_blobs(block.blob_ids()).await?;
// Create the final block proposal.
Expand Down
15 changes: 1 addition & 14 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, Owner},
};
use linera_chain::{
data_types::{
Certificate, ChainAndHeight, HashedCertificateValue, IncomingMessage, Medium, MessageBundle,
},
data_types::{Certificate, ChainAndHeight, IncomingMessage, Medium, MessageBundle},
manager::ChainManagerInfo,
ChainStateView,
};
Expand Down Expand Up @@ -68,8 +66,6 @@ pub struct ChainInfoQuery {
pub request_leader_timeout: bool,
/// Include a vote to switch to fallback mode, if appropriate.
pub request_fallback: bool,
/// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain.
pub request_hashed_certificate_value: Option<CryptoHash>,
}

impl ChainInfoQuery {
Expand All @@ -85,7 +81,6 @@ impl ChainInfoQuery {
request_manager_values: false,
request_leader_timeout: false,
request_fallback: false,
request_hashed_certificate_value: None,
}
}

Expand Down Expand Up @@ -133,11 +128,6 @@ impl ChainInfoQuery {
self.request_fallback = true;
self
}

pub fn with_hashed_certificate_value(mut self, hash: CryptoHash) -> Self {
self.request_hashed_certificate_value = Some(hash);
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -173,8 +163,6 @@ pub struct ChainInfo {
pub count_received_log: usize,
/// The response to `request_received_certificates_excluding_first_nth`
pub requested_received_log: Vec<ChainAndHeight>,
/// The requested hashed certificate value, if any.
pub requested_hashed_certificate_value: Option<HashedCertificateValue>,
}

/// The response to an `ChainInfoQuery`
Expand Down Expand Up @@ -253,7 +241,6 @@ where
requested_sent_certificates: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
requested_hashed_certificate_value: None,
}
}
}
Expand Down
84 changes: 42 additions & 42 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ where

async fn find_missing_application_bytecodes<A>(
&self,
chain_id: ChainId,
locations: &[BytecodeLocation],
node: &mut A,
name: ValidatorName,
Expand All @@ -186,15 +185,13 @@ where
future::join_all(locations.iter().map(|location| {
let mut node = node.clone();
async move {
Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, *location,
)
.await
Self::try_download_hashed_certificate_value_from(&mut node, name, *location).await
}
}))
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>()
}

Expand All @@ -218,7 +215,7 @@ where
}

async fn try_process_certificates<A>(
&self,
&mut self,
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
Expand All @@ -245,12 +242,7 @@ where
locations,
))) => {
let values = self
.find_missing_application_bytecodes(
certificate.value().chain_id(),
locations,
node,
name,
)
.find_missing_application_bytecodes(locations, node, name)
.await;

if values.len() != locations.len() {
Expand All @@ -272,9 +264,8 @@ where
Err(LocalNodeError::WorkerError(
WorkerError::ApplicationBytecodesAndBlobsNotFound(locations, blob_ids),
)) => {
let chain_id = certificate.value().chain_id();
let values = self
.find_missing_application_bytecodes(chain_id, locations, node, name)
.find_missing_application_bytecodes(locations, node, name)
.await;
let blobs = self.find_missing_blobs(blob_ids, node, name).await;
if values.len() != locations.len() || blobs.len() != blob_ids.len() {
Expand Down Expand Up @@ -362,7 +353,7 @@ where
}

pub async fn download_certificates<A>(
&self,
&mut self,
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
target_next_block_height: BlockHeight,
Expand Down Expand Up @@ -403,17 +394,17 @@ where
///
/// Does not fail if a hashed certificate value can't be downloaded; it just gets omitted from the result.
pub async fn read_or_download_hashed_certificate_values<A>(
&self,
&mut self,
validators: Vec<(ValidatorName, A)>,
hashed_certificate_value_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
hashed_certificate_value_locations: impl IntoIterator<Item = BytecodeLocation>,
) -> Result<Vec<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let mut values = vec![];
let mut tasks = vec![];
let mut node = self.node.lock().await;
for (location, chain_id) in hashed_certificate_value_locations {
for location in hashed_certificate_value_locations {
if let Some(value) = node
.state
.recent_hashed_certificate_value(&location.certificate_hash)
Expand All @@ -424,7 +415,7 @@ where
let validators = validators.clone();
let storage = node.state.storage_client().clone();
tasks.push(Self::read_or_download_hashed_certificate_value(
storage, validators, chain_id, location,
storage, validators, location,
));
}
}
Expand All @@ -448,7 +439,6 @@ where
pub async fn read_or_download_hashed_certificate_value<A>(
storage: S,
validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
Expand All @@ -462,7 +452,7 @@ where
Err(ViewError::NotFound(..)) => {}
Err(err) => Err(err)?,
}
match Self::download_hashed_certificate_value(validators, chain_id, location).await {
match Self::download_hashed_certificate_value(validators, location).await? {
Some(hashed_certificate_value) => {
storage
.write_hashed_certificate_value(&hashed_certificate_value)
Expand Down Expand Up @@ -492,7 +482,7 @@ where
}

async fn try_download_certificates_from<A>(
&self,
&mut self,
name: ValidatorName,
mut node: A,
chain_id: ChainId,
Expand Down Expand Up @@ -570,7 +560,7 @@ where
let mut futures = vec![];

for (name, node) in validators {
let client = self.clone();
let mut client = self.clone();
let mut notifications = vec![];
futures.push(async move {
(
Expand All @@ -594,7 +584,7 @@ where
}

pub async fn try_synchronize_chain_state_from<A>(
&self,
&mut self,
name: ValidatorName,
mut node: A,
chain_id: ChainId,
Expand Down Expand Up @@ -661,24 +651,21 @@ where

pub async fn download_hashed_certificate_value<A>(
mut validators: Vec<(ValidatorName, A)>,
chain_id: ChainId,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
// Sequentially try each validator in random order, to improve efficiency.
validators.shuffle(&mut rand::thread_rng());
for (name, mut node) in validators {
if let Some(value) = Self::try_download_hashed_certificate_value_from(
name, &mut node, chain_id, location,
)
.await
if let Some(value) =
Self::try_download_hashed_certificate_value_from(&mut node, name, location).await?
{
return Some(value);
return Ok(Some(value));
}
}
None
Ok(None)
}

pub async fn download_blob<A>(
Expand Down Expand Up @@ -720,21 +707,34 @@ where
}

async fn try_download_hashed_certificate_value_from<A>(
name: ValidatorName,
node: &mut A,
chain_id: ChainId,
name: ValidatorName,
location: BytecodeLocation,
) -> Option<HashedCertificateValue>
) -> Result<Option<HashedCertificateValue>, LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
{
let query =
ChainInfoQuery::new(chain_id).with_hashed_certificate_value(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
if response.check(name).is_ok() {
return response.info.requested_hashed_certificate_value;
match node
.download_certificate_value(location.certificate_hash)
.await
{
Ok(certificate) => {
let hashed_certificate_value: HashedCertificateValue = certificate.into();
if hashed_certificate_value.hash() == location.certificate_hash {
Ok(Some(hashed_certificate_value))
} else {
tracing::info!(
"Validator {name} sent an invalid certificate value {location:?}."
);
Ok(None)
}
}
Err(error) => {
tracing::debug!(
"Failed to fetch certificate value {location:?} from validator {name}: {error}"
);
Ok(None)
}
}
None
}
}
3 changes: 0 additions & 3 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ message ChainInfoQuery {
// Request a signed vote for a leader timeout.
bool request_leader_timeout = 8;

// Query a value that contains a binary hashed certificate value (e.g. bytecode) required by this chain.
optional bytes request_hashed_certificate_value = 9;

// Query the balance of a given owner.
optional Owner request_owner_balance = 10;

Expand Down
Loading

0 comments on commit d60e1f5

Please sign in to comment.