Skip to content

Commit

Permalink
Read confirmed certs (#2867)
Browse files Browse the repository at this point in the history
* RM unused write_hashed_certificate_value method

* Return Confirmed certificates when reading from storage.

* RM Certificate::has_message method

* Fix conditioned import

* RM contains_hashed_certificate_value methods

* Don't panic, return error, when remote node returns invalid cert.

* cargo clippy again
  • Loading branch information
deuszx authored Nov 12, 2024
1 parent ff96202 commit 65eb2f7
Show file tree
Hide file tree
Showing 18 changed files with 112 additions and 137 deletions.
29 changes: 26 additions & 3 deletions linera-chain/src/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::{
use linera_base::{
crypto::{CryptoHash, Signature},
data_types::Round,
identifiers::BlobId,
identifiers::{BlobId, ChainId, MessageId},
};
use linera_execution::committee::{Committee, ValidatorName};
use linera_execution::committee::{Committee, Epoch, ValidatorName};
use serde::{
ser::{Serialize, SerializeStruct, Serializer},
Deserialize, Deserializer,
Expand All @@ -22,7 +22,7 @@ use crate::{
block::{ConfirmedBlock, Timeout, ValidatedBlock},
data_types::{
Certificate, CertificateValue, ExecutedBlock, HashedCertificateValue, LiteCertificate,
LiteValue,
LiteValue, Medium, MessageBundle,
},
ChainError,
};
Expand Down Expand Up @@ -377,6 +377,29 @@ impl ConfirmedBlockCertificate {
pub fn executed_block(&self) -> &ExecutedBlock {
self.inner().inner()
}

/// Returns whether this value contains the message with the specified ID.
pub fn has_message(&self, message_id: &MessageId) -> bool {
self.executed_block().message_by_id(message_id).is_some()
}

/// Returns the bundles of messages sent via the given medium to the specified
/// recipient. Messages originating from different transactions of the original block
/// are kept in separate bundles. If the medium is a channel, does not verify that the
/// recipient is actually subscribed to that channel.
pub fn message_bundles_for<'a>(
&'a self,
medium: &'a Medium,
recipient: ChainId,
) -> impl Iterator<Item = (Epoch, MessageBundle)> + 'a {
let certificate_hash = self.hash();
self.executed_block()
.message_bundles_for(medium, recipient, certificate_hash)
}

pub fn requires_blob(&self, blob_id: &BlobId) -> bool {
self.executed_block().requires_blob(blob_id)
}
}

impl TryFrom<Certificate> for ConfirmedBlockCertificate {
Expand Down
8 changes: 0 additions & 8 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,14 +742,6 @@ impl CertificateValue {
HashedCertificateValue::unchecked_new(self, hash)
}

/// Returns whether this value contains the message with the specified ID.
pub fn has_message(&self, message_id: &MessageId) -> bool {
let Some(executed_block) = self.executed_block() else {
return false;
};
executed_block.message_by_id(message_id).is_some()
}

pub fn is_confirmed(&self) -> bool {
matches!(self, CertificateValue::ConfirmedBlock { .. })
}
Expand Down
3 changes: 1 addition & 2 deletions linera-core/benches/client_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use linera_core::{
use linera_execution::system::Recipient;
use linera_storage::{
READ_CERTIFICATE_COUNTER, READ_HASHED_CERTIFICATE_VALUE_COUNTER, WRITE_CERTIFICATE_COUNTER,
WRITE_HASHED_CERTIFICATE_VALUE_COUNTER,
};
use linera_views::metrics::{LOAD_VIEW_COUNTER, SAVE_VIEW_COUNTER};
use prometheus::core::Collector;
Expand Down Expand Up @@ -118,7 +117,7 @@ criterion_group!(
config = Criterion::default()
.measurement_time(Duration::from_secs(40))
.with_measurement(BenchRecorderMeasurement::new(vec![
READ_HASHED_CERTIFICATE_VALUE_COUNTER.desc()[0].fq_name.as_str(), WRITE_HASHED_CERTIFICATE_VALUE_COUNTER.desc()[0].fq_name.as_str(),
READ_HASHED_CERTIFICATE_VALUE_COUNTER.desc()[0].fq_name.as_str(),
READ_CERTIFICATE_COUNTER.desc()[0].fq_name.as_str(), WRITE_CERTIFICATE_COUNTER.desc()[0].fq_name.as_str(),
LOAD_VIEW_COUNTER.desc()[0].fq_name.as_str(), SAVE_VIEW_COUNTER.desc()[0].fq_name.as_str(),
]));
Expand Down
3 changes: 1 addition & 2 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ where
#[cfg(with_testing)]
ReadCertificate {
height: BlockHeight,
callback:
oneshot::Sender<Result<Option<linera_chain::data_types::Certificate>, WorkerError>>,
callback: oneshot::Sender<Result<Option<ConfirmedBlockCertificate>, WorkerError>>,
},

/// Search for a bundle in one of the chain's inboxes.
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
pub(super) async fn read_certificate(
&mut self,
height: BlockHeight,
) -> Result<Option<linera_chain::data_types::Certificate>, WorkerError> {
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
ChainWorkerStateWithTemporaryChanges::new(self)
.await
.read_certificate(height)
Expand Down
7 changes: 5 additions & 2 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use linera_views::views::View;
#[cfg(with_testing)]
use {
linera_base::{crypto::CryptoHash, data_types::BlockHeight},
linera_chain::data_types::{Certificate, MessageBundle, Origin},
linera_chain::{
data_types::{MessageBundle, Origin},
types::ConfirmedBlockCertificate,
},
};

use super::{check_block_epoch, ChainWorkerState};
Expand Down Expand Up @@ -57,7 +60,7 @@ where
pub(super) async fn read_certificate(
&mut self,
height: BlockHeight,
) -> Result<Option<Certificate>, WorkerError> {
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
self.0.ensure_is_active()?;
let certificate_hash = match self.0.chain.confirmed_log.get(height.try_into()?).await? {
Some(hash) => hash,
Expand Down
20 changes: 14 additions & 6 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,25 +365,27 @@ where
&self,
remote_node: &RemoteNode<impl ValidatorNode>,
chain_id: ChainId,
certificates: Vec<Certificate>,
certificates: Vec<ConfirmedBlockCertificate>,
) -> Option<Box<ChainInfo>> {
let mut info = None;
for certificate in certificates {
let hash = certificate.hash();
if !certificate.value().is_confirmed() || certificate.value().chain_id() != chain_id {
if certificate.executed_block().block.chain_id != chain_id {
// The certificate is not as expected. Give up.
warn!("Failed to process network certificate {}", hash);
return info;
}
let mut result = self.handle_certificate(certificate.clone(), vec![]).await;
let mut result = self
.handle_certificate(certificate.clone().into(), vec![])
.await;

if let Some(blob_ids) = result
.as_ref()
.err()
.and_then(LocalNodeError::get_blobs_not_found)
{
if let Some(blobs) = remote_node.try_download_blobs(&blob_ids).await {
result = self.handle_certificate(certificate, blobs).await;
result = self.handle_certificate(certificate.into(), blobs).await;
}
}

Expand Down Expand Up @@ -1698,10 +1700,16 @@ where
.with_manager_values();
let info = remote_node.handle_chain_info_query(query).await?;

let certificates = remote_node
let certificates: Vec<ConfirmedBlockCertificate> = remote_node
.node
.download_certificates(info.requested_sent_certificate_hashes)
.await?;
.await?
.into_iter()
.map(|c| {
ConfirmedBlockCertificate::try_from(c)
.map_err(|_| NodeError::InvalidChainInfoResponse)
})
.collect::<Result<_, _>>()?;

if !certificates.is_empty()
&& self
Expand Down
5 changes: 3 additions & 2 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, CertificateValue, ExecutedBlock, LiteCertificate,
},
types::ConfirmedBlockCertificate,
ChainError, ChainStateView,
};
use linera_execution::{ExecutionError, Query, Response, SystemExecutionError};
Expand Down Expand Up @@ -304,7 +305,7 @@ where
pub async fn certificate_for(
&self,
message_id: &MessageId,
) -> Result<Certificate, LocalNodeError> {
) -> Result<ConfirmedBlockCertificate, LocalNodeError> {
let query = ChainInfoQuery::new(message_id.chain_id)
.with_sent_certificate_hashes_in_range(BlockHeightRange::single(message_id.height));
let info = self.handle_chain_info_query(query).await?.info;
Expand All @@ -314,7 +315,7 @@ where
.await?;
let certificate = certificates
.into_iter()
.find(|certificate| certificate.value().has_message(message_id))
.find(|certificate| certificate.has_message(message_id))
.ok_or_else(|| {
ViewError::not_found("could not find certificate with message {}", message_id)
})?;
Expand Down
10 changes: 8 additions & 2 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<N: ValidatorNode> RemoteNode<N> {
chain_id: ChainId,
start: BlockHeight,
limit: u64,
) -> Result<Option<Vec<Certificate>>, NodeError> {
) -> Result<Option<Vec<ConfirmedBlockCertificate>>, NodeError> {
tracing::debug!(name = ?self.name, ?chain_id, ?start, ?limit, "Querying certificates");
let range = BlockHeightRange {
start,
Expand All @@ -123,7 +123,13 @@ impl<N: ValidatorNode> RemoteNode<N> {
let certificates = self
.node
.download_certificates(info.requested_sent_certificate_hashes)
.await?;
.await?
.into_iter()
.map(|c| {
ConfirmedBlockCertificate::try_from(c)
.map_err(|_| NodeError::InvalidChainInfoResponse)
})
.collect::<Result<_, _>>()?;
Ok(Some(certificates))
} else {
Ok(None)
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ where
.certificate_for(&sub_message_id)
.await
.unwrap(),
certificate.clone().into()
certificate.clone()
);

assert_eq!(sender.next_block_height(), BlockHeight::from(1));
Expand Down
11 changes: 6 additions & 5 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ where
validator.do_download_certificate(hash, sender)
})
.await
.map(Into::into)
}

async fn download_certificates(
Expand All @@ -188,6 +189,7 @@ where
validator.do_download_certificates(hashes, sender)
})
.await
.map(|certs| certs.into_iter().map(Certificate::from).collect())
}

async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
Expand Down Expand Up @@ -468,15 +470,14 @@ where
async fn do_download_certificate(
self,
hash: CryptoHash,
sender: oneshot::Sender<Result<Certificate, NodeError>>,
) -> Result<(), Result<Certificate, NodeError>> {
sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
let validator = self.client.lock().await;
let certificate = validator
.state
.storage_client()
.read_certificate(hash)
.await
.map(Into::into)
.map_err(Into::into);

sender.send(certificate)
Expand All @@ -485,8 +486,8 @@ where
async fn do_download_certificates(
self,
hashes: Vec<CryptoHash>,
sender: oneshot::Sender<Result<Vec<Certificate>, NodeError>>,
) -> Result<(), Result<Vec<Certificate>, NodeError>> {
sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
let validator = self.client.lock().await;
let certificates = validator
.state
Expand Down
7 changes: 4 additions & 3 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ where
.await?;
let mut chain_heights = BTreeMap::new();
for certificate in certificates {
let block_chain_id = certificate.value().chain_id();
let block_height = certificate.value().height().try_add_one()?;
let block_chain_id = certificate.executed_block().block.chain_id;
let block_height =
certificate.executed_block().block.height.try_add_one()?;
chain_heights
.entry(block_chain_id)
.and_modify(|h| *h = block_height.max(*h))
Expand Down Expand Up @@ -347,7 +348,7 @@ where
let storage = self.local_node.storage_client();
let certs = storage.read_certificates(keys.into_iter()).await?;
for cert in certs {
self.send_certificate(cert, delivery).await?;
self.send_certificate(cert.into(), delivery).await?;
}
}
if let Some(cert) = manager.timeout {
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ where
&self,
chain_id: ChainId,
height: BlockHeight,
) -> Result<Option<Certificate>, WorkerError> {
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
self.query_chain_worker(chain_id, move |callback| {
ChainWorkerRequest::ReadCertificate { height, callback }
})
Expand Down
19 changes: 18 additions & 1 deletion linera-rpc/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use linera_base::{
data_types::BlobContent,
identifiers::{BlobId, ChainId},
};
use linera_chain::data_types::{BlockProposal, Certificate, CertificateValue, LiteVote};
use linera_chain::{
data_types::{BlockProposal, Certificate, CertificateValue, LiteVote},
types::ConfirmedBlockCertificate,
};
use linera_core::{
data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
node::NodeError,
Expand Down Expand Up @@ -291,8 +294,22 @@ impl From<Certificate> for RpcMessage {
}
}

impl From<ConfirmedBlockCertificate> for RpcMessage {
fn from(certificate: ConfirmedBlockCertificate) -> Self {
RpcMessage::DownloadCertificateResponse(Box::new(certificate.into()))
}
}

impl From<Vec<Certificate>> for RpcMessage {
fn from(certificates: Vec<Certificate>) -> Self {
RpcMessage::DownloadCertificatesResponse(Box::new(certificates))
}
}

impl From<Vec<ConfirmedBlockCertificate>> for RpcMessage {
fn from(certificates: Vec<ConfirmedBlockCertificate>) -> Self {
RpcMessage::DownloadCertificatesResponse(Box::new(
certificates.into_iter().map(|c| c.into()).collect(),
))
}
}
10 changes: 3 additions & 7 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use linera_base::{
identifiers::{ChainDescription, ChainId, MessageId, Owner},
ownership::ChainOwnership,
};
#[cfg(feature = "benchmark")]
use linera_chain::data_types::CertificateValue;
use linera_client::{
chain_listener::ClientContext as _,
Expand Down Expand Up @@ -1190,12 +1191,7 @@ impl Job {
.certificate_for(&message_id)
.await
.context("could not find OpenChain message")?;
let CertificateValue::ConfirmedBlock { executed_block, .. } = certificate.value() else {
bail!(
"Unexpected certificate. Please make sure you are connecting to the right \
network and are using a current software version."
);
};
let executed_block = certificate.executed_block();
let Some(Message::System(SystemMessage::OpenChain(config))) = executed_block
.message_by_id(&message_id)
.map(|msg| &msg.message)
Expand Down Expand Up @@ -1257,7 +1253,7 @@ impl Job {
};
let certificate = storage.read_certificate(hash).await?;
let committee = committees
.get(&certificate.value().epoch())
.get(&certificate.executed_block().block.epoch)
.ok_or_else(|| anyhow!("tip of chain {chain_id} is outdated."))?;
certificate.check(committee)?;
}
Expand Down
Loading

0 comments on commit 65eb2f7

Please sign in to comment.