From 6825d071bcd42778462b9b1003756ec9985187b7 Mon Sep 17 00:00:00 2001 From: Andre da Silva Date: Tue, 18 Jun 2024 17:23:30 +0200 Subject: [PATCH] Track the certified block that published a blob last --- linera-base/src/data_types.rs | 9 +++- linera-core/src/chain_worker/state.rs | 4 +- linera-storage/src/db_storage.rs | 63 ++++++++++++++++++++++++--- linera-storage/src/lib.rs | 21 ++++++--- 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/linera-base/src/data_types.rs b/linera-base/src/data_types.rs index c6a11fc27e65..e93e456069a9 100644 --- a/linera-base/src/data_types.rs +++ b/linera-base/src/data_types.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Deserializer, Serialize}; use thiserror::Error; use crate::{ - crypto::BcsHashable, + crypto::{BcsHashable, CryptoHash}, doc_scalar, identifiers::{ApplicationId, BlobId, Destination, GenericApplicationId}, time::{Duration, SystemTime}, @@ -800,6 +800,13 @@ pub struct HashedBlob { blob: Blob, } +/// The state of a blob of binary data. +#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)] +pub struct BlobState { + /// Hash of the last `Certificate` that published or used this blob. + pub last_used_by: CryptoHash, +} + impl HashedBlob { /// Loads a hashed blob from a file. pub async fn load_from_file(path: impl AsRef) -> std::io::Result { diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 2b52dd1269d4..c0020c44afba 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -521,10 +521,12 @@ where } let blobs_in_block = self.get_blobs(block.blob_ids()).await?; + let certificate_hash = certificate.hash(); let (result_hashed_certificate_value, result_blobs, result_certificate) = tokio::join!( self.storage .write_hashed_certificate_values(hashed_certificate_values), - self.storage.write_hashed_blobs(&blobs_in_block), + self.storage + .write_hashed_blobs(&blobs_in_block, &certificate_hash), self.storage.write_certificate(&certificate) ); result_hashed_certificate_value?; diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index db80f2188a86..95a231398a7b 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use dashmap::DashMap; use linera_base::{ crypto::CryptoHash, - data_types::{Blob, HashedBlob, TimeDelta, Timestamp}, + data_types::{Blob, BlobState, HashedBlob, TimeDelta, Timestamp}, identifiers::{BlobId, ChainId}, }; use linera_chain::{ @@ -97,6 +97,18 @@ pub static READ_BLOB_COUNTER: Lazy = Lazy::new(|| { .expect("Counter creation should not fail") }); +/// The metric counting how often a blob state is read from storage. +#[cfg(with_metrics)] +#[doc(hidden)] +pub static READ_BLOB_STATE_COUNTER: Lazy = Lazy::new(|| { + prometheus_util::register_int_counter_vec( + "read_blob_state", + "The metric counting how often a blob state is read from storage", + &[], + ) + .expect("Counter creation should not fail") +}); + /// The metric counting how often a hashed certificate value is written to storage. #[cfg(with_metrics)] #[doc(hidden)] @@ -237,6 +249,7 @@ enum BaseKey { Certificate(CryptoHash), Value(CryptoHash), BlobId(BlobId), + BlobStateId(BlobId), } /// A clock that can be used to get the current `Timestamp`. @@ -442,6 +455,20 @@ where Ok(blob.with_hash_unchecked(blob_id)) } + async fn read_blob_state(&self, blob_id: BlobId) -> Result { + let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(blob_id))?; + let maybe_blob_state = self + .client + .client + .read_value::(&blob_state_key) + .await?; + #[cfg(with_metrics)] + READ_BLOB_STATE_COUNTER.with_label_values(&[]).inc(); + let blob_state = maybe_blob_state + .ok_or_else(|| ViewError::not_found("blob state for blob ID", blob_id))?; + Ok(blob_state) + } + async fn read_hashed_certificate_values_downward( &self, from: CryptoHash, @@ -472,9 +499,13 @@ where self.write_batch(batch).await } - async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError> { + async fn write_hashed_blob( + &self, + blob: &HashedBlob, + last_used_by: &CryptoHash, + ) -> Result<(), ViewError> { let mut batch = Batch::new(); - self.add_blob_to_batch(&blob.id(), blob, &mut batch)?; + self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?; self.write_batch(batch).await?; Ok(()) } @@ -490,10 +521,14 @@ where self.write_batch(batch).await } - async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError> { + async fn write_hashed_blobs( + &self, + blobs: &[HashedBlob], + last_used_by: &CryptoHash, + ) -> Result<(), ViewError> { let mut batch = Batch::new(); for blob in blobs { - self.add_blob_to_batch(&blob.id(), blob, &mut batch)?; + self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?; } self.write_batch(batch).await } @@ -576,12 +611,30 @@ where &self, blob_id: &BlobId, blob: &HashedBlob, + last_used_by: &CryptoHash, batch: &mut Batch, ) -> Result<(), ViewError> { #[cfg(with_metrics)] WRITE_BLOB_COUNTER.with_label_values(&[]).inc(); let blob_key = bcs::to_bytes(&BaseKey::BlobId(*blob_id))?; batch.put_key_value(blob_key.to_vec(), blob)?; + self.add_blob_state_to_batch(blob_id, last_used_by, batch)?; + Ok(()) + } + + fn add_blob_state_to_batch( + &self, + blob_id: &BlobId, + last_used_by: &CryptoHash, + batch: &mut Batch, + ) -> Result<(), ViewError> { + let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(*blob_id))?; + batch.put_key_value( + blob_state_key.to_vec(), + &BlobState { + last_used_by: *last_used_by, + }, + )?; Ok(()) } diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index 192716276325..d09338c38ed3 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -25,7 +25,7 @@ use dashmap::{mapref::entry::Entry, DashMap}; use futures::future; use linera_base::{ crypto::{CryptoHash, PublicKey}, - data_types::{Amount, BlockHeight, HashedBlob, Timestamp}, + data_types::{Amount, BlobState, BlockHeight, HashedBlob, Timestamp}, identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId}, ownership::ChainOwnership, }; @@ -105,6 +105,9 @@ pub trait Storage: Sized { /// Reads the blob with the given blob ID. async fn read_hashed_blob(&self, blob_id: BlobId) -> Result; + /// Reads the blob state with the given blob ID. + async fn read_blob_state(&self, blob_id: BlobId) -> Result; + /// Reads the hashed certificate values in descending order from the given hash. async fn read_hashed_certificate_values_downward( &self, @@ -119,16 +122,24 @@ pub trait Storage: Sized { ) -> Result<(), ViewError>; /// Writes the given blob. - async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError>; + async fn write_hashed_blob( + &self, + blob: &HashedBlob, + last_used_by: &CryptoHash, + ) -> Result<(), ViewError>; - /// Writes several hashed certificate values + /// Writes several hashed certificate values. async fn write_hashed_certificate_values( &self, values: &[HashedCertificateValue], ) -> Result<(), ViewError>; - /// Writes several blobs - async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError>; + /// Writes several blobs. + async fn write_hashed_blobs( + &self, + blobs: &[HashedBlob], + last_used_by: &CryptoHash, + ) -> Result<(), ViewError>; /// Tests existence of the certificate with the given hash. async fn contains_certificate(&self, hash: CryptoHash) -> Result;