Skip to content

Commit

Permalink
Remove blobs cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Nov 1, 2024
1 parent 22415cf commit 5119ff6
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 148 deletions.
4 changes: 1 addition & 3 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use linera_base::{
crypto::CryptoHash,
data_types::{Blob, BlockHeight, Timestamp, UserApplicationDescription},
identifiers::{BlobId, ChainId, UserApplicationId},
identifiers::{ChainId, UserApplicationId},
};
use linera_chain::{
data_types::{
Expand Down Expand Up @@ -147,7 +147,6 @@ where
config: ChainWorkerConfig,
storage: StorageClient,
certificate_value_cache: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
blob_cache: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
delivery_notifier: DeliveryNotifier,
chain_id: ChainId,
Expand All @@ -165,7 +164,6 @@ where
config,
storage,
certificate_value_cache,
blob_cache,
tracked_chains,
delivery_notifier,
chain_id,
Expand Down
20 changes: 11 additions & 9 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,12 @@ where
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)
.await?;
for blob in blobs {
self.state.cache_recent_blob(Cow::Borrowed(blob)).await;
}
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
self.state
.check_no_missing_blobs(&required_blob_ids)
.check_no_missing_blobs(&remaining_required_blob_ids)
.await?;
let old_round = self.state.chain.manager.get().current_round;
self.state.chain.manager.get_mut().create_final_vote(
Expand Down Expand Up @@ -302,12 +303,13 @@ where
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)
.await?;
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
let mut blobs_in_block = self.state.get_blobs(&remaining_required_blob_ids).await?;
blobs_in_block.extend_from_slice(blobs);

for blob in blobs {
self.state.cache_recent_blob(Cow::Borrowed(blob)).await;
}

let blobs_in_block = self.state.get_blobs(&required_blob_ids).await?;
let certificate_hash = certificate.hash();

self.state
Expand Down
49 changes: 11 additions & 38 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ mod attempted_changes;
mod temporary_changes;

use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::{self, Arc},
};
Expand Down Expand Up @@ -56,7 +55,6 @@ where
shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
recent_hashed_certificate_values: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
recent_blobs: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
delivery_notifier: DeliveryNotifier,
knows_chain_is_active: bool,
Expand All @@ -72,7 +70,6 @@ where
config: ChainWorkerConfig,
storage: StorageClient,
certificate_value_cache: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
blob_cache: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
delivery_notifier: DeliveryNotifier,
chain_id: ChainId,
Expand All @@ -87,7 +84,6 @@ where
shared_chain_view: None,
service_runtime_endpoint,
recent_hashed_certificate_values: certificate_value_cache,
recent_blobs: blob_cache,
tracked_chains,
delivery_notifier,
knows_chain_is_active: false,
Expand Down Expand Up @@ -316,11 +312,8 @@ where
required_blob_ids: &HashSet<BlobId>,
) -> Result<(), WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let missing_blob_ids = self
.recent_blobs
.subtract_cached_items_from::<_, Vec<_>>(required_blob_ids, |id| id)
.await
.into_iter()
let missing_blob_ids = required_blob_ids
.iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.cloned()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -355,21 +348,20 @@ where
Ok(())
}

/// Returns the blobs requested by their `blob_ids` that are either in the `recent_blobs`
/// cache or in storage.
/// Returns the blobs requested by their `blob_ids` that are in storage.
async fn get_blobs(&self, blob_ids: &HashSet<BlobId>) -> Result<Vec<Blob>, WorkerError> {
let (found_blobs, not_found_blobs): (HashMap<BlobId, Blob>, HashSet<BlobId>) =
self.recent_blobs.try_get_many(blob_ids.clone()).await;
let pending_blobs = &self.chain.manager.get().pending_blobs;

let mut found_blobs = found_blobs.into_values().collect::<Vec<_>>();
let mut found_blobs = Vec::new();
let mut missing_blob_ids = Vec::new();
for blob_id in not_found_blobs {
if let Ok(blob) = self.storage.read_blob(blob_id).await {
for blob_id in blob_ids {
if let Some(blob) = pending_blobs.get(blob_id) {
found_blobs.push(blob.clone());
} else if let Ok(blob) = self.storage.read_blob(*blob_id).await {
found_blobs.push(blob);
continue;
} else {
missing_blob_ids.push(*blob_id);
}

missing_blob_ids.push(blob_id);
}

if missing_blob_ids.is_empty() {
Expand All @@ -379,25 +371,6 @@ where
}
}

/// Returns the blobs requested by their `blob_ids` that are in the `recent_blobs` cache.
async fn get_cached_blobs(&self, blob_ids: HashSet<BlobId>) -> Result<Vec<Blob>, WorkerError> {
let (found_blobs, missing_blob_ids): (HashMap<BlobId, Blob>, HashSet<BlobId>) =
self.recent_blobs.try_get_many(blob_ids).await;

if missing_blob_ids.is_empty() {
Ok(found_blobs.into_values().collect())
} else {
Err(WorkerError::BlobsNotFound(
missing_blob_ids.into_iter().collect(),
))
}
}

/// Inserts a [`Blob`] into the worker's cache.
async fn cache_recent_blob<'a>(&mut self, blob: Cow<'a, Blob>) -> bool {
self.recent_blobs.insert(blob).await
}

/// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is
/// also tracked.
///
Expand Down
21 changes: 9 additions & 12 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

//! Operations that don't persist any changes to the chain state.

use std::{borrow::Cow, collections::BTreeSet};

use linera_base::{
data_types::{ArithmeticError, BlobContent, Timestamp, UserApplicationDescription},
ensure,
Expand Down Expand Up @@ -206,22 +204,21 @@ where
// legitimately required.
// Actual execution happens below, after other validity checks.
self.0.chain.remove_bundles_from_inboxes(block).await?;
// Verify that all required bytecode hashed certificate values and blobs are available, and no
// unrelated ones provided.
// Verify that no unrelated blobs were provided.
let published_blob_ids = block.published_blob_ids();
self.0
.check_for_unneeded_blobs(&published_blob_ids, blobs)
.await?;
let missing_published_blob_ids = published_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect::<Vec<_>>();
ensure!(
missing_published_blob_ids.is_empty(),
WorkerError::BlobsNotFound(missing_published_blob_ids)
);
for blob in blobs {
Self::check_blob_size(blob.content(), &policy)?;
self.0.cache_recent_blob(Cow::Borrowed(blob)).await;
}

let checked_blobs = blobs.iter().map(|blob| blob.id()).collect::<BTreeSet<_>>();
for blob in self.0.get_cached_blobs(published_blob_ids).await? {
if !checked_blobs.contains(&blob.id()) {
Self::check_blob_size(blob.content(), &policy)?;
}
}

let local_time = self.0.storage.clock().current_time();
Expand Down
18 changes: 7 additions & 11 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,17 +959,21 @@ where
proposal: Box<BlockProposal>,
value: HashedCertificateValue,
) -> Result<Certificate, ChainClientError> {
let blob_ids = value
let required_blob_ids = value
.inner()
.executed_block()
.expect("The result of executing a proposal is always an executed block")
.outcome
.required_blob_ids();
let submit_action = CommunicateAction::SubmitBlock { proposal, blob_ids };
let proposed_blobs = proposal.blobs.clone();
let submit_action = CommunicateAction::SubmitBlock {
proposal,
blob_ids: required_blob_ids,
};
let certificate = self
.communicate_chain_action(committee, submit_action, value)
.await?;
self.process_certificate(certificate.clone(), vec![])
self.process_certificate(certificate.clone(), proposed_blobs)
.await?;
if certificate.value().is_confirmed() {
Ok(certificate)
Expand Down Expand Up @@ -1778,14 +1782,8 @@ where
) -> Result<Vec<Blob>, LocalNodeError> {
let mut blobs = Vec::new();
for blob_id in blob_ids {
if let Some(blob) = self.client.local_node.recent_blob(&blob_id).await {
blobs.push(blob);
continue;
}

let maybe_blob = self.pending_blobs().get(&blob_id).cloned();
if let Some(blob) = maybe_blob {
self.client.local_node.cache_recent_blob(&blob).await;
blobs.push(blob);
continue;
}
Expand All @@ -1801,7 +1799,6 @@ where
};

if let Some(blob) = maybe_blob {
self.client.local_node.cache_recent_blob(&blob).await;
blobs.push(blob);
continue;
}
Expand Down Expand Up @@ -2679,7 +2676,6 @@ where
/// Adds pending blobs
pub async fn add_pending_blobs(&self, pending_blobs: impl IntoIterator<Item = Blob>) {
for blob in pending_blobs {
self.client.local_node.cache_recent_blob(&blob).await;
self.state_mut().insert_pending_blob(blob);
}
}
Expand Down
38 changes: 5 additions & 33 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
borrow::Cow,
collections::{HashMap, HashSet, VecDeque},
collections::{HashSet, VecDeque},
future::Future,
sync::Arc,
};
Expand Down Expand Up @@ -33,7 +32,6 @@ use crate::{
node::{NodeError, ValidatorNode},
notifier::Notifier,
remote_node::RemoteNode,
value_cache::ValueCache,
worker::{WorkerError, WorkerState},
};

Expand Down Expand Up @@ -247,29 +245,18 @@ where
return Err(NodeError::InvalidChainInfoResponse);
}

let (found_blobs, not_found_blobs): (HashMap<BlobId, Blob>, Vec<BlobId>) = self
.recent_blobs()
.await
.try_get_many(missing_blob_ids.clone())
.await;
let found_blob_ids = found_blobs.clone().into_keys().collect::<HashSet<_>>();
let mut found_blobs = found_blobs.clone().into_values().collect::<Vec<_>>();

unique_missing_blob_ids = unique_missing_blob_ids
.difference(&found_blob_ids)
.copied()
.collect::<HashSet<BlobId>>();
let chain_manager_pending_blobs = self
.chain_state_view(chain_id)
.await?
.manager
.get()
.pending_blobs
.clone();
for blob_id in not_found_blobs {
if let Some(blob) = chain_manager_pending_blobs.get(&blob_id).cloned() {
let mut found_blobs = Vec::new();
for blob_id in missing_blob_ids {
if let Some(blob) = chain_manager_pending_blobs.get(blob_id).cloned() {
found_blobs.push(blob);
unique_missing_blob_ids.remove(&blob_id);
unique_missing_blob_ids.remove(blob_id);
}
}

Expand Down Expand Up @@ -379,21 +366,6 @@ where
Ok(response)
}

#[instrument(level = "trace", skip(self))]
pub async fn recent_blob(&self, blob_id: &BlobId) -> Option<Blob> {
self.node.state.recent_blob(blob_id).await
}

#[instrument(level = "trace", skip(self))]
async fn recent_blobs(&self) -> Arc<ValueCache<BlobId, Blob>> {
self.node.state.recent_blobs()
}

#[instrument(level = "trace", skip(self, blob), fields(blob_id = ?blob.id()))]
pub async fn cache_recent_blob(&self, blob: &Blob) -> bool {
self.node.state.cache_recent_blob(Cow::Borrowed(blob)).await
}

/// Downloads and processes all certificates up to (excluding) the specified height.
#[instrument(level = "trace", skip(self, validators, notifier))]
pub async fn download_certificates(
Expand Down
11 changes: 4 additions & 7 deletions linera-core/src/unit_tests/wasm_worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#![allow(clippy::large_futures)]
#![cfg(any(feature = "wasmer", feature = "wasmtime"))]

use std::borrow::Cow;

use linera_base::{
crypto::KeyPair,
data_types::{
Expand Down Expand Up @@ -153,12 +151,11 @@ where
);
let publish_certificate = make_certificate(&committee, &worker, publish_block_proposal);

worker
.cache_recent_blob(Cow::Borrowed(&contract_blob))
.await;

let info = worker
.fully_handle_certificate(publish_certificate.clone(), vec![service_blob.clone()])
.fully_handle_certificate(
publish_certificate.clone(),
vec![contract_blob.clone(), service_blob.clone()],
)
.await
.unwrap()
.info;
Expand Down
Loading

0 comments on commit 5119ff6

Please sign in to comment.