Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the certified block that published a blob last #2156

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use thiserror::Error;

use crate::{
crypto::BcsHashable,
crypto::{BcsHashable, CryptoHash},
doc_scalar,
identifiers::{ApplicationId, BlobId, Destination, GenericApplicationId},
time::{Duration, SystemTime},
Expand Down Expand Up @@ -800,6 +800,13 @@ pub struct HashedBlob {
blob: Blob,
}

/// The state of a blob of binary data.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
pub struct BlobState {
/// Hash of the last `Certificate` that published or used this blob.
pub last_used_by: CryptoHash,
}

impl HashedBlob {
/// Loads a hashed blob from a file.
pub async fn load_from_file(path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
Expand Down
4 changes: 3 additions & 1 deletion linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,12 @@ where
}

let blobs_in_block = self.get_blobs(block.blob_ids()).await?;
let certificate_hash = certificate.hash();
let (result_hashed_certificate_value, result_blobs, result_certificate) = tokio::join!(
self.storage
.write_hashed_certificate_values(hashed_certificate_values),
self.storage.write_hashed_blobs(&blobs_in_block),
self.storage
.write_hashed_blobs(&blobs_in_block, &certificate_hash),
self.storage.write_certificate(&certificate)
);
result_hashed_certificate_value?;
Expand Down
63 changes: 58 additions & 5 deletions linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use dashmap::DashMap;
use linera_base::{
crypto::CryptoHash,
data_types::{Blob, HashedBlob, TimeDelta, Timestamp},
data_types::{Blob, BlobState, HashedBlob, TimeDelta, Timestamp},
identifiers::{BlobId, ChainId},
};
use linera_chain::{
Expand Down Expand Up @@ -97,6 +97,18 @@ pub static READ_BLOB_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Counter creation should not fail")
});

/// The metric counting how often a blob state is read from storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static READ_BLOB_STATE_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus_util::register_int_counter_vec(
"read_blob_state",
"The metric counting how often a blob state is read from storage",
&[],
)
.expect("Counter creation should not fail")
});

/// The metric counting how often a hashed certificate value is written to storage.
#[cfg(with_metrics)]
#[doc(hidden)]
Expand Down Expand Up @@ -237,6 +249,7 @@ enum BaseKey {
Certificate(CryptoHash),
Value(CryptoHash),
BlobId(BlobId),
BlobStateId(BlobId),
}

/// A clock that can be used to get the current `Timestamp`.
Expand Down Expand Up @@ -442,6 +455,20 @@ where
Ok(blob.with_hash_unchecked(blob_id))
}

async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError> {
let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(blob_id))?;
let maybe_blob_state = self
.client
.client
.read_value::<BlobState>(&blob_state_key)
.await?;
#[cfg(with_metrics)]
READ_BLOB_STATE_COUNTER.with_label_values(&[]).inc();
let blob_state = maybe_blob_state
.ok_or_else(|| ViewError::not_found("blob state for blob ID", blob_id))?;
Ok(blob_state)
}

async fn read_hashed_certificate_values_downward(
&self,
from: CryptoHash,
Expand Down Expand Up @@ -472,9 +499,13 @@ where
self.write_batch(batch).await
}

async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError> {
async fn write_hashed_blob(
&self,
blob: &HashedBlob,
last_used_by: &CryptoHash,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
self.add_blob_to_batch(&blob.id(), blob, &mut batch)?;
self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?;
self.write_batch(batch).await?;
Ok(())
}
Expand All @@ -490,10 +521,14 @@ where
self.write_batch(batch).await
}

async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError> {
async fn write_hashed_blobs(
&self,
blobs: &[HashedBlob],
last_used_by: &CryptoHash,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
for blob in blobs {
self.add_blob_to_batch(&blob.id(), blob, &mut batch)?;
self.add_blob_to_batch(&blob.id(), blob, last_used_by, &mut batch)?;
}
self.write_batch(batch).await
}
Expand Down Expand Up @@ -576,12 +611,30 @@ where
&self,
blob_id: &BlobId,
blob: &HashedBlob,
last_used_by: &CryptoHash,
batch: &mut Batch,
) -> Result<(), ViewError> {
#[cfg(with_metrics)]
WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
let blob_key = bcs::to_bytes(&BaseKey::BlobId(*blob_id))?;
batch.put_key_value(blob_key.to_vec(), blob)?;
self.add_blob_state_to_batch(blob_id, last_used_by, batch)?;
Ok(())
}

fn add_blob_state_to_batch(
&self,
blob_id: &BlobId,
last_used_by: &CryptoHash,
batch: &mut Batch,
) -> Result<(), ViewError> {
let blob_state_key = bcs::to_bytes(&BaseKey::BlobStateId(*blob_id))?;
batch.put_key_value(
blob_state_key.to_vec(),
&BlobState {
last_used_by: *last_used_by,
},
)?;
Ok(())
}

Expand Down
21 changes: 16 additions & 5 deletions linera-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use dashmap::{mapref::entry::Entry, DashMap};
use futures::future;
use linera_base::{
crypto::{CryptoHash, PublicKey},
data_types::{Amount, BlockHeight, HashedBlob, Timestamp},
data_types::{Amount, BlobState, BlockHeight, HashedBlob, Timestamp},
identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId},
ownership::ChainOwnership,
};
Expand Down Expand Up @@ -105,6 +105,9 @@ pub trait Storage: Sized {
/// Reads the blob with the given blob ID.
async fn read_hashed_blob(&self, blob_id: BlobId) -> Result<HashedBlob, ViewError>;

/// Reads the blob state with the given blob ID.
async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError>;

/// Reads the hashed certificate values in descending order from the given hash.
async fn read_hashed_certificate_values_downward(
&self,
Expand All @@ -119,16 +122,24 @@ pub trait Storage: Sized {
) -> Result<(), ViewError>;

/// Writes the given blob.
async fn write_hashed_blob(&self, blob: &HashedBlob) -> Result<(), ViewError>;
async fn write_hashed_blob(
&self,
blob: &HashedBlob,
last_used_by: &CryptoHash,
) -> Result<(), ViewError>;

/// Writes several hashed certificate values
/// Writes several hashed certificate values.
async fn write_hashed_certificate_values(
&self,
values: &[HashedCertificateValue],
) -> Result<(), ViewError>;

/// Writes several blobs
async fn write_hashed_blobs(&self, blobs: &[HashedBlob]) -> Result<(), ViewError>;
/// Writes several blobs.
async fn write_hashed_blobs(
&self,
blobs: &[HashedBlob],
last_used_by: &CryptoHash,
) -> Result<(), ViewError>;

/// Tests existence of the certificate with the given hash.
async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
Expand Down
Loading