Skip to content

Commit

Permalink
Look for blobs in the expected places (#2751)
Browse files Browse the repository at this point in the history
## Motivation

When checking for blobs and when fetching them, we should look only in the places where they are expected to be.

## Proposal

Look for blobs in the expected places

## Test Plan

CI

## Release Plan

- Nothing to do / These changes follow the usual release cycle.
  • Loading branch information
Andre da Silva authored Oct 31, 2024
1 parent 3d1a93f commit 0b45e43
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 86 deletions.
20 changes: 13 additions & 7 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,16 @@ where
.recent_hashed_certificate_values
.insert(Cow::Borrowed(&certificate.value))
.await;
// Verify that all required bytecode hashed certificate values and blobs are available, and no
// unrelated ones provided.
let required_blob_ids = executed_block.required_blob_ids();
// Verify that no unrelated blobs were provided.
self.state
.check_no_missing_blobs(executed_block.required_blob_ids(), blobs)
.check_for_unneeded_blobs(&required_blob_ids, blobs)
.await?;
for blob in blobs {
self.state.cache_recent_blob(Cow::Borrowed(blob)).await;
}
self.state
.check_no_missing_blobs(&required_blob_ids)
.await?;
let old_round = self.state.chain.manager.get().current_round;
self.state.chain.manager.get_mut().create_final_vote(
Expand Down Expand Up @@ -292,16 +298,16 @@ where
);

let required_blob_ids = executed_block.required_blob_ids();
// Verify that all required bytecode hashed certificate values and blobs are available, and no
// unrelated ones provided.
// Verify that no unrelated blobs were provided.
self.state
.check_no_missing_blobs(required_blob_ids.clone(), blobs)
.check_for_unneeded_blobs(&required_blob_ids, blobs)
.await?;

for blob in blobs {
self.state.cache_recent_blob(Cow::Borrowed(blob)).await;
}

let blobs_in_block = self.state.get_blobs(required_blob_ids.clone()).await?;
let blobs_in_block = self.state.get_blobs(&required_blob_ids).await?;
let certificate_hash = certificate.hash();

self.state
Expand Down
90 changes: 54 additions & 36 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,70 +308,88 @@ where
Ok(())
}

/// Returns an error if the block requires a blob we don't have, or if unrelated blobs were provided.
/// Returns an error if the block requires a blob we don't have.
/// Looks for the blob in: chain manager's pending blobs, recent blobs cache
/// and storage.
async fn check_no_missing_blobs(
&self,
blobs_in_block: HashSet<BlobId>,
blobs: &[Blob],
required_blob_ids: &HashSet<BlobId>,
) -> Result<(), WorkerError> {
let missing_blobs = self.get_missing_blobs(blobs_in_block, blobs).await?;
let pending_blobs = &self.chain.manager.get().pending_blobs;
let missing_blob_ids = self
.recent_blobs
.subtract_cached_items_from::<_, Vec<_>>(required_blob_ids, |id| id)
.await
.into_iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.cloned()
.collect::<Vec<_>>();

let missing_blob_ids = self
.storage
.missing_blobs(missing_blob_ids.as_slice())
.await?;

if missing_blobs.is_empty() {
if missing_blob_ids.is_empty() {
return Ok(());
}

Err(WorkerError::BlobsNotFound(missing_blobs))
Err(WorkerError::BlobsNotFound(missing_blob_ids))
}

/// Returns the blobs required by the block that we don't have, or an error if unrelated blobs were provided.
async fn get_missing_blobs(
/// Returns an error if unrelated blobs were provided.
async fn check_for_unneeded_blobs(
&self,
mut required_blob_ids: HashSet<BlobId>,
required_blob_ids: &HashSet<BlobId>,
blobs: &[Blob],
) -> Result<Vec<BlobId>, WorkerError> {
) -> Result<(), WorkerError> {
// Ensure that every provided blob is one that is actually required.
for blob in blobs {
let blob_id = blob.id();
ensure!(
required_blob_ids.remove(&blob_id),
required_blob_ids.contains(&blob_id),
WorkerError::UnneededBlob { blob_id }
);
}

let pending_blobs = &self.chain.manager.get().pending_blobs;
let blob_ids = self
.recent_blobs
.subtract_cached_items_from::<_, Vec<_>>(required_blob_ids, |id| id)
.await
.into_iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.collect::<Vec<_>>();
Ok(self.storage.missing_blobs(blob_ids).await?)
Ok(())
}

/// Returns the blobs requested by their `blob_ids` that are either in pending in the
/// chain, in the `recent_blobs` cache or in storage.
async fn get_blobs(&self, blob_ids: HashSet<BlobId>) -> Result<Vec<Blob>, WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
/// Returns the blobs requested by their `blob_ids` that are either in the `recent_blobs`
/// cache or in storage.
async fn get_blobs(&self, blob_ids: &HashSet<BlobId>) -> Result<Vec<Blob>, WorkerError> {
let (found_blobs, not_found_blobs): (HashMap<BlobId, Blob>, HashSet<BlobId>) =
self.recent_blobs.try_get_many(blob_ids).await;
self.recent_blobs.try_get_many(blob_ids.clone()).await;

let mut blobs = found_blobs.into_values().collect::<Vec<_>>();
let mut missing_blobs = Vec::new();
let mut found_blobs = found_blobs.into_values().collect::<Vec<_>>();
let mut missing_blob_ids = Vec::new();
for blob_id in not_found_blobs {
if let Some(blob) = pending_blobs.get(&blob_id) {
blobs.push(blob.clone());
} else if let Ok(blob) = self.storage.read_blob(blob_id).await {
blobs.push(blob);
} else {
missing_blobs.push(blob_id);
if let Ok(blob) = self.storage.read_blob(blob_id).await {
found_blobs.push(blob);
continue;
}

missing_blob_ids.push(blob_id);
}

if missing_blob_ids.is_empty() {
Ok(found_blobs)
} else {
Err(WorkerError::BlobsNotFound(missing_blob_ids))
}
}

/// Returns the blobs requested by their `blob_ids` that are in the `recent_blobs` cache.
async fn get_cached_blobs(&self, blob_ids: HashSet<BlobId>) -> Result<Vec<Blob>, WorkerError> {
let (found_blobs, missing_blob_ids): (HashMap<BlobId, Blob>, HashSet<BlobId>) =
self.recent_blobs.try_get_many(blob_ids).await;

if missing_blobs.is_empty() {
Ok(blobs)
if missing_blob_ids.is_empty() {
Ok(found_blobs.into_values().collect())
} else {
Err(WorkerError::BlobsNotFound(missing_blobs))
Err(WorkerError::BlobsNotFound(
missing_blob_ids.into_iter().collect(),
))
}
}

Expand Down
5 changes: 3 additions & 2 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,15 @@ where
// unrelated ones provided.
let published_blob_ids = block.published_blob_ids();
self.0
.check_no_missing_blobs(published_blob_ids.clone(), blobs)
.check_for_unneeded_blobs(&published_blob_ids, blobs)
.await?;
for blob in blobs {
Self::check_blob_size(blob.content(), &policy)?;
self.0.cache_recent_blob(Cow::Borrowed(blob)).await;
}

let checked_blobs = blobs.iter().map(|blob| blob.id()).collect::<BTreeSet<_>>();
for blob in self.0.get_blobs(published_blob_ids).await? {
for blob in self.0.get_cached_blobs(published_blob_ids).await? {
if !checked_blobs.contains(&blob.id()) {
Self::check_blob_size(blob.content(), &policy)?;
}
Expand Down
38 changes: 5 additions & 33 deletions linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ static CONTAINS_HASHED_CERTIFICATE_VALUE_COUNTER: LazyLock<IntCounterVec> = Lazy
.expect("Counter creation should not fail")
});

/// The metric counting how often hashed certificate values are tested for existence from storage.
#[cfg(with_metrics)]
static CONTAINS_HASHED_CERTIFICATE_VALUES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
prometheus_util::register_int_counter_vec(
"contains_hashed_certificate_values",
"The metric counting how often hashed certificate values are tested for existence from storage",
&[],
)
.expect("Counter creation should not fail")
});

/// The metric counting how often a blob is tested for existence from storage
#[cfg(with_metrics)]
static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
Expand Down Expand Up @@ -406,23 +395,6 @@ where
Ok(test)
}

async fn contains_hashed_certificate_values(
&self,
hashes: Vec<CryptoHash>,
) -> Result<Vec<bool>, ViewError> {
let mut keys = Vec::new();
for hash in hashes {
let value_key = bcs::to_bytes(&BaseKey::CertificateValue(hash))?;
keys.push(value_key);
}
let test = self.store.contains_keys(keys).await?;
#[cfg(with_metrics)]
CONTAINS_HASHED_CERTIFICATE_VALUES_COUNTER
.with_label_values(&[])
.inc();
Ok(test)
}

async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
let test = self.store.contains_key(&blob_key).await?;
Expand All @@ -431,17 +403,17 @@ where
Ok(test)
}

async fn missing_blobs(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, ViewError> {
async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
let mut keys = Vec::new();
for blob_id in blob_ids.clone() {
let key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
for blob_id in blob_ids {
let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
keys.push(key);
}
let results = self.store.contains_keys(keys).await?;
let mut missing_blobs = Vec::new();
for (blob_id, result) in blob_ids.into_iter().zip(results) {
for (blob_id, result) in blob_ids.iter().zip(results) {
if !result {
missing_blobs.push(blob_id);
missing_blobs.push(*blob_id);
}
}
#[cfg(with_metrics)]
Expand Down
10 changes: 2 additions & 8 deletions linera-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,11 @@ pub trait Storage: Sized {
/// Tests existence of a hashed certificate value with the given hash.
async fn contains_hashed_certificate_value(&self, hash: CryptoHash) -> Result<bool, ViewError>;

/// Tests existence of hashed certificate values with given hashes.
async fn contains_hashed_certificate_values(
&self,
hash: Vec<CryptoHash>,
) -> Result<Vec<bool>, ViewError>;

/// Tests the existence of a blob with the given blob ID.
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;

/// Lists the missing blobs from storage.
async fn missing_blobs(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, ViewError>;
/// Returns which of the given blobs are missing from storage.
async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;

/// Tests existence of a blob state with the given blob ID.
async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
Expand Down

0 comments on commit 0b45e43

Please sign in to comment.