Skip to content

Commit

Permalink
Track the certified block that published a blob last
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Jun 18, 2024
1 parent af93f1d commit 4d03523
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 12 deletions.
9 changes: 8 additions & 1 deletion linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use thiserror::Error;

use crate::{
crypto::BcsHashable,
crypto::{BcsHashable, CryptoHash},
doc_scalar,
identifiers::{ApplicationId, BlobId, Destination, GenericApplicationId},
time::{Duration, SystemTime},
Expand Down Expand Up @@ -800,6 +800,13 @@ pub struct HashedBlob {
blob: Blob,
}

/// The state of a blob of binary data.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
pub struct BlobState {
/// Hash of the last `Certificate` that used this blob.
pub last_used_by: CryptoHash,
}

impl HashedBlob {
/// Loads a hashed blob from a file.
pub async fn load_from_file(path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
Expand Down
4 changes: 3 additions & 1 deletion linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,12 @@ where
}

let blobs_in_block = self.get_blobs(block.blob_ids()).await?;
let certificate_hash = certificate.hash();
let (result_hashed_certificate_value, result_blobs, result_certificate) = tokio::join!(
self.storage
.write_hashed_certificate_values(hashed_certificate_values),
self.storage.write_hashed_blobs(&blobs_in_block),
self.storage
.write_hashed_blobs(&blobs_in_block, &certificate_hash),
self.storage.write_certificate(&certificate)
);
result_hashed_certificate_value?;
Expand Down
63 changes: 58 additions & 5 deletions linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use dashmap::DashMap;
use linera_base::{
crypto::CryptoHash,
data_types::{Blob, HashedBlob, TimeDelta, Timestamp},
data_types::{Blob, BlobState, HashedBlob, TimeDelta, Timestamp},
identifiers::{BlobId, ChainId},
};
use linera_chain::{
Expand Down Expand Up @@ -97,6 +97,18 @@ pub static READ_BLOB_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Counter creation should not fail")
});

/// The metric counting how often a blob state is read from storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static READ_BLOB_STATE_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus_util::register_int_counter_vec(
"read_blob_state",
"The metric counting how often a blob state is read from storage",
&[],
)
.expect("Counter creation should not fail")
});

/// The metric counting how often a hashed certificate value is written to storage.
#[cfg(with_metrics)]
#[doc(hidden)]
Expand Down Expand Up @@ -237,6 +249,7 @@ enum BaseKey {
Certificate(CryptoHash),
Value(CryptoHash),
BlobId(BlobId),
BlobStateId(BlobId),
}

/// A clock that can be used to get the current `Timestamp`.
Expand Down Expand Up @@ -442,6 +455,20 @@ where
Ok(blob.with_hash_unchecked(blob_id))
}

async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError> {
let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(blob_id))?;
let maybe_blob_state = self
.client
.client
.read_value::<BlobState>(&blob_state_key)
.await?;
#[cfg(with_metrics)]
READ_BLOB_STATE_COUNTER.with_label_values(&[]).inc();
let blob_state = maybe_blob_state
.ok_or_else(|| ViewError::not_found("blob state for blob ID", blob_id))?;
Ok(blob_state)
}

async fn read_hashed_certificate_values_downward(
&self,
from: CryptoHash,
Expand Down Expand Up @@ -472,9 +499,13 @@ where
self.write_batch(batch).await
}

async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError> {
async fn write_hashed_blob(
&self,
blob: &HashedBlob,
last_used_by: &CryptoHash,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
self.add_blob_to_batch(&blob.id(), blob, &mut batch)?;
self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?;
self.write_batch(batch).await?;
Ok(())
}
Expand All @@ -490,10 +521,14 @@ where
self.write_batch(batch).await
}

async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError> {
async fn write_hashed_blobs(
&self,
blobs: &[HashedBlob],
last_used_by: &CryptoHash,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
for blob in blobs {
self.add_blob_to_batch(&blob.id(), blob, &mut batch)?;
self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?;
}
self.write_batch(batch).await
}
Expand Down Expand Up @@ -576,12 +611,30 @@ where
&self,
blob_id: &BlobId,
blob: &HashedBlob,
last_used_by: &CryptoHash,
batch: &mut Batch,
) -> Result<(), ViewError> {
#[cfg(with_metrics)]
WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
let blob_key = bcs::to_bytes(&BaseKey::BlobId(*blob_id))?;
batch.put_key_value(blob_key.to_vec(), blob)?;
self.add_blob_state_to_batch(blob_id, last_used_by, batch)?;
Ok(())
}

fn add_blob_state_to_batch(
&self,
blob_id: &BlobId,
last_used_by: &CryptoHash,
batch: &mut Batch,
) -> Result<(), ViewError> {
let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(*blob_id))?;
batch.put_key_value(
blob_state_key.to_vec(),
&BlobState {
last_used_by: *last_used_by,
},
)?;
Ok(())
}

Expand Down
21 changes: 16 additions & 5 deletions linera-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use dashmap::{mapref::entry::Entry, DashMap};
use futures::future;
use linera_base::{
crypto::{CryptoHash, PublicKey},
data_types::{Amount, BlockHeight, HashedBlob, Timestamp},
data_types::{Amount, BlobState, BlockHeight, HashedBlob, Timestamp},
identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId},
ownership::ChainOwnership,
};
Expand Down Expand Up @@ -105,6 +105,9 @@ pub trait Storage: Sized {
/// Reads the blob with the given blob ID.
async fn read_hashed_blob(&self, blob_id: BlobId) -> Result<HashedBlob, ViewError>;

/// Reads the blob state with the given blob ID.
async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError>;

/// Reads the hashed certificate values in descending order from the given hash.
async fn read_hashed_certificate_values_downward(
&self,
Expand All @@ -119,16 +122,24 @@ pub trait Storage: Sized {
) -> Result<(), ViewError>;

/// Writes the given blob.
async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError>;
async fn write_hashed_blob(
&self,
blob: &HashedBlob,
last_used_by: &CryptoHash,
) -> Result<(), ViewError>;

/// Writes several hashed certificate values
/// Writes several hashed certificate values.
async fn write_hashed_certificate_values(
&self,
values: &[HashedCertificateValue],
) -> Result<(), ViewError>;

/// Writes several blobs
async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError>;
/// Writes several blobs.
async fn write_hashed_blobs(
&self,
blobs: &[HashedBlob],
last_used_by: &CryptoHash,
) -> Result<(), ViewError>;

/// Tests existence of the certificate with the given hash.
async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
Expand Down

0 comments on commit 4d03523

Please sign in to comment.