From 2491b28dadc43f19a1c5160c7b5864f873e8b183 Mon Sep 17 00:00:00 2001 From: Andre da Silva Date: Wed, 8 May 2024 07:09:24 -0700 Subject: [PATCH] PublishBlob --- CLI.md | 15 ++ linera-base/src/data_types.rs | 82 +++++++- linera-base/src/identifiers.rs | 31 +-- linera-chain/src/data_types.rs | 21 ++- linera-chain/src/manager.rs | 12 +- linera-chain/src/test.rs | 4 +- linera-core/src/client.rs | 113 +++++++++-- linera-core/src/data_types.rs | 17 +- linera-core/src/local_node.rs | 110 ++++++++++- linera-core/src/node.rs | 10 +- linera-core/src/unit_tests/test_utils.rs | 11 +- .../src/unit_tests/wasm_client_tests.rs | 25 ++- .../src/unit_tests/wasm_worker_tests.rs | 14 +- linera-core/src/unit_tests/worker_tests.rs | 106 ++++++----- linera-core/src/updater.rs | 48 ++++- linera-core/src/worker.rs | 178 +++++++++++++++--- linera-execution/src/system.rs | 5 +- linera-rpc/proto/rpc.proto | 9 + linera-rpc/src/client.rs | 17 +- linera-rpc/src/grpc/client.rs | 4 +- linera-rpc/src/grpc/conversions.rs | 19 ++ linera-rpc/src/grpc/server.rs | 3 +- linera-rpc/src/lib.rs | 1 + linera-rpc/src/simple/client.rs | 4 +- linera-rpc/src/simple/server.rs | 1 + .../tests/snapshots/format__format.yaml.snap | 54 ++++-- linera-sdk/src/test/chain.rs | 2 +- .../gql/service_schema.graphql | 14 ++ linera-service/src/linera/client_context.rs | 40 +++- linera-service/src/linera/client_options.rs | 9 + linera-service/src/linera/main.rs | 15 ++ linera-service/src/node_service.rs | 20 +- linera-service/src/schema_export.rs | 7 +- linera-storage/src/db_storage.rs | 33 ++-- linera-storage/src/lib.rs | 10 +- 35 files changed, 880 insertions(+), 184 deletions(-) diff --git a/CLI.md b/CLI.md index 6d93bdaf4af8..d7c3c2a6a9cc 100644 --- a/CLI.md +++ b/CLI.md @@ -27,6 +27,7 @@ This document contains the help content for the `linera` command-line program. * [`linera service`↴](#linera-service) * [`linera faucet`↴](#linera-faucet) * [`linera publish-bytecode`↴](#linera-publish-bytecode) +* [`linera publish-blob`↴](#linera-publish-blob) * [`linera create-application`↴](#linera-create-application) * [`linera publish-and-create`↴](#linera-publish-and-create) * [`linera request-application`↴](#linera-request-application) @@ -77,6 +78,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp * `service` — Run a GraphQL service to explore and extend the chains of the wallet * `faucet` — Run a GraphQL service that exposes a faucet where users can claim tokens. This gives away the chain's tokens, and is mainly intended for testing * `publish-bytecode` — Publish bytecode +* `publish-blob` — Publish a blob of binary data * `create-application` — Create an application * `publish-and-create` — Create an application, and publish the required bytecode * `request-application` — Request an application from another chain, so it can be used on this one @@ -553,6 +555,19 @@ Publish bytecode +## `linera publish-blob` + +Publish a blob of binary data + +**Usage:** `linera publish-blob [PUBLISHER]` + +###### **Arguments:** + +* `` — Path to blob file to be published +* `` — An optional chain ID to publish the blob. The default chain of the wallet is used otherwise + + + ## `linera create-application` Create an application diff --git a/linera-base/src/data_types.rs b/linera-base/src/data_types.rs index db791f1caf6a..4d14e8ba78b3 100644 --- a/linera-base/src/data_types.rs +++ b/linera-base/src/data_types.rs @@ -10,12 +10,13 @@ use anyhow::Context as _; use async_graphql::SimpleObject; use base64::engine::{general_purpose::STANDARD_NO_PAD, Engine as _}; use linera_witty::{WitLoad, WitStore, WitType}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use thiserror::Error; use crate::{ + crypto::BcsHashable, doc_scalar, - identifiers::{ApplicationId, Destination, GenericApplicationId}, + identifiers::{ApplicationId, BlobId, Destination, GenericApplicationId}, time::{Duration, SystemTime}, }; @@ -731,6 +732,78 @@ impl std::str::FromStr for OracleResponse { } } +/// A blob of binary data. +#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)] +pub struct Blob { + #[serde(with = "serde_bytes")] + bytes: Vec, +} + +impl Blob { + /// Creates a `HashedBlob` without checking that this is the correct `BlobId`! + pub fn with_hash_unchecked(self, blob_id: BlobId) -> HashedBlob { + HashedBlob { + id: blob_id, + blob: self, + } + } +} + +impl BcsHashable for Blob {} + +impl From for Blob { + fn from(blob: HashedBlob) -> Blob { + blob.blob + } +} + +/// A blob of binary data, with its content-addressed blob ID +#[derive(Eq, PartialEq, Debug, Hash, Clone)] +pub struct HashedBlob { + id: BlobId, + blob: Blob, +} + +impl HashedBlob { + /// Loads a hashed blob from a file. + pub async fn load_from_file(path: impl AsRef) -> std::io::Result { + let blob = Blob { + bytes: std::fs::read(path)?, + }; + Ok(blob.into()) + } + + /// A content-addressed blob ID i.e. the hash of the `Blob`. + pub fn id(&self) -> BlobId { + self.id + } +} + +impl Serialize for HashedBlob { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.blob.serialize(serializer) + } +} + +impl<'a> Deserialize<'a> for HashedBlob { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'a>, + { + Ok(Blob::deserialize(deserializer)?.into()) + } +} + +impl From for HashedBlob { + fn from(blob: Blob) -> HashedBlob { + let id = BlobId::new(&blob); + HashedBlob { blob, id } + } +} + doc_scalar!(Amount, "A non-negative amount of tokens."); doc_scalar!(BlockHeight, "A block height to identify blocks in a chain"); doc_scalar!( @@ -742,6 +815,11 @@ doc_scalar!( "A number to identify successive attempts to decide a value in a consensus protocol." ); doc_scalar!(OracleResponse, "A record of a single oracle response."); +doc_scalar!(Blob, "A blob of binary data."); +doc_scalar!( + HashedBlob, + "A blob of binary data, with it's content-addressed blob ID." +); #[cfg(test)] mod tests { diff --git a/linera-base/src/identifiers.rs b/linera-base/src/identifiers.rs index 945a75bf8973..c9f2b2fa90b7 100644 --- a/linera-base/src/identifiers.rs +++ b/linera-base/src/identifiers.rs @@ -15,8 +15,8 @@ use serde::{Deserialize, Serialize}; use crate::{ bcs_scalar, - crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, PublicKey}, - data_types::BlockHeight, + crypto::{BcsHashable, CryptoError, CryptoHash, PublicKey}, + data_types::{Blob, BlockHeight}, doc_scalar, }; @@ -78,7 +78,7 @@ impl Account { } } -impl std::fmt::Display for Account { +impl Display for Account { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.owner { Some(owner) => write!(f, "{}:{}", self.chain_id, owner), @@ -142,19 +142,23 @@ impl ChainDescription { #[cfg_attr(with_testing, derive(Default))] pub struct ChainId(pub CryptoHash); -/// A blob ID. +/// A content-addressed blob ID i.e. the hash of the Blob. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy, Hash, Debug, Serialize, Deserialize)] -#[cfg_attr(with_testing, derive(Default))] +#[cfg_attr(with_testing, derive(test_strategy::Arbitrary, Default))] pub struct BlobId(pub CryptoHash); -/// A blob. -#[derive(Serialize, Deserialize)] -pub struct Blob { - #[serde(with = "serde_bytes")] - bytes: Vec, +impl BlobId { + /// Creates a new `BlobId` from a `Blob` + pub fn new(blob: &Blob) -> Self { + BlobId(CryptoHash::new(blob)) + } } -impl BcsSignable for Blob {} +impl Display for BlobId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} /// The index of a message in a chain. #[derive( @@ -802,7 +806,10 @@ doc_scalar!( ); doc_scalar!(AccountOwner, "An owner of an account."); doc_scalar!(Account, "An account"); -doc_scalar!(BlobId, "A blob id"); +doc_scalar!( + BlobId, + "A content-addressed blob ID i.e. the hash of the Blob" +); #[cfg(test)] mod tests { diff --git a/linera-chain/src/data_types.rs b/linera-chain/src/data_types.rs index 363712016413..c3535370210b 100644 --- a/linera-chain/src/data_types.rs +++ b/linera-chain/src/data_types.rs @@ -10,15 +10,15 @@ use std::{ use async_graphql::{Object, SimpleObject}; use linera_base::{ crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, KeyPair, PublicKey, Signature}, - data_types::{Amount, BlockHeight, OracleRecord, Round, Timestamp}, + data_types::{Amount, BlockHeight, HashedBlob, OracleRecord, Round, Timestamp}, doc_scalar, ensure, identifiers::{ - Account, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, + Account, BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, }, }; use linera_execution::{ committee::{Committee, Epoch, ValidatorName}, - BytecodeLocation, Message, MessageKind, Operation, + BytecodeLocation, Message, MessageKind, Operation, SystemOperation, }; use serde::{de::Deserializer, Deserialize, Serialize}; @@ -78,6 +78,18 @@ impl Block { locations } + /// Returns all the blob IDs referred to in this block's operations. + pub fn blob_ids(&self) -> HashSet { + let mut blob_ids = HashSet::new(); + for operation in &self.operations { + if let Operation::System(SystemOperation::PublishBlob { blob_id }) = operation { + blob_ids.insert(blob_id.to_owned()); + } + } + + blob_ids + } + /// Returns whether the block contains only rejected incoming messages, which /// makes it admissible even on closed chains. pub fn has_only_rejected_messages(&self) -> bool { @@ -221,6 +233,7 @@ pub struct BlockProposal { pub owner: Owner, pub signature: Signature, pub hashed_certificate_values: Vec, + pub hashed_blobs: Vec, pub validated: Option, } @@ -800,6 +813,7 @@ impl BlockProposal { content: BlockAndRound, secret: &KeyPair, hashed_certificate_values: Vec, + hashed_blobs: Vec, validated: Option, ) -> Self { let outcome = validated @@ -818,6 +832,7 @@ impl BlockProposal { owner: secret.public().into(), signature, hashed_certificate_values, + hashed_blobs, validated, } } diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index 1b39e82c7076..da22f47118cd 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -72,9 +72,9 @@ use std::collections::BTreeMap; use linera_base::{ crypto::{KeyPair, PublicKey}, - data_types::{ArithmeticError, BlockHeight, Round, Timestamp}, + data_types::{ArithmeticError, BlockHeight, HashedBlob, Round, Timestamp}, doc_scalar, ensure, - identifiers::{ChainId, Owner}, + identifiers::{BlobId, ChainId, Owner}, ownership::ChainOwnership, }; use linera_execution::committee::Epoch; @@ -134,6 +134,8 @@ pub struct ChainManager { pub current_round: Round, /// The owners that take over in fallback mode. pub fallback_owners: BTreeMap, + /// The pending blobs + pub pending_blobs: BTreeMap, } doc_scalar!( @@ -202,6 +204,7 @@ impl ChainManager { round_timeout, current_round, fallback_owners, + pending_blobs: BTreeMap::new(), }) } @@ -404,6 +407,11 @@ impl ChainManager { self.locked = Some(validated.clone()); } } + + for hashed_blob in proposal.hashed_blobs { + self.pending_blobs.insert(hashed_blob.id(), hashed_blob); + } + if let Some(key_pair) = key_pair { let BlockAndRound { block, round } = proposal.content; let executed_block = outcome.with(block); diff --git a/linera-chain/src/test.rs b/linera-chain/src/test.rs index ebf586a6df7d..4a5f4859a1d7 100644 --- a/linera-chain/src/test.rs +++ b/linera-chain/src/test.rs @@ -122,7 +122,7 @@ impl BlockTestExt for Block { fn into_proposal_with_round(self, key_pair: &KeyPair, round: Round) -> BlockProposal { let content = BlockAndRound { block: self, round }; - BlockProposal::new(content, key_pair, vec![], None) + BlockProposal::new(content, key_pair, vec![], vec![], None) } fn into_justified_proposal( @@ -132,7 +132,7 @@ impl BlockTestExt for Block { validated: Certificate, ) -> BlockProposal { let content = BlockAndRound { block: self, round }; - BlockProposal::new(content, key_pair, vec![], Some(validated)) + BlockProposal::new(content, key_pair, vec![], vec![], Some(validated)) } } diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index 6602fa0d51ae..35319a8c9510 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -19,9 +19,11 @@ use futures::{ use linera_base::{ abi::Abi, crypto::{CryptoHash, KeyPair, PublicKey}, - data_types::{Amount, ApplicationPermissions, ArithmeticError, BlockHeight, Round, Timestamp}, + data_types::{ + Amount, ApplicationPermissions, ArithmeticError, BlockHeight, HashedBlob, Round, Timestamp, + }, ensure, - identifiers::{Account, ApplicationId, BytecodeId, ChainId, MessageId, Owner}, + identifiers::{Account, ApplicationId, BlobId, BytecodeId, ChainId, MessageId, Owner}, ownership::{ChainOwnership, TimeoutConfig}, }; use linera_chain::{ @@ -77,8 +79,11 @@ pub struct ChainClientBuilder { message_policy: MessagePolicy, /// Whether to block on cross-chain message delivery. cross_chain_message_delivery: CrossChainMessageDelivery, - /// Cached values by hash. - recent_values: Arc>>, + /// Cached hashed certificate values by hash. + recent_hashed_certificate_values: + Arc>>, + /// Cached blobs by blob ID. + recent_hashed_blobs: Arc>>, /// One-shot channels to notify callers when messages of a particular chain have been /// delivered. delivery_notifiers: Arc>, @@ -93,7 +98,10 @@ impl ChainClientBuilder { max_pending_messages: usize, cross_chain_message_delivery: CrossChainMessageDelivery, ) -> Self { - let recent_values = Arc::new(tokio::sync::Mutex::new(LruCache::new( + let recent_hashed_certificate_values = Arc::new(tokio::sync::Mutex::new(LruCache::new( + NonZeroUsize::try_from(DEFAULT_VALUE_CACHE_SIZE).unwrap(), + ))); + let recent_hashed_blobs = Arc::new(tokio::sync::Mutex::new(LruCache::new( NonZeroUsize::try_from(DEFAULT_VALUE_CACHE_SIZE).unwrap(), ))); Self { @@ -101,7 +109,8 @@ impl ChainClientBuilder { max_pending_messages, message_policy: MessagePolicy::Accept, cross_chain_message_delivery, - recent_values, + recent_hashed_certificate_values, + recent_hashed_blobs, delivery_notifiers: Arc::new(tokio::sync::Mutex::new(DeliveryNotifiers::default())), notifier: Arc::new(Notifier::default()), } @@ -133,7 +142,8 @@ impl ChainClientBuilder { let state = WorkerState::new_for_client( format!("Client node {:?}", chain_id), storage, - self.recent_values.clone(), + self.recent_hashed_certificate_values.clone(), + self.recent_hashed_blobs.clone(), self.delivery_notifiers.clone(), ) .with_allow_inactive_chains(true) @@ -153,6 +163,7 @@ impl ChainClientBuilder { next_block_height, pending_block, node_client, + pending_blobs: BTreeMap::new(), } } } @@ -217,6 +228,8 @@ pub struct ChainClient { /// Local node to manage the execution state and the local storage of the chains that we are /// tracking. node_client: LocalNodeClient, + /// The pending blobs + pending_blobs: BTreeMap, } /// Error type for [`ChainClient`]. @@ -269,6 +282,9 @@ pub enum ChainClientError { #[error("Found several possible identities to interact with chain {0}")] FoundMultipleKeysForChain(ChainId), + + #[error(transparent)] + ViewError(#[from] ViewError), } impl From for ChainClientError { @@ -608,7 +624,7 @@ where .communicate_chain_action(committee, submit_action, value) .await?; if certificate.value().is_confirmed() { - self.process_certificate(certificate.clone(), vec![]) + self.process_certificate(certificate.clone(), vec![], vec![]) .await?; Ok(certificate) } else { @@ -729,10 +745,14 @@ where .download_certificates(nodes.clone(), block.chain_id, block.height) .await?; // Process the received operations. Download required hashed certificate values if necessary. - if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await { - if let LocalNodeError::WorkerError(WorkerError::ApplicationBytecodesNotFound( - locations, - )) = &err + if let Err(err) = self + .process_certificate(certificate.clone(), vec![], vec![]) + .await + { + // This assumes that we check for missing bytecode first then blobs + let values = if let LocalNodeError::WorkerError( + WorkerError::ApplicationBytecodesNotFound(locations), + ) = &err { let values = future::join_all(locations.iter().map(|location| { LocalNodeClient::::download_hashed_certificate_value( @@ -746,7 +766,24 @@ where .flatten() .collect::>(); if !values.is_empty() { - self.process_certificate(certificate.clone(), values) + self.process_certificate(certificate.clone(), values.clone(), vec![]) + .await?; + } + values + } else { + vec![] + }; + + if let LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) = &err { + let blobs = future::join_all(blob_ids.iter().map(|blob_id| { + LocalNodeClient::::download_blob(nodes.clone(), block.chain_id, *blob_id) + })) + .await + .into_iter() + .flatten() + .collect::>(); + if !blobs.is_empty() { + self.process_certificate(certificate.clone(), values, blobs) .await?; } } @@ -979,10 +1016,11 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, ) -> Result<(), LocalNodeError> { let info = self .node_client - .handle_certificate(certificate, hashed_certificate_values) + .handle_certificate(certificate, hashed_certificate_values, hashed_blobs) .await? .info; self.update_from_info(&info); @@ -1021,7 +1059,7 @@ where let certificate = self .communicate_chain_action(&committee, action, value) .await?; - self.process_certificate(certificate.clone(), vec![]) + self.process_certificate(certificate.clone(), vec![], vec![]) .await?; // The block height didn't increase, but this will communicate the timeout as well. self.communicate_chain_updates( @@ -1070,6 +1108,29 @@ where } } + /// Tries to read blobs from either the pending blobs or the Local Node's cache + async fn read_local_blobs( + &mut self, + blob_ids: impl IntoIterator, + ) -> Result, LocalNodeError> { + let mut blobs = Vec::new(); + for blob_id in blob_ids { + if let Some(blob) = self.node_client.recent_blob(&blob_id).await { + blobs.push(blob); + } else if let Some(blob) = self.pending_blobs.get(&blob_id) { + self.node_client.cache_recent_blob(blob).await; + blobs.push(blob.to_owned()); + } else { + return Err(LocalNodeError::CannotReadLocalBlob { + chain_id: self.chain_id, + blob_id, + }); + } + } + + Ok(blobs) + } + /// Executes (or retries) a regular block proposal. Updates local balance. async fn propose_block( &mut self, @@ -1130,11 +1191,13 @@ where }; // Collect the hashed certificate values required for execution. let committee = self.local_committee().await?; - let nodes = self.validator_node_provider.make_nodes(&committee)?; + let nodes: Vec<(ValidatorName, P::Node)> = + self.validator_node_provider.make_nodes(&committee)?; let values = self .node_client - .read_or_download_hashed_certificate_values(nodes, block.bytecode_locations()) + .read_or_download_hashed_certificate_values(nodes.clone(), block.bytecode_locations()) .await?; + let hashed_blobs = self.read_local_blobs(block.blob_ids()).await?; // Create the final block proposal. let key_pair = self.key_pair().await?; let proposal = BlockProposal::new( @@ -1144,6 +1207,7 @@ where }, key_pair, values, + hashed_blobs, validated, ); // Check the final block proposal. This will be cheaper after #1401. @@ -1812,6 +1876,21 @@ where }) } + /// Publishes some blob. + pub async fn publish_blob( + &mut self, + hashed_blob: HashedBlob, + ) -> Result, ChainClientError> { + self.node_client.cache_recent_blob(&hashed_blob).await; + self.pending_blobs + .insert(hashed_blob.id(), hashed_blob.clone()); + self.execute_operation(Operation::System(SystemOperation::PublishBlob { + blob_id: hashed_blob.id(), + })) + .await? + .try_map(|certificate| Ok((hashed_blob.id(), certificate))) + } + /// Creates an application by instantiating some bytecode. pub async fn create_application< A: Abi, diff --git a/linera-core/src/data_types.rs b/linera-core/src/data_types.rs index 0dd525b06852..dca205bab081 100644 --- a/linera-core/src/data_types.rs +++ b/linera-core/src/data_types.rs @@ -6,8 +6,8 @@ use std::collections::BTreeMap; use linera_base::{ crypto::{BcsSignable, CryptoError, CryptoHash, KeyPair, Signature}, - data_types::{Amount, BlockHeight, Round, Timestamp}, - identifiers::{ChainDescription, ChainId, Owner}, + data_types::{Amount, BlockHeight, HashedBlob, Round, Timestamp}, + identifiers::{BlobId, ChainDescription, ChainId, Owner}, }; use linera_chain::{ data_types::{ @@ -68,8 +68,10 @@ pub struct ChainInfoQuery { pub request_leader_timeout: bool, /// Include a vote to switch to fallback mode, if appropriate. pub request_fallback: bool, - /// Query a value that contains a binary hashed certificate value (e.g. bytecode) required by this chain. + /// Query a certificate value that contains a binary blob (e.g. bytecode) required by this chain. pub request_hashed_certificate_value: Option, + /// Query a binary blob (e.g. bytecode) required by this chain. + pub request_blob: Option, } impl ChainInfoQuery { @@ -86,6 +88,7 @@ impl ChainInfoQuery { request_leader_timeout: false, request_fallback: false, request_hashed_certificate_value: None, + request_blob: None, } } @@ -138,6 +141,11 @@ impl ChainInfoQuery { self.request_hashed_certificate_value = Some(hash); self } + + pub fn with_blob(mut self, blob_id: BlobId) -> Self { + self.request_blob = Some(blob_id); + self + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -175,6 +183,8 @@ pub struct ChainInfo { pub requested_received_log: Vec, /// The requested hashed certificate value, if any. pub requested_hashed_certificate_value: Option, + /// The requested blob, if any. + pub requested_blob: Option, } /// The response to an `ChainInfoQuery` @@ -254,6 +264,7 @@ where count_received_log: view.received_log.count(), requested_received_log: Vec::new(), requested_hashed_certificate_value: None, + requested_blob: None, } } } diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 16ad6c3b35f5..e083f6222b57 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -6,8 +6,8 @@ use std::{borrow::Cow, sync::Arc}; use futures::{future, lock::Mutex}; use linera_base::{ - data_types::{ArithmeticError, BlockHeight}, - identifiers::{ChainId, MessageId}, + data_types::{ArithmeticError, BlockHeight, HashedBlob}, + identifiers::{BlobId, ChainId, MessageId}, }; use linera_chain::data_types::{ Block, BlockProposal, Certificate, ExecutedBlock, HashedCertificateValue, LiteCertificate, @@ -62,6 +62,9 @@ pub enum LocalNodeError { target_next_block_height: BlockHeight, }, + #[error("Failed to read blob {blob_id:?} of chain {chain_id:?}")] + CannotReadLocalBlob { chain_id: ChainId, blob_id: BlobId }, + #[error("The local node doesn't have an active chain {0:?}")] InactiveChain(ChainId), @@ -96,6 +99,7 @@ where .fully_handle_certificate_with_notifications( full_cert, vec![], + vec![], Some(&mut notifications), ) .await?; @@ -107,6 +111,7 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, ) -> Result { let mut node = self.node.lock().await; let mut notifications = Vec::new(); @@ -115,6 +120,7 @@ where .fully_handle_certificate_with_notifications( certificate, hashed_certificate_values, + hashed_blobs, Some(&mut notifications), ) .await?; @@ -194,10 +200,13 @@ where tracing::warn!("Failed to process network certificate {}", hash); return info; } - let mut result = self.handle_certificate(certificate.clone(), vec![]).await; - if let Err(LocalNodeError::WorkerError(WorkerError::ApplicationBytecodesNotFound( - locations, - ))) = &result + let mut result = self + .handle_certificate(certificate.clone(), vec![], vec![]) + .await; + // This assumes that we check for missing bytecode first then blobs + let values = if let Err(LocalNodeError::WorkerError( + WorkerError::ApplicationBytecodesNotFound(locations), + )) = &result { let chain_id = certificate.value().chain_id(); let mut values = Vec::new(); @@ -220,8 +229,33 @@ where return info; } } - result = self.handle_certificate(certificate.clone(), values).await; + result = self + .handle_certificate(certificate.clone(), values.clone(), vec![]) + .await; + values + } else { + vec![] + }; + + if let Err(LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids))) = &result + { + let chain_id = certificate.value().chain_id(); + let blobs = future::join_all(blob_ids.iter().map(|blob_id| { + let mut node = node.clone(); + async move { + Self::try_download_blob_from(name, &mut node, chain_id, *blob_id).await + } + })) + .await + .into_iter() + .flatten() + .collect::>(); + + result = self + .handle_certificate(certificate.clone(), values, blobs) + .await; } + match result { Ok(response) => info = Some(response.info), Err(error) => { @@ -266,6 +300,18 @@ where Ok(response) } + pub async fn recent_blob(&self, blob_id: &BlobId) -> Option { + let mut node = self.node.lock().await; + node.state.recent_blob(blob_id).await + } + + pub async fn cache_recent_blob(&self, hashed_blob: &HashedBlob) -> bool { + let mut node = self.node.lock().await; + node.state + .cache_recent_blob(Cow::Borrowed(hashed_blob)) + .await + } + pub async fn download_certificates( &mut self, mut validators: Vec<(ValidatorName, A)>, @@ -317,7 +363,11 @@ where let mut tasks = vec![]; let mut node = self.node.lock().await; for (location, chain_id) in hashed_certificate_value_locations { - if let Some(value) = node.state.recent_value(&location.certificate_hash).await { + if let Some(value) = node + .state + .recent_hashed_certificate_value(&location.certificate_hash) + .await + { values.push(value); } else { let validators = validators.clone(); @@ -335,7 +385,9 @@ where let mut node = self.node.lock().await; for result in results { if let Some(value) = result? { - node.state.cache_recent_value(Cow::Borrowed(&value)).await; + node.state + .cache_recent_hashed_certificate_value(Cow::Borrowed(&value)) + .await; values.push(value); } } @@ -502,7 +554,7 @@ where if let Some(cert) = info.manager.requested_locked { if cert.value().is_validated() && cert.value().chain_id() == chain_id { let hash = cert.hash(); - if let Err(error) = self.handle_certificate(*cert, vec![]).await { + if let Err(error) = self.handle_certificate(*cert, vec![], vec![]).await { tracing::warn!("Skipping certificate {}: {}", hash, error); } } @@ -532,6 +584,44 @@ where None } + pub async fn download_blob( + mut validators: Vec<(ValidatorName, A)>, + chain_id: ChainId, + blob_id: BlobId, + ) -> Option + where + A: LocalValidatorNode + Clone + 'static, + { + // Sequentially try each validator in random order. + validators.shuffle(&mut rand::thread_rng()); + for (name, mut node) in validators { + if let Some(blob) = + Self::try_download_blob_from(name, &mut node, chain_id, blob_id).await + { + return Some(blob); + } + } + None + } + + async fn try_download_blob_from( + name: ValidatorName, + node: &mut A, + chain_id: ChainId, + blob_id: BlobId, + ) -> Option + where + A: LocalValidatorNode + Clone + 'static, + { + let query = ChainInfoQuery::new(chain_id).with_blob(blob_id); + if let Ok(response) = node.handle_chain_info_query(query).await { + if response.check(name).is_ok() { + return response.info.requested_blob; + } + } + None + } + async fn try_download_hashed_certificate_value_from( name: ValidatorName, node: &mut A, diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index f1f59ca6950d..850f4c2f6cfb 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -5,8 +5,8 @@ use futures::stream::{BoxStream, LocalBoxStream, Stream}; use linera_base::{ crypto::CryptoError, - data_types::{ArithmeticError, BlockHeight}, - identifiers::ChainId, + data_types::{ArithmeticError, BlockHeight, HashedBlob}, + identifiers::{BlobId, ChainId}, }; use linera_chain::{ data_types::{BlockProposal, Certificate, HashedCertificateValue, LiteCertificate, Origin}, @@ -62,6 +62,7 @@ pub trait LocalValidatorNode { &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, delivery: CrossChainMessageDelivery, ) -> Result; @@ -163,6 +164,10 @@ pub enum NodeError { #[error("The following values containing application bytecode are missing: {0:?}.")] ApplicationBytecodesNotFound(Vec), + // This error must be normalized during conversions. + #[error("The following blobs are missing: {0:?}.")] + BlobsNotFound(Vec), + // This error must be normalized during conversions. #[error("We don't have the value for the certificate.")] MissingCertificateValue, @@ -277,6 +282,7 @@ impl From for NodeError { WorkerError::ApplicationBytecodesNotFound(locations) => { NodeError::ApplicationBytecodesNotFound(locations) } + WorkerError::BlobsNotFound(blob_ids) => NodeError::BlobsNotFound(blob_ids), error => Self::WorkerError { error: error.to_string(), }, diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 2fef6973e9bf..f250c64f81ca 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -126,10 +126,16 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, _delivery: CrossChainMessageDelivery, ) -> Result { self.spawn_and_receive(move |validator, sender| { - validator.do_handle_certificate(certificate, hashed_certificate_values, sender) + validator.do_handle_certificate( + certificate, + hashed_certificate_values, + hashed_blobs, + sender, + ) }) .await } @@ -238,6 +244,7 @@ where .fully_handle_certificate_with_notifications( cert, vec![], + vec![], Some(&mut notifications), ) .await @@ -257,6 +264,7 @@ where self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, sender: oneshot::Sender>, ) -> Result<(), Result> { let mut validator = self.client.lock().await; @@ -272,6 +280,7 @@ where .fully_handle_certificate_with_notifications( certificate, hashed_certificate_values, + hashed_blobs, Some(&mut notifications), ) .await diff --git a/linera-core/src/unit_tests/wasm_client_tests.rs b/linera-core/src/unit_tests/wasm_client_tests.rs index 2309f5583ee8..44b88b555262 100644 --- a/linera-core/src/unit_tests/wasm_client_tests.rs +++ b/linera-core/src/unit_tests/wasm_client_tests.rs @@ -18,7 +18,7 @@ use assert_matches::assert_matches; use async_graphql::Request; use counter::CounterAbi; use linera_base::{ - data_types::{Amount, OracleRecord, OracleResponse}, + data_types::{Amount, HashedBlob, OracleRecord, OracleResponse}, identifiers::{AccountOwner, ApplicationId, ChainDescription, ChainId, Destination, Owner}, ownership::{ChainOwnership, TimeoutConfig}, }; @@ -109,6 +109,29 @@ where let (contract_path, service_path) = linera_execution::wasm_test::get_example_bytecode_paths("counter")?; + + let contract_blob = HashedBlob::load_from_file(contract_path.clone()).await?; + let expected_contract_blob_id = contract_blob.id(); + let (blob_id, _) = publisher + .publish_blob(contract_blob.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(expected_contract_blob_id, blob_id); + + let service_blob = HashedBlob::load_from_file(service_path.clone()).await?; + let expected_service_blob_id = service_blob.id(); + let (blob_id, _) = publisher.publish_blob(service_blob).await.unwrap().unwrap(); + assert_eq!(expected_service_blob_id, blob_id); + + // If I try to upload the contract blob again, I should get the same blob ID + let (blob_id, _) = publisher + .publish_blob(contract_blob) + .await + .unwrap() + .unwrap(); + assert_eq!(expected_contract_blob_id, blob_id); + let (bytecode_id, cert) = publisher .publish_bytecode( Bytecode::load_from_file(contract_path).await?, diff --git a/linera-core/src/unit_tests/wasm_worker_tests.rs b/linera-core/src/unit_tests/wasm_worker_tests.rs index 9ab09fb94cbc..5b314d646a4a 100644 --- a/linera-core/src/unit_tests/wasm_worker_tests.rs +++ b/linera-core/src/unit_tests/wasm_worker_tests.rs @@ -155,7 +155,7 @@ where let publish_certificate = make_certificate(&committee, &worker, publish_block_proposal); let info = worker - .fully_handle_certificate(publish_certificate.clone(), vec![]) + .fully_handle_certificate(publish_certificate.clone(), vec![], vec![]) .await .unwrap() .info; @@ -226,7 +226,7 @@ where make_certificate(&committee, &worker, failing_broadcast_block_proposal); worker - .fully_handle_certificate(failing_broadcast_certificate, vec![]) + .fully_handle_certificate(failing_broadcast_certificate, vec![], vec![]) .await .expect_err("Broadcast messages with grants should fail"); @@ -250,7 +250,7 @@ where let broadcast_certificate = make_certificate(&committee, &worker, broadcast_block_proposal); let info = worker - .fully_handle_certificate(broadcast_certificate.clone(), vec![]) + .fully_handle_certificate(broadcast_certificate.clone(), vec![], vec![]) .await .unwrap() .info; @@ -305,7 +305,7 @@ where let subscribe_certificate = make_certificate(&committee, &worker, subscribe_block_proposal); let info = worker - .fully_handle_certificate(subscribe_certificate.clone(), vec![]) + .fully_handle_certificate(subscribe_certificate.clone(), vec![], vec![]) .await .unwrap() .info; @@ -358,7 +358,7 @@ where let accept_certificate = make_certificate(&committee, &worker, accept_block_proposal); let info = worker - .fully_handle_certificate(accept_certificate.clone(), vec![]) + .fully_handle_certificate(accept_certificate.clone(), vec![], vec![]) .await .unwrap() .info; @@ -452,7 +452,7 @@ where let create_certificate = make_certificate(&committee, &worker, create_block_proposal); let info = worker - .fully_handle_certificate(create_certificate.clone(), vec![]) + .fully_handle_certificate(create_certificate.clone(), vec![], vec![]) .await .unwrap() .info; @@ -505,7 +505,7 @@ where let run_certificate = make_certificate(&committee, &worker, run_block_proposal); let info = worker - .fully_handle_certificate(run_certificate.clone(), vec![]) + .fully_handle_certificate(run_certificate.clone(), vec![], vec![]) .await .unwrap() .info; diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 9f7c1bc2c4b1..797eb01962f4 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -528,7 +528,7 @@ where make_certificate(&committee, &worker, value) }; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; { @@ -577,7 +577,7 @@ where let unknown_key = KeyPair::generate(); let unknown_sender_block_proposal = - BlockProposal::new(block_proposal.content, &unknown_key, vec![], None); + BlockProposal::new(block_proposal.content, &unknown_key, vec![], vec![], None); assert_matches!( worker .handle_block_proposal(unknown_sender_block_proposal) @@ -659,7 +659,7 @@ where .pending() .is_some()); worker - .handle_certificate(certificate0, vec![], None) + .handle_certificate(certificate0, vec![], vec![], None) .await?; worker.handle_block_proposal(block_proposal1).await?; assert!(worker @@ -766,7 +766,7 @@ where // Missing earlier blocks assert_matches!( worker - .handle_certificate(certificate1.clone(), vec![], None) + .handle_certificate(certificate1.clone(), vec![], vec![], None) .await, Err(WorkerError::MissingEarlierBlocks { .. }) ); @@ -777,6 +777,7 @@ where .fully_handle_certificate_with_notifications( certificate0.clone(), vec![], + vec![], Some(&mut notifications), ) .await?; @@ -784,6 +785,7 @@ where .fully_handle_certificate_with_notifications( certificate1.clone(), vec![], + vec![], Some(&mut notifications), ) .await?; @@ -1019,7 +1021,7 @@ where ), ); worker - .handle_certificate(certificate.clone(), vec![], None) + .handle_certificate(certificate.clone(), vec![], vec![], None) .await?; // Then receive the next two messages. @@ -1232,7 +1234,7 @@ where .await; assert_matches!( worker - .fully_handle_certificate(certificate, vec![]) + .fully_handle_certificate(certificate, vec![], vec![]) .await, Err(WorkerError::ChainError(error)) if matches!(*error, ChainError::InactiveChain {..}) ); @@ -1314,7 +1316,7 @@ where ); let certificate = make_certificate(&committee, &worker, value); let info = worker - .fully_handle_certificate(certificate, vec![]) + .fully_handle_certificate(certificate, vec![], vec![]) .await? .info; assert_eq!(info.next_block_height, BlockHeight::from(1)); @@ -1356,7 +1358,9 @@ where // This fails because `make_simple_transfer_certificate` uses `sender_key_pair.public()` to // compute the hash of the execution state. assert_matches!( - worker.fully_handle_certificate(certificate, vec![]).await, + worker + .fully_handle_certificate(certificate, vec![], vec![]) + .await, Err(WorkerError::IncorrectStateHash) ); Ok(()) @@ -1403,9 +1407,11 @@ where .await; // Replays are ignored. worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) + .await?; + worker + .fully_handle_certificate(certificate, vec![], vec![]) .await?; - worker.fully_handle_certificate(certificate, vec![]).await?; Ok(()) } @@ -1466,7 +1472,7 @@ where ) .await; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; let mut chain = worker.storage.load_active_chain(ChainId::root(1)).await?; assert_eq!(Amount::ZERO, *chain.execution_state.system.balance.get()); @@ -1551,7 +1557,7 @@ where ) .await; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; let new_sender_chain = worker.storage.load_active_chain(ChainId::root(1)).await?; assert_eq!( @@ -1606,7 +1612,7 @@ where ) .await; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; let mut chain = worker.storage.load_active_chain(ChainId::root(1)).await?; assert_eq!(Amount::ZERO, *chain.execution_state.system.balance.get()); @@ -1864,7 +1870,7 @@ where .await; let info = worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await? .info; assert_eq!(ChainId::root(1), info.chain_id); @@ -1910,7 +1916,7 @@ where ) .await; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; assert_eq!( @@ -1993,7 +1999,7 @@ where .await; let info = worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await? .info; assert_eq!(ChainId::root(1), info.chain_id); @@ -2065,7 +2071,7 @@ where .await; worker - .fully_handle_certificate(certificate00.clone(), vec![]) + .fully_handle_certificate(certificate00.clone(), vec![], vec![]) .await?; let certificate01 = make_transfer_certificate( @@ -2102,7 +2108,7 @@ where .await; worker - .fully_handle_certificate(certificate01.clone(), vec![]) + .fully_handle_certificate(certificate01.clone(), vec![], vec![]) .await?; { @@ -2127,7 +2133,7 @@ where .await; worker - .fully_handle_certificate(certificate1.clone(), vec![]) + .fully_handle_certificate(certificate1.clone(), vec![], vec![]) .await?; let certificate2 = make_transfer_certificate( @@ -2146,7 +2152,7 @@ where .await; worker - .fully_handle_certificate(certificate2.clone(), vec![]) + .fully_handle_certificate(certificate2.clone(), vec![], vec![]) .await?; // Reject the first transfer and try to use the money of the second one. @@ -2205,7 +2211,7 @@ where .await; worker - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await?; { @@ -2248,7 +2254,7 @@ where .await; worker - .fully_handle_certificate(certificate3.clone(), vec![]) + .fully_handle_certificate(certificate3.clone(), vec![], vec![]) .await?; { @@ -2354,7 +2360,7 @@ where ), ); worker - .fully_handle_certificate(certificate0.clone(), vec![]) + .fully_handle_certificate(certificate0.clone(), vec![], vec![]) .await?; { let admin_chain = worker.storage.load_active_chain(admin_id).await?; @@ -2415,7 +2421,7 @@ where ), ); worker - .fully_handle_certificate(certificate1.clone(), vec![]) + .fully_handle_certificate(certificate1.clone(), vec![], vec![]) .await?; // Have the admin chain accept the subscription now. @@ -2465,7 +2471,7 @@ where ), ); worker - .fully_handle_certificate(certificate2.clone(), vec![]) + .fully_handle_certificate(certificate2.clone(), vec![], vec![]) .await?; { // The root chain has 1 subscribers. @@ -2643,7 +2649,7 @@ where ), ); worker - .fully_handle_certificate(certificate3, vec![]) + .fully_handle_certificate(certificate3, vec![], vec![]) .await?; { let mut user_chain = worker.storage.load_active_chain(user_id).await?; @@ -2774,12 +2780,12 @@ where ), ); worker - .fully_handle_certificate(certificate1.clone(), vec![]) + .fully_handle_certificate(certificate1.clone(), vec![], vec![]) .await?; // Try to execute the transfer. worker - .fully_handle_certificate(certificate0.clone(), vec![]) + .fully_handle_certificate(certificate0.clone(), vec![], vec![]) .await?; // The transfer was started.. @@ -2910,12 +2916,12 @@ where ), ); worker - .fully_handle_certificate(certificate1.clone(), vec![]) + .fully_handle_certificate(certificate1.clone(), vec![], vec![]) .await?; // Try to execute the transfer from the user chain to the admin chain. worker - .fully_handle_certificate(certificate0.clone(), vec![]) + .fully_handle_certificate(certificate0.clone(), vec![], vec![]) .await?; { @@ -2979,7 +2985,7 @@ where ), ); worker - .fully_handle_certificate(certificate2.clone(), vec![]) + .fully_handle_certificate(certificate2.clone(), vec![], vec![]) .await?; { @@ -2994,7 +3000,7 @@ where // Try again to execute the transfer from the user chain to the admin chain. // This time, the epoch verification should be overruled. worker - .fully_handle_certificate(certificate0.clone(), vec![]) + .fully_handle_certificate(certificate0.clone(), vec![], vec![]) .await?; { @@ -3234,7 +3240,7 @@ where let value0 = HashedCertificateValue::new_confirmed(executed_block0); let certificate0 = make_certificate(&committee, &worker, value0.clone()); let response = worker - .fully_handle_certificate(certificate0, vec![]) + .fully_handle_certificate(certificate0, vec![], vec![]) .await?; // The leader sequence is pseudorandom but deterministic. The first leader is owner 1. @@ -3274,7 +3280,7 @@ where .unwrap() .into_certificate(); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.leader, Some(Owner::from(pub_key0))); @@ -3296,7 +3302,7 @@ where let vote = response.info.manager.pending.clone().unwrap(); let certificate1 = vote.with_value(value1.clone()).unwrap().into_certificate(); let (response, _) = worker - .handle_certificate(certificate1.clone(), vec![], None) + .handle_certificate(certificate1.clone(), vec![], vec![], None) .await?; let vote = response.info.manager.pending.as_ref().unwrap(); let value = HashedCertificateValue::new_confirmed(executed_block1.clone()); @@ -3310,7 +3316,7 @@ where Round::SingleLeader(4), ); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.leader, Some(Owner::from(pub_key1))); assert_eq!(response.info.manager.current_round, Round::SingleLeader(5)); @@ -3325,7 +3331,9 @@ where let value2 = HashedCertificateValue::new_validated(executed_block2.clone()); let certificate = make_certificate_with_round(&committee, &worker, value2.clone(), Round::SingleLeader(2)); - worker.handle_certificate(certificate, vec![], None).await?; + worker + .handle_certificate(certificate, vec![], vec![], None) + .await?; let query_values = ChainInfoQuery::new(chain_id).with_manager_values(); let (response, _) = worker.handle_chain_info_query(query_values.clone()).await?; assert_eq!( @@ -3366,7 +3374,7 @@ where Round::SingleLeader(5), ); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.leader, Some(Owner::from(pub_key0))); assert_eq!(response.info.manager.current_round, Round::SingleLeader(6)); @@ -3382,7 +3390,7 @@ where let certificate_timeout = make_certificate_with_round(&committee, &worker, value_timeout, Round::SingleLeader(7)); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.current_round, Round::SingleLeader(8)); @@ -3392,7 +3400,7 @@ where make_certificate_with_round(&committee, &worker, value1, Round::SingleLeader(7)); let mut worker = worker.with_key_pair(None); // Forget validator keys. worker - .handle_certificate(certificate.clone(), vec![], None) + .handle_certificate(certificate.clone(), vec![], vec![], None) .await?; let (response, _) = worker.handle_chain_info_query(query_values).await?; assert_eq!( @@ -3434,7 +3442,7 @@ where let value0 = HashedCertificateValue::new_confirmed(executed_block0); let certificate0 = make_certificate(&committee, &worker, value0.clone()); let response = worker - .fully_handle_certificate(certificate0, vec![]) + .fully_handle_certificate(certificate0, vec![], vec![]) .await?; // The first round is the fast-block round, and owner 0 is a super owner. @@ -3473,7 +3481,7 @@ where .unwrap() .into_certificate(); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.current_round, Round::MultiLeader(0)); assert_eq!(response.info.manager.leader, None); @@ -3522,7 +3530,7 @@ where let value0 = HashedCertificateValue::new_confirmed(executed_block0); let certificate0 = make_certificate(&committee, &worker, value0.clone()); let response = worker - .fully_handle_certificate(certificate0, vec![]) + .fully_handle_certificate(certificate0, vec![], vec![]) .await?; // The first round is the fast-block round, and owner 0 is a super owner. @@ -3549,7 +3557,7 @@ where let certificate_timeout = make_certificate_with_round(&committee, &worker, value_timeout.clone(), Round::Fast); let (response, _) = worker - .handle_certificate(certificate_timeout, vec![], None) + .handle_certificate(certificate_timeout, vec![], vec![], None) .await?; assert_eq!(response.info.manager.current_round, Round::MultiLeader(0)); assert_eq!(response.info.manager.leader, None); @@ -3596,7 +3604,7 @@ where let certificate3 = make_certificate_with_round(&committee, &worker, value2.clone(), Round::MultiLeader(2)); worker - .handle_certificate(certificate3.clone(), vec![], None) + .handle_certificate(certificate3.clone(), vec![], vec![], None) .await?; let query_values = ChainInfoQuery::new(chain_id).with_manager_values(); let (response, _) = worker.handle_chain_info_query(query_values).await?; @@ -3647,7 +3655,9 @@ where let (executed_block, _) = worker.stage_block_execution(block).await?; let value = HashedCertificateValue::new_confirmed(executed_block); let certificate = make_certificate(&committee, &worker, value); - worker.fully_handle_certificate(certificate, vec![]).await?; + worker + .fully_handle_certificate(certificate, vec![], vec![]) + .await?; // The message only just arrived: No fallback mode. let (response, _) = worker.handle_chain_info_query(query.clone()).await?; @@ -3662,7 +3672,9 @@ where assert_eq!(vote.value.value_hash, value.hash()); assert_eq!(vote.round, round); let certificate = make_certificate_with_round(&committee, &worker, value, round); - worker.fully_handle_certificate(certificate, vec![]).await?; + worker + .fully_handle_certificate(certificate, vec![], vec![]) + .await?; // Now we are in fallback mode, and the validator is the leader. let (response, _) = worker.handle_chain_info_query(query.clone()).await?; diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 54181c96b9cc..5c2252466cd1 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -198,7 +198,7 @@ where } } self.node - .handle_certificate(certificate.clone(), vec![], delivery) + .handle_certificate(certificate.clone(), vec![], vec![], delivery) .await } @@ -211,7 +211,8 @@ where .send_optimized_certificate(&certificate, delivery) .await; - if let Err(NodeError::ApplicationBytecodesNotFound(locations)) = &result { + // This assumes that we check for missing bytecode first then blobs + let values = if let Err(NodeError::ApplicationBytecodesNotFound(locations)) = &result { // Find the missing bytecodes locally and retry. let required = match certificate.value() { CertificateValue::ConfirmedBlock { executed_block, .. } @@ -241,7 +242,47 @@ where .collect::, _>>()?; result = self .node - .handle_certificate(certificate.clone(), values, delivery) + .handle_certificate(certificate.clone(), values.clone(), vec![], delivery) + .await; + values + } else { + vec![] + }; + + if let Err(NodeError::BlobsNotFound(blob_ids)) = &result { + // Find the missing blobs locally and retry. + let required = match certificate.value() { + CertificateValue::ConfirmedBlock { executed_block, .. } + | CertificateValue::ValidatedBlock { executed_block, .. } => { + executed_block.block.blob_ids() + } + CertificateValue::Timeout { .. } => HashSet::new(), + }; + for blob_id in blob_ids { + if !required.contains(blob_id) { + warn!( + "validator requested blob {:?} but it is not required", + blob_id + ); + return Err(NodeError::InvalidChainInfoResponse); + } + } + let unique_blob_ids = blob_ids.iter().cloned().collect::>(); + if blob_ids.len() > unique_blob_ids.len() { + warn!("blobs requested by validator contain duplicates"); + return Err(NodeError::InvalidChainInfoResponse); + } + let blobs = future::join_all( + unique_blob_ids + .into_iter() + .map(|blob_id| self.storage.read_hashed_blob(blob_id)), + ) + .await + .into_iter() + .collect::, _>>()?; + result = self + .node + .handle_certificate(certificate.clone(), values, blobs, delivery) .await; } @@ -260,6 +301,7 @@ where Ok(response) => response, Err(NodeError::MissingCrossChainUpdate { .. }) | Err(NodeError::InactiveChain(_)) + | Err(NodeError::BlobsNotFound(_)) | Err(NodeError::ApplicationBytecodesNotFound(_)) => { // Some received certificates may be missing for this validator // (e.g. to create the chain or make the balance sufficient) so we are going to diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index da2ac0fbfad4..c2284bb8c9c9 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -14,9 +14,9 @@ use async_trait::async_trait; use futures::{future, FutureExt}; use linera_base::{ crypto::{CryptoHash, KeyPair}, - data_types::{ArithmeticError, BlockHeight, Round}, + data_types::{ArithmeticError, BlockHeight, HashedBlob, Round}, doc_scalar, ensure, - identifiers::{ChainId, Owner}, + identifiers::{BlobId, ChainId, Owner}, }; use linera_chain::{ data_types::{ @@ -24,7 +24,8 @@ use linera_chain::{ ExecutedBlock, HashedCertificateValue, IncomingMessage, LiteCertificate, Medium, MessageAction, MessageBundle, Origin, OutgoingMessage, Target, }, - manager, ChainError, ChainStateView, + manager::{self}, + ChainError, ChainStateView, }; use linera_execution::{ committee::{Committee, Epoch}, @@ -123,6 +124,7 @@ pub trait ValidatorWorker { &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, notify_message_delivery: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>; @@ -245,12 +247,16 @@ pub enum WorkerError { InvalidLiteCertificate, #[error("An additional value was provided that is not required: {value_hash}.")] UnneededValue { value_hash: CryptoHash }, + #[error("An additional blob was provided that is not required: {blob_id}.")] + UnneededBlob { blob_id: BlobId }, #[error("The following values containing application bytecode are missing: {0:?}.")] ApplicationBytecodesNotFound(Vec), #[error("The certificate in the block proposal is not a ValidatedBlock")] MissingExecutedBlockInProposal, #[error("Fast blocks cannot query oracles")] FastBlockUsingOracles, + #[error("The following blobs are missing: {0:?}.")] + BlobsNotFound(Vec), } impl From for WorkerError { @@ -278,8 +284,10 @@ pub struct WorkerState { /// Blocks with a timestamp this far in the future will still be accepted, but the validator /// will wait until that timestamp before voting. grace_period: Duration, - /// Cached values by hash. - recent_values: Arc>>, + /// Cached hashed certificate values by hash. + recent_hashed_certificate_values: Arc>>, + /// Cached hashed blobs by `BlobId`. + recent_hashed_blobs: Arc>>, /// One-shot channels to notify callers when messages of a particular chain have been /// delivered. delivery_notifiers: Arc>, @@ -290,7 +298,10 @@ pub(crate) type DeliveryNotifiers = impl WorkerState { pub fn new(nickname: String, key_pair: Option, storage: StorageClient) -> Self { - let recent_values = Arc::new(Mutex::new(LruCache::new( + let recent_hashed_certificate_values = Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::try_from(DEFAULT_VALUE_CACHE_SIZE).unwrap(), + ))); + let recent_hashed_blobs = Arc::new(Mutex::new(LruCache::new( NonZeroUsize::try_from(DEFAULT_VALUE_CACHE_SIZE).unwrap(), ))); WorkerState { @@ -300,7 +311,8 @@ impl WorkerState { allow_inactive_chains: false, allow_messages_from_deprecated_epochs: false, grace_period: Duration::ZERO, - recent_values, + recent_hashed_certificate_values, + recent_hashed_blobs, delivery_notifiers: Arc::default(), } } @@ -308,7 +320,8 @@ impl WorkerState { pub fn new_for_client( nickname: String, storage: StorageClient, - recent_values: Arc>>, + recent_hashed_certificate_values: Arc>>, + recent_hashed_blobs: Arc>>, delivery_notifiers: Arc>, ) -> Self { WorkerState { @@ -318,7 +331,8 @@ impl WorkerState { allow_inactive_chains: false, allow_messages_from_deprecated_epochs: false, grace_period: Duration::ZERO, - recent_values, + recent_hashed_certificate_values, + recent_hashed_blobs, delivery_notifiers, } } @@ -370,8 +384,9 @@ impl WorkerState { certificate: LiteCertificate<'_>, ) -> Result { let hash = certificate.value.value_hash; - let mut recent_values = self.recent_values.lock().await; - let value = recent_values + let mut recent_hashed_certificate_values = + self.recent_hashed_certificate_values.lock().await; + let value = recent_hashed_certificate_values .get(&hash) .ok_or(WorkerError::MissingCertificateValue)?; certificate @@ -379,11 +394,19 @@ impl WorkerState { .ok_or(WorkerError::InvalidLiteCertificate) } - pub(crate) async fn recent_value( + pub(crate) async fn recent_hashed_certificate_value( &mut self, hash: &CryptoHash, ) -> Option { - self.recent_values.lock().await.get(hash).cloned() + self.recent_hashed_certificate_values + .lock() + .await + .get(hash) + .cloned() + } + + pub(crate) async fn recent_blob(&mut self, blob_id: &BlobId) -> Option { + self.recent_hashed_blobs.lock().await.get(blob_id).cloned() } } @@ -398,10 +421,12 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, ) -> Result { self.fully_handle_certificate_with_notifications( certificate, hashed_certificate_values, + hashed_blobs, None, ) .await @@ -412,10 +437,11 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, mut notifications: Option<&mut Vec>, ) -> Result { let (response, actions) = self - .handle_certificate(certificate, hashed_certificate_values, None) + .handle_certificate(certificate, hashed_certificate_values, hashed_blobs, None) .await?; if let Some(notifications) = notifications.as_mut() { notifications.extend(actions.notifications); @@ -600,6 +626,7 @@ where &mut self, certificate: Certificate, hashed_certificate_values: &[HashedCertificateValue], + hashed_blobs: &[HashedBlob], notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { let CertificateValue::ConfirmedBlock { executed_block, .. } = certificate.value() else { @@ -668,14 +695,27 @@ where .await?; // Persist certificate and hashed certificate values. for value in hashed_certificate_values { - self.cache_recent_value(Cow::Borrowed(value)).await; + self.cache_recent_hashed_certificate_value(Cow::Borrowed(value)) + .await; } - let (result_hashed_certificate_value, result_certificate) = tokio::join!( + + let pending_blobs = &chain.manager.get().pending_blobs; + let blob_ids_in_block = block.blob_ids(); + self.check_no_missing_blob(blob_ids_in_block.clone(), hashed_blobs, pending_blobs) + .await?; + for hashed_blob in hashed_blobs { + self.cache_recent_blob(Cow::Borrowed(hashed_blob)).await; + } + + let blobs_in_block = self.get_blobs(blob_ids_in_block, pending_blobs).await?; + let (result_hashed_certificate_value, result_blobs, result_certificate) = tokio::join!( self.storage .write_hashed_certificate_values(hashed_certificate_values), + self.storage.write_hashed_blobs(&blobs_in_block), self.storage.write_certificate(&certificate) ); result_hashed_certificate_value?; + result_blobs?; result_certificate?; // Execute the block and update inboxes. chain.remove_events_from_inboxes(block).await?; @@ -726,7 +766,8 @@ where notify_when_messages_are_delivered, ) .await; - self.cache_recent_value(Cow::Owned(certificate.value)).await; + self.cache_recent_hashed_certificate_value(Cow::Owned(certificate.value)) + .await; #[cfg(with_metrics)] NUM_BLOCKS.with_label_values(&[]).inc(); @@ -734,6 +775,55 @@ where Ok((info, actions)) } + /// Returns an error if the block requires a blob we don't have, or if unrelated blobs were provided. + async fn check_no_missing_blob( + &self, + mut required_blob_ids: HashSet, + hashed_blobs: &[HashedBlob], + pending_blobs: &BTreeMap, + ) -> Result<(), WorkerError> { + // Find all certificates containing blobs used when executing this block. + for hashed_blob in hashed_blobs { + let blob_id = hashed_blob.id(); + ensure!( + required_blob_ids.remove(&blob_id), + WorkerError::UnneededBlob { blob_id } + ); + } + + let recent_blobs = self.recent_hashed_blobs.lock().await; + let missing_blob_ids = required_blob_ids + .into_iter() + .filter(|blob_id| { + !recent_blobs.contains(blob_id) && !pending_blobs.contains_key(blob_id) + }) + .map(|blob_id| blob_id.to_owned()) + .collect::>(); + if missing_blob_ids.is_empty() { + Ok(()) + } else { + Err(WorkerError::BlobsNotFound(missing_blob_ids)) + } + } + + async fn get_blobs( + &self, + blob_ids: HashSet, + pending_blobs: &BTreeMap, + ) -> Result, WorkerError> { + let mut blobs = Vec::new(); + let mut recent_blobs = self.recent_hashed_blobs.lock().await; + for blob_id in blob_ids { + if let Some(blob) = recent_blobs.get(&blob_id) { + blobs.push(blob.clone()); + } else if let Some(blob) = pending_blobs.get(&blob_id) { + blobs.push(blob.clone()); + } + } + + Ok(blobs) + } + /// Returns an error if the block requires bytecode we don't have, or if unrelated bytecode /// hashed certificate values were provided. async fn check_no_missing_bytecode( @@ -758,7 +848,7 @@ where .iter() .map(|hashed_certificate_value| hashed_certificate_value.hash()) .collect(); - let recent_values = self.recent_values.lock().await; + let recent_values = self.recent_hashed_certificate_values.lock().await; let tasks = required_locations .into_keys() .filter(|location| { @@ -943,22 +1033,40 @@ where Ok(Some(last_updated_height)) } - pub async fn cache_recent_value<'a>(&mut self, value: Cow<'a, HashedCertificateValue>) -> bool { + pub async fn cache_recent_hashed_certificate_value<'a>( + &mut self, + value: Cow<'a, HashedCertificateValue>, + ) -> bool { let hash = value.hash(); - let mut recent_values = self.recent_values.lock().await; - if recent_values.contains(&hash) { + let mut recent_hashed_certificate_values = + self.recent_hashed_certificate_values.lock().await; + if recent_hashed_certificate_values.contains(&hash) { return false; } // Cache the certificate so that clients don't have to send the value again. - recent_values.push(hash, value.into_owned()); + recent_hashed_certificate_values.push(hash, value.into_owned()); + true + } + + pub async fn cache_recent_blob<'a>(&mut self, hashed_blob: Cow<'a, HashedBlob>) -> bool { + let mut recent_blobs = self.recent_hashed_blobs.lock().await; + if recent_blobs.contains(&hashed_blob.id()) { + return false; + } + // Cache the blob so that clients don't have to send it again. + recent_blobs.push(hashed_blob.id(), hashed_blob.into_owned()); true } /// Caches the validated block and the corresponding confirmed block. async fn cache_validated(&mut self, value: &HashedCertificateValue) { - if self.cache_recent_value(Cow::Borrowed(value)).await { + if self + .cache_recent_hashed_certificate_value(Cow::Borrowed(value)) + .await + { if let Some(value) = value.validated_to_confirmed() { - self.cache_recent_value(Cow::Owned(value)).await; + self.cache_recent_hashed_certificate_value(Cow::Owned(value)) + .await; } } } @@ -1088,6 +1196,7 @@ where content: BlockAndRound { block, round }, owner, hashed_certificate_values, + hashed_blobs, validated, signature: _, } = &proposal; @@ -1130,6 +1239,13 @@ where // Verify that all required bytecode hashed certificate values are available, and no unrelated ones provided. self.check_no_missing_bytecode(block, hashed_certificate_values) .await?; + + self.check_no_missing_blob( + block.blob_ids(), + hashed_blobs, + &chain.manager.get().pending_blobs, + ) + .await?; // Write the values so that the bytecode is available during execution. self.storage .write_hashed_certificate_values(hashed_certificate_values) @@ -1193,8 +1309,13 @@ where notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { let full_cert = self.full_certificate(certificate).await?; - self.handle_certificate(full_cert, vec![], notify_when_messages_are_delivered) - .await + self.handle_certificate( + full_cert, + vec![], + vec![], + notify_when_messages_are_delivered, + ) + .await } /// Processes a certificate. @@ -1207,6 +1328,7 @@ where &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { trace!("{} <-- {:?}", self.nickname, certificate); @@ -1249,6 +1371,7 @@ where self.process_confirmed_block( certificate, &hashed_certificate_values, + &hashed_blobs, notify_when_messages_are_delivered, ) .await? @@ -1371,6 +1494,9 @@ where info.requested_hashed_certificate_value = Some(self.storage.read_hashed_certificate_value(hash).await?); } + if let Some(blob_id) = query.request_blob { + info.requested_blob = Some(self.storage.read_hashed_blob(blob_id).await?); + } if query.request_manager_values { info.manager.add_values(chain.manager.get()); } diff --git a/linera-execution/src/system.rs b/linera-execution/src/system.rs index 0cc87707bd23..2aad44f1fc6e 100644 --- a/linera-execution/src/system.rs +++ b/linera-execution/src/system.rs @@ -14,7 +14,7 @@ use linera_base::{ crypto::{CryptoHash, PublicKey}, data_types::{Amount, ApplicationPermissions, ArithmeticError, Timestamp}, ensure, hex_debug, - identifiers::{Account, BytecodeId, ChainDescription, ChainId, MessageId, Owner}, + identifiers::{Account, BlobId, BytecodeId, ChainDescription, ChainId, MessageId, Owner}, ownership::{ChainOwnership, TimeoutConfig}, }; use linera_views::{ @@ -158,6 +158,8 @@ pub enum SystemOperation { contract: Bytecode, service: Bytecode, }, + /// Publishes a new blob + PublishBlob { blob_id: BlobId }, /// Creates a new application. CreateApplication { bytecode_id: BytecodeId, @@ -670,6 +672,7 @@ where }; outcome.messages.push(message); } + PublishBlob { .. } => (), } Ok((outcome, new_application)) diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index 9c9b3f8125b5..990be00f2418 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -147,6 +147,9 @@ message ChainInfoQuery { // Request a signed vote for fallback mode. bool request_fallback = 11; + + // Query a value that contains a binary blob (e.g. bytecode) required by this chain. + optional bytes request_blob = 12; } // An authenticated proposal for a new block. @@ -168,6 +171,9 @@ message BlockProposal { // A certificate for a validated block that justifies the proposal in this round. optional bytes validated = 6; + + // Required blob + bytes blobs = 7; } // A certified statement from the committee, without the value. @@ -210,6 +216,9 @@ message Certificate { // Wait until all outgoing cross-chain messages from this certificate have // been received by the target chains. bool wait_for_outgoing_messages = 6; + + // Blobs required by this certificate + bytes blobs = 7; } message ChainId { diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index 20bc69d6942d..ade53319ac96 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use linera_base::identifiers::ChainId; +use linera_base::{data_types::HashedBlob, identifiers::ChainId}; use linera_chain::data_types::{ BlockProposal, Certificate, HashedCertificateValue, LiteCertificate, }; @@ -80,19 +80,30 @@ impl ValidatorNode for Client { &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, delivery: CrossChainMessageDelivery, ) -> Result { match self { Client::Grpc(grpc_client) => { grpc_client - .handle_certificate(certificate, hashed_certificate_values, delivery) + .handle_certificate( + certificate, + hashed_certificate_values, + hashed_blobs, + delivery, + ) .await } #[cfg(with_simple_network)] Client::Simple(simple_client) => { simple_client - .handle_certificate(certificate, hashed_certificate_values, delivery) + .handle_certificate( + certificate, + hashed_certificate_values, + hashed_blobs, + delivery, + ) .await } } diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index b2d77de40a7a..2072b249a21d 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -4,7 +4,7 @@ use std::{iter, time::Duration}; use futures::{future, stream, StreamExt}; -use linera_base::identifiers::ChainId; +use linera_base::{data_types::HashedBlob, identifiers::ChainId}; use linera_chain::data_types; #[cfg(web)] use linera_core::node::{ @@ -162,12 +162,14 @@ impl ValidatorNode for GrpcClient { &mut self, certificate: data_types::Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, delivery: CrossChainMessageDelivery, ) -> Result { let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages(); let request = HandleCertificateRequest { certificate, hashed_certificate_values, + hashed_blobs, wait_for_outgoing_messages, }; client_delegate!(self, handle_certificate, request) diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index 86d138e0bb3f..17889a4de620 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -173,6 +173,7 @@ impl TryFrom for api::BlockProposal { hashed_certificate_values: bincode::serialize( &block_proposal.hashed_certificate_values, )?, + blobs: bincode::serialize(&block_proposal.hashed_blobs)?, validated: block_proposal .validated .map(|cert| bincode::serialize(&cert)) @@ -197,6 +198,7 @@ impl TryFrom for BlockProposal { hashed_certificate_values: bincode::deserialize( &block_proposal.hashed_certificate_values, )?, + hashed_blobs: bincode::deserialize(&block_proposal.blobs)?, validated: block_proposal .validated .map(|bytes| bincode::deserialize(&bytes)) @@ -310,11 +312,13 @@ impl TryFrom for HandleCertificateRequest { ); let signatures = bincode::deserialize(&cert_request.signatures)?; let values = bincode::deserialize(&cert_request.hashed_certificate_values)?; + let blobs = bincode::deserialize(&cert_request.blobs)?; let round = bincode::deserialize(&cert_request.round)?; Ok(HandleCertificateRequest { certificate: Certificate::new(value, round, signatures), wait_for_outgoing_messages: cert_request.wait_for_outgoing_messages, hashed_certificate_values: values, + hashed_blobs: blobs, }) } } @@ -329,6 +333,7 @@ impl TryFrom for api::Certificate { round: bincode::serialize(&request.certificate.round)?, signatures: bincode::serialize(request.certificate.signatures())?, hashed_certificate_values: bincode::serialize(&request.hashed_certificate_values)?, + blobs: bincode::serialize(&request.hashed_blobs)?, wait_for_outgoing_messages: request.wait_for_outgoing_messages, }) } @@ -346,6 +351,10 @@ impl TryFrom for ChainInfoQuery { .request_hashed_certificate_value .map(|bytes| bincode::deserialize(&bytes)) .transpose()?; + let request_blob = chain_info_query + .request_blob + .map(|bytes| bincode::deserialize(&bytes)) + .transpose()?; Ok(Self { request_committees: chain_info_query.request_committees, @@ -363,6 +372,7 @@ impl TryFrom for ChainInfoQuery { request_leader_timeout: chain_info_query.request_leader_timeout, request_fallback: chain_info_query.request_fallback, request_hashed_certificate_value, + request_blob, }) } } @@ -379,6 +389,10 @@ impl TryFrom for api::ChainInfoQuery { .request_hashed_certificate_value .map(|hash| bincode::serialize(&hash)) .transpose()?; + let request_blob = chain_info_query + .request_blob + .map(|blob_id| bincode::serialize(&blob_id)) + .transpose()?; Ok(Self { chain_id: Some(chain_info_query.chain_id.into()), @@ -393,6 +407,7 @@ impl TryFrom for api::ChainInfoQuery { request_leader_timeout: chain_info_query.request_leader_timeout, request_fallback: chain_info_query.request_fallback, request_hashed_certificate_value, + request_blob, }) } } @@ -613,6 +628,7 @@ pub mod tests { count_received_log: 0, requested_received_log: vec![], requested_hashed_certificate_value: None, + requested_blob: None, }); let chain_info_response_none = ChainInfoResponse { @@ -650,6 +666,7 @@ pub mod tests { request_leader_timeout: false, request_fallback: true, request_hashed_certificate_value: None, + request_blob: None, }; round_trip_check::<_, api::ChainInfoQuery>(chain_info_query_some); } @@ -703,6 +720,7 @@ pub mod tests { let request = HandleCertificateRequest { certificate, hashed_certificate_values: values, + hashed_blobs: vec![], wait_for_outgoing_messages: false, }; @@ -749,6 +767,7 @@ pub mod tests { } .with(get_block()), )], + hashed_blobs: vec![], validated: Some(Certificate::new( HashedCertificateValue::new_validated( BlockExecutionOutcome { diff --git a/linera-rpc/src/grpc/server.rs b/linera-rpc/src/grpc/server.rs index 2e0deb3f0783..dbe52331adb6 100644 --- a/linera-rpc/src/grpc/server.rs +++ b/linera-rpc/src/grpc/server.rs @@ -516,6 +516,7 @@ where let HandleCertificateRequest { certificate, hashed_certificate_values, + hashed_blobs, wait_for_outgoing_messages, } = request.into_inner().try_into()?; debug!(?certificate, "Handling certificate"); @@ -523,7 +524,7 @@ where match self .state .clone() - .handle_certificate(certificate, hashed_certificate_values, sender) + .handle_certificate(certificate, hashed_certificate_values, hashed_blobs, sender) .await { Ok((info, actions)) => { diff --git a/linera-rpc/src/lib.rs b/linera-rpc/src/lib.rs index ff351d5be167..11d0a3cf38eb 100644 --- a/linera-rpc/src/lib.rs +++ b/linera-rpc/src/lib.rs @@ -36,6 +36,7 @@ pub struct HandleCertificateRequest { pub certificate: linera_chain::data_types::Certificate, pub wait_for_outgoing_messages: bool, pub hashed_certificate_values: Vec, + pub hashed_blobs: Vec, } pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("file_descriptor_set"); diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index 7d29fbfe5e0d..b2640fc5596d 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -6,7 +6,7 @@ use std::{future::Future, time::Duration}; use async_trait::async_trait; use futures::{sink::SinkExt, stream::StreamExt}; -use linera_base::identifiers::ChainId; +use linera_base::{data_types::HashedBlob, identifiers::ChainId}; use linera_chain::data_types::{ BlockProposal, Certificate, HashedCertificateValue, LiteCertificate, }; @@ -100,12 +100,14 @@ impl ValidatorNode for SimpleClient { &mut self, certificate: Certificate, hashed_certificate_values: Vec, + hashed_blobs: Vec, delivery: CrossChainMessageDelivery, ) -> Result { let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages(); let request = HandleCertificateRequest { certificate, hashed_certificate_values, + hashed_blobs, wait_for_outgoing_messages, }; self.query(request.into()).await diff --git a/linera-rpc/src/simple/server.rs b/linera-rpc/src/simple/server.rs index 4e7e458192ee..1c17eb421831 100644 --- a/linera-rpc/src/simple/server.rs +++ b/linera-rpc/src/simple/server.rs @@ -237,6 +237,7 @@ where .handle_certificate( request.certificate, request.hashed_certificate_values, + request.hashed_blobs, sender, ) .await diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 2382e9f1b10d..51be736df742 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -40,6 +40,12 @@ ApplicationPermissions: - close_chain: SEQ: TYPENAME: ApplicationId +Blob: + STRUCT: + - bytes: BYTES +BlobId: + NEWTYPESTRUCT: + TYPENAME: CryptoHash Block: STRUCT: - chain_id: @@ -99,6 +105,9 @@ BlockProposal: - hashed_certificate_values: SEQ: TYPENAME: CertificateValue + - hashed_blobs: + SEQ: + TYPENAME: Blob - validated: OPTION: TYPENAME: Certificate @@ -211,6 +220,9 @@ ChainInfo: - requested_hashed_certificate_value: OPTION: TYPENAME: CertificateValue + - requested_blob: + OPTION: + TYPENAME: Blob ChainInfoQuery: STRUCT: - chain_id: @@ -234,6 +246,9 @@ ChainInfoQuery: - request_hashed_certificate_value: OPTION: TYPENAME: CryptoHash + - request_blob: + OPTION: + TYPENAME: BlobId ChainInfoResponse: STRUCT: - info: @@ -409,6 +424,9 @@ HandleCertificateRequest: - hashed_certificate_values: SEQ: TYPENAME: CertificateValue + - hashed_blobs: + SEQ: + TYPENAME: Blob HandleLiteCertRequest: STRUCT: - certificate: @@ -548,40 +566,45 @@ NodeError: SEQ: TYPENAME: BytecodeLocation 8: - MissingCertificateValue: UNIT + BlobsNotFound: + NEWTYPE: + SEQ: + TYPENAME: BlobId 9: - MissingVoteInValidatorResponse: UNIT + MissingCertificateValue: UNIT 10: + MissingVoteInValidatorResponse: UNIT + 11: InactiveLocalChain: NEWTYPE: TYPENAME: ChainId - 11: - InvalidChainInfoResponse: UNIT 12: - InvalidDecoding: UNIT + InvalidChainInfoResponse: UNIT 13: - UnexpectedMessage: UNIT + InvalidDecoding: UNIT 14: + UnexpectedMessage: UNIT + 15: GrpcError: STRUCT: - error: STR - 15: + 16: ClientIoError: STRUCT: - error: STR - 16: + 17: CannotResolveValidatorAddress: STRUCT: - address: STR - 17: + 18: SubscriptionError: STRUCT: - transport: STR - 18: + 19: SubscriptionFailed: STRUCT: - status: STR - 19: + 20: LocalNodeQuery: STRUCT: - error: STR @@ -912,6 +935,11 @@ SystemOperation: - service: TYPENAME: Bytecode 9: + PublishBlob: + STRUCT: + - blob_id: + TYPENAME: BlobId + 10: CreateApplication: STRUCT: - bytecode_id: @@ -921,14 +949,14 @@ SystemOperation: - required_application_ids: SEQ: TYPENAME: ApplicationId - 10: + 11: RequestApplication: STRUCT: - chain_id: TYPENAME: ChainId - application_id: TYPENAME: ApplicationId - 11: + 12: Admin: NEWTYPE: TYPENAME: AdminOperation diff --git a/linera-sdk/src/test/chain.rs b/linera-sdk/src/test/chain.rs index abfb8bdadd0b..8a14df5014fd 100644 --- a/linera-sdk/src/test/chain.rs +++ b/linera-sdk/src/test/chain.rs @@ -93,7 +93,7 @@ impl ActiveChain { self.validator .worker() .await - .fully_handle_certificate(certificate.clone(), vec![]) + .fully_handle_certificate(certificate.clone(), vec![], vec![]) .await .expect("Rejected certificate"); diff --git a/linera-service-graphql-client/gql/service_schema.graphql b/linera-service-graphql-client/gql/service_schema.graphql index 9c2a52cd0af2..02f388e57635 100644 --- a/linera-service-graphql-client/gql/service_schema.graphql +++ b/linera-service-graphql-client/gql/service_schema.graphql @@ -19,6 +19,16 @@ type ApplicationOverview { link: String! } +""" +A blob of binary data. +""" +scalar Blob + +""" +A content-addressed blob ID i.e. the hash of the Blob +""" +scalar BlobId + """ A block containing operations to apply on a given chain, as well as the acknowledgment of a number of incoming messages from other chains. @@ -604,6 +614,10 @@ type MutationRoot { """ publishBytecode(chainId: ChainId!, contract: Bytecode!, service: Bytecode!): BytecodeId! """ + Publishes a new blob. + """ + publishBlob(chainId: ChainId!, blob: Blob!): BlobId! + """ Creates a new application. """ createApplication(chainId: ChainId!, bytecodeId: BytecodeId!, parameters: String!, instantiationArgument: String!, requiredApplicationIds: [ApplicationId!]!): ApplicationId! diff --git a/linera-service/src/linera/client_context.rs b/linera-service/src/linera/client_context.rs index c7aaf81162d2..1807b489dbc1 100644 --- a/linera-service/src/linera/client_context.rs +++ b/linera-service/src/linera/client_context.rs @@ -12,8 +12,8 @@ use colored::Colorize; use futures::{lock::OwnedMutexGuard, Future}; use linera_base::{ crypto::{CryptoRng, KeyPair}, - data_types::{BlockHeight, Timestamp}, - identifiers::{Account, BytecodeId, ChainId}, + data_types::{BlockHeight, HashedBlob, Timestamp}, + identifiers::{Account, BlobId, BytecodeId, ChainId}, ownership::ChainOwnership, }; use linera_chain::data_types::Certificate; @@ -405,6 +405,37 @@ impl ClientContext { Ok(bytecode_id) } + pub async fn publish_blob( + &mut self, + chain_client: &ArcChainClient, + blob_path: PathBuf, + ) -> anyhow::Result + where + S: Storage + Clone + Send + Sync + 'static, + ViewError: From, + { + info!("Loading blob file"); + let blob = HashedBlob::load_from_file(&blob_path) + .await + .context(format!("failed to load blob from {:?}", &blob_path))?; + let blob_id = blob.id(); + + info!("Publishing blob"); + self.apply_client_command(chain_client, |mut chain_client| { + let blob = blob.clone(); + async move { + chain_client + .publish_blob(blob) + .await + .context("Failed to publish blob") + } + }) + .await?; + + info!("{}", "Blob published successfully!".green().bold()); + Ok(blob_id) + } + pub fn generate_key_pair(&mut self) -> KeyPair { KeyPair::generate_from(&mut self.prng) } @@ -713,6 +744,7 @@ impl ClientContext { }, key_pair, vec![], + vec![], None, ); proposals.push(proposal.into()); @@ -842,7 +874,9 @@ impl ClientContext { // Second replay the certificates locally. for certificate in certificates { // No required certificates from other chains: This is only used with benchmark. - node.handle_certificate(certificate, vec![]).await.unwrap(); + node.handle_certificate(certificate, vec![], vec![]) + .await + .unwrap(); } // Last update the wallet. for chain in self.wallet_mut().chains_mut() { diff --git a/linera-service/src/linera/client_options.rs b/linera-service/src/linera/client_options.rs index 447949b3ba7e..151d706c67f2 100644 --- a/linera-service/src/linera/client_options.rs +++ b/linera-service/src/linera/client_options.rs @@ -585,6 +585,15 @@ pub enum ClientCommand { publisher: Option, }, + /// Publish a blob of binary data. + PublishBlob { + /// Path to blob file to be published. + blob_path: PathBuf, + /// An optional chain ID to publish the blob. The default chain of the wallet + /// is used otherwise. + publisher: Option, + }, + /// Create an application. CreateApplication { /// The bytecode ID of the application to create. diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index d6f5a053f4b7..d83b6c70bf3a 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -702,6 +702,7 @@ impl Runnable for Job { HandleCertificateRequest { certificate: certificate.clone(), hashed_certificate_values: vec![], + hashed_blobs: vec![], wait_for_outgoing_messages: true, } .into() @@ -803,6 +804,20 @@ impl Runnable for Job { info!("Time elapsed: {} ms", start_time.elapsed().as_millis()); } + PublishBlob { + blob_path, + publisher, + } => { + let start_time = Instant::now(); + let publisher = publisher.unwrap_or_else(|| context.default_chain()); + info!("Publishing blob on chain {}", publisher); + let chain_client = context.make_chain_client(storage, publisher).into_arc(); + let blob_id = context.publish_blob(&chain_client, blob_path).await?; + println!("{}", blob_id); + info!("{}", "Blob published successfully!".green().bold()); + info!("Time elapsed: {} ms", start_time.elapsed().as_millis()); + } + CreateApplication { bytecode_id, creator, diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index e0dd2a444d9c..2a35d74dc378 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -18,8 +18,8 @@ use futures::{ }; use linera_base::{ crypto::{CryptoError, CryptoHash, PublicKey}, - data_types::{Amount, ApplicationPermissions, TimeDelta, Timestamp}, - identifiers::{ApplicationId, BytecodeId, ChainId, Owner}, + data_types::{Amount, ApplicationPermissions, Blob, TimeDelta, Timestamp}, + identifiers::{ApplicationId, BlobId, BytecodeId, ChainId, Owner}, ownership::{ChainOwnership, TimeoutConfig}, BcsHexParseError, }; @@ -622,6 +622,22 @@ where .await } + /// Publishes a new blob. + async fn publish_blob(&self, chain_id: ChainId, blob: Blob) -> Result { + self.apply_client_command(&chain_id, move |mut client| { + let blob = blob.clone(); + async move { + let result = client + .publish_blob(blob.into()) + .await + .map_err(Error::from) + .map(|outcome| outcome.map(|(blob_id, _)| blob_id)); + (result, client) + } + }) + .await + } + /// Creates a new application. async fn create_application( &self, diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index 32139098e0eb..fe173ed133da 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -2,7 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; -use linera_base::{crypto::KeyPair, data_types::Timestamp, identifiers::ChainId}; +use linera_base::{ + crypto::KeyPair, + data_types::{HashedBlob, Timestamp}, + identifiers::ChainId, +}; use linera_chain::data_types::{ BlockProposal, Certificate, HashedCertificateValue, LiteCertificate, }; @@ -52,6 +56,7 @@ impl ValidatorNode for DummyValidatorNode { &mut self, _: Certificate, _: Vec, + _: Vec, _delivery: CrossChainMessageDelivery, ) -> Result { Err(NodeError::UnexpectedMessage) diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index de60252265a2..d5f0b4d0e5f0 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -7,8 +7,8 @@ use async_trait::async_trait; use dashmap::DashMap; use linera_base::{ crypto::CryptoHash, - data_types::{TimeDelta, Timestamp}, - identifiers::{Blob, BlobId, ChainId}, + data_types::{Blob, HashedBlob, TimeDelta, Timestamp}, + identifiers::{BlobId, ChainId}, }; use linera_chain::{ data_types::{Certificate, CertificateValue, HashedCertificateValue, LiteCertificate}, @@ -428,12 +428,13 @@ where Ok(value.with_hash_unchecked(hash)) } - async fn read_blob(&self, blob_id: BlobId) -> Result { + async fn read_hashed_blob(&self, blob_id: BlobId) -> Result { let blob_key = bcs::to_bytes(&BaseKey::BlobId(blob_id))?; - let maybe_value = self.client.client.read_value::(&blob_key).await?; + let maybe_blob = self.client.client.read_value::(&blob_key).await?; #[cfg(with_metrics)] READ_BLOB_COUNTER.with_label_values(&[]).inc(); - Ok(maybe_value.ok_or_else(|| ViewError::not_found("value for blob id", blob_id))?) + let blob = maybe_blob.ok_or_else(|| ViewError::not_found("value for blob ID", blob_id))?; + Ok(blob.with_hash_unchecked(blob_id)) } async fn read_hashed_certificate_values_downward( @@ -466,11 +467,11 @@ where self.write_batch(batch).await } - async fn write_blob(&self, blob: &Blob) -> Result { + async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError> { let mut batch = Batch::new(); - let blob_id = self.add_blob_to_batch(blob, &mut batch)?; + self.add_blob_to_batch(&blob.id(), blob, &mut batch)?; self.write_batch(batch).await?; - Ok(blob_id) + Ok(()) } async fn write_hashed_certificate_values( @@ -484,10 +485,10 @@ where self.write_batch(batch).await } - async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> { + async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError> { let mut batch = Batch::new(); for blob in blobs { - self.add_blob_to_batch(blob, &mut batch)?; + self.add_blob_to_batch(&blob.id(), blob, &mut batch)?; } self.write_batch(batch).await } @@ -566,13 +567,17 @@ where Ok(()) } - fn add_blob_to_batch(&self, blob: &Blob, batch: &mut Batch) -> Result { + fn add_blob_to_batch( + &self, + blob_id: &BlobId, + blob: &HashedBlob, + batch: &mut Batch, + ) -> Result<(), ViewError> { #[cfg(with_metrics)] WRITE_BLOB_COUNTER.with_label_values(&[]).inc(); - let blob_id = BlobId(CryptoHash::new(blob)); - let blob_key = bcs::to_bytes(&BaseKey::BlobId(blob_id))?; + let blob_key = bcs::to_bytes(&BaseKey::BlobId(*blob_id))?; batch.put_key_value(blob_key.to_vec(), blob)?; - Ok(blob_id) + Ok(()) } fn add_certificate_to_batch( diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index e0cde72b28e4..3b61f1d80cdf 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -23,8 +23,8 @@ use dashmap::{mapref::entry::Entry, DashMap}; use futures::future; use linera_base::{ crypto::{CryptoHash, PublicKey}, - data_types::{Amount, BlockHeight, Timestamp}, - identifiers::{Blob, BlobId, ChainDescription, ChainId, GenericApplicationId}, + data_types::{Amount, BlockHeight, HashedBlob, Timestamp}, + identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId}, ownership::ChainOwnership, }; use linera_chain::{ @@ -101,7 +101,7 @@ pub trait Storage: Sized { ) -> Result; /// Reads the blob with the given blob id. - async fn read_blob(&self, blob_id: BlobId) -> Result; + async fn read_hashed_blob(&self, blob_id: BlobId) -> Result; /// Reads the hashed certificate values in descending order from the given hash. async fn read_hashed_certificate_values_downward( @@ -117,7 +117,7 @@ pub trait Storage: Sized { ) -> Result<(), ViewError>; /// Writes the given blob. - async fn write_blob(&self, blob: &Blob) -> Result; + async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError>; /// Writes several hashed certificate values async fn write_hashed_certificate_values( @@ -126,7 +126,7 @@ pub trait Storage: Sized { ) -> Result<(), ViewError>; /// Writes several blobs - async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>; + async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError>; /// Tests existence of the certificate with the given hash. async fn contains_certificate(&self, hash: CryptoHash) -> Result;