Skip to content

Commit

Permalink
Allow client node to read ChainStateView outside the `ChainWorkerSt…
Browse files Browse the repository at this point in the history
…ate` (#2124)

* Derive `ClonableView` for `ChainStateView`

Prepare to allow snapshots to be taken of the `ChainStateView` and used
when there's a need to perform mutations on `ChainStateView` that won't
be saved back to persistent storage.

* Create a `save` helper method

Prepare to only save after shared views into the chain state have been
dropped.

* Prepare to share read-only view of the chain state

Only persist the chain state if there are no active readers on the
shared chain state.

* Add `ChainWorkerState::chain_state_view`

Allow retrieving a read-only view of the `ChainStateView`, holding a
lock on it.

* Add `WorkerState::chain_state_view`

Allow retrieving a read-only view of a `ChainStateView`, holding a lock
on it.

* Add a `LocalNodeClient::chain_state_view` method

Allow retrieving a read-only view of a `ChainStateView`, holding a lock
on it.

* Create a `ReadOnlyChainStateView` helper type

Allow resolving GraphQL queries to a `ChainStateView` that's behind an
`OwnedRwLockReadGuard`.

* Remove `load_chain` call from `ChainClient`

Use `LocalNodeClient::chain_state_view` instead.

* Add a `LocalNodeClient` to `ValidatorUpdater`

Replace some fields with usage of the local node client instead.

* Don't load chains directly from storage in updater

Use the provided `LocalNodeClient` instance instead.

* Prevent caching of `ChainStateView` resolver

Ensure that the latest chain state is always accessed, and avoid any
risk that the resolver is kept alive longer than expected, holding a
lock and preventing the `ChainWorkerState` from progressing.

* Work around stack overflow

The `change_voting_rights::rocks_db` test in `linera-core` was failing
because it overflowed its stack. While trying to investigate why, this
refactor seemed to fix it. This should be good enough for now, and since
this part of the code will be refactored so that it runs inside a
separate task running a `ChainWorkerActor`, the stack pressure should be
reduced then.
  • Loading branch information
jvff authored Jun 14, 2024
1 parent 916c8b2 commit cb95ff5
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 67 deletions.
7 changes: 4 additions & 3 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use linera_views::{
reentrant_collection_view::ReentrantCollectionView,
register_view::RegisterView,
set_view::SetView,
views::{CryptoHashView, RootView, View, ViewError},
views::{ClonableView, CryptoHashView, RootView, View, ViewError},
};
use serde::{Deserialize, Serialize};
#[cfg(with_testing)]
Expand Down Expand Up @@ -231,7 +231,8 @@ impl InboxEntry {
}

/// A view accessing the state of a chain.
#[derive(Debug, RootView, SimpleObject)]
#[derive(Debug, RootView, ClonableView, SimpleObject)]
#[graphql(cache_control(no_cache))]
pub struct ChainStateView<C>
where
C: Clone + Context + Send + Sync + 'static,
Expand Down Expand Up @@ -348,7 +349,7 @@ impl ChainTipState {
}

/// The state of a channel followed by subscribers.
#[derive(Debug, View, SimpleObject)]
#[derive(Debug, ClonableView, View, SimpleObject)]
pub struct ChannelStateView<C>
where
C: Context + Send + Sync,
Expand Down
4 changes: 2 additions & 2 deletions linera-chain/src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use linera_views::{
common::Context,
queue_view::QueueView,
register_view::RegisterView,
views::{View, ViewError},
views::{ClonableView, View, ViewError},
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand All @@ -35,7 +35,7 @@ mod inbox_tests;
/// * The cursors of added events (resp. removed events) must be increasing over time.
/// * Reconciliation of added and removed events is allowed to skip some added events.
/// However, the opposite is not true: every removed event must be eventually added.
#[derive(Debug, View, async_graphql::SimpleObject)]
#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
pub struct InboxStateView<C>
where
C: Clone + Context + Send + Sync,
Expand Down
4 changes: 2 additions & 2 deletions linera-chain/src/outbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use linera_views::{
common::Context,
queue_view::QueueView,
register_view::RegisterView,
views::{View, ViewError},
views::{ClonableView, View, ViewError},
};

#[cfg(test)]
Expand All @@ -22,7 +22,7 @@ mod outbox_tests;
/// we just send the certified blocks over and let the receivers figure out what were the
/// messages for them.
/// * When marking block heights as received, messages at lower heights are also marked (ie. dequeued).
#[derive(Debug, View, async_graphql::SimpleObject)]
#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
pub struct OutboxStateView<C>
where
C: Context + Send + Sync + 'static,
Expand Down
59 changes: 50 additions & 9 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use linera_execution::{
use linera_storage::Storage;
use linera_views::{
common::Context,
views::{RootView, View, ViewError},
views::{ClonableView, RootView, View, ViewError},
};
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
use tracing::{debug, warn};
#[cfg(with_testing)]
use {linera_base::identifiers::BytecodeId, linera_chain::data_types::Event};
Expand All @@ -53,6 +54,7 @@ where
config: ChainWorkerConfig,
storage: StorageClient,
chain: ChainStateView<StorageClient::Context>,
shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
recent_hashed_certificate_values: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
recent_hashed_blobs: Arc<ValueCache<BlobId, HashedBlob>>,
knows_chain_is_active: bool,
Expand All @@ -77,6 +79,7 @@ where
config,
storage,
chain,
shared_chain_view: None,
recent_hashed_certificate_values: certificate_value_cache,
recent_hashed_blobs: blob_cache,
knows_chain_is_active: false,
Expand All @@ -88,6 +91,26 @@ where
self.chain.chain_id()
}

/// Returns a read-only view of the [`ChainStateView`].
///
/// The returned view holds a lock on the chain state, which prevents the worker from changing
/// it.
pub async fn chain_state_view(
&mut self,
) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
if self.shared_chain_view.is_none() {
self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?)));
}

Ok(self
.shared_chain_view
.as_ref()
.expect("`shared_chain_view` should be initialized above")
.clone()
.read_owned()
.await)
}

/// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`].
#[cfg(with_testing)]
pub async fn read_certificate(
Expand Down Expand Up @@ -240,7 +263,7 @@ where
})
}
let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
self.chain.save().await?;
self.save().await?;
Ok((info, actions))
}

Expand Down Expand Up @@ -358,7 +381,7 @@ where
.await;
}
let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
self.chain.save().await?;
self.save().await?;
// Trigger any outgoing cross-chain messages that haven't been confirmed yet.
let actions = self.create_network_actions().await?;
Ok((info, actions))
Expand Down Expand Up @@ -414,7 +437,7 @@ where
self.storage.clock().current_time(),
);
let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
self.chain.save().await?;
self.save().await?;
let round = self.chain.manager.get().current_round;
if round > old_round {
actions.notifications.push(Notification {
Expand Down Expand Up @@ -545,7 +568,7 @@ where
},
});
// Persist chain.
self.chain.save().await?;
self.save().await?;
self.recent_hashed_certificate_values
.insert(Cow::Owned(certificate.value))
.await;
Expand Down Expand Up @@ -594,7 +617,7 @@ where
return Ok(None);
}
// Save the chain.
self.chain.save().await?;
self.save().await?;
Ok(Some(last_updated_height))
}

Expand All @@ -617,7 +640,7 @@ where
}
}

self.chain.save().await?;
self.save().await?;

Ok(height_with_fully_delivered_messages)
}
Expand All @@ -635,7 +658,7 @@ where
let local_time = self.storage.clock().current_time();
let manager = self.chain.manager.get_mut();
if manager.vote_timeout(chain_id, height, *epoch, key_pair, local_time) {
self.chain.save().await?;
self.save().await?;
}
}
}
Expand All @@ -651,7 +674,7 @@ where
let key_pair = self.config.key_pair();
let manager = self.chain.manager.get_mut();
if manager.vote_fallback(chain_id, height, *epoch, key_pair) {
self.chain.save().await?;
self.save().await?;
}
}
}
Expand Down Expand Up @@ -950,6 +973,24 @@ where
bundle_vecs,
})
}

/// Stores the chain state in persistent storage.
///
/// Waits until the [`ChainStateView`] is no longer shared before persisting the changes.
async fn save(&mut self) -> Result<(), WorkerError> {
// SAFETY: this is the only place a write-lock is acquired, and read-locks are acquired in
// the `chain_state_view` method, which has a `&mut self` receiver like this `save` method.
// That means that when the write-lock is acquired, no readers will be waiting to acquire
// the lock. This is important because otherwise readers could have a stale view of the
// chain state.
let maybe_shared_chain_view = self.shared_chain_view.take();
let _maybe_write_guard = match &maybe_shared_chain_view {
Some(shared_chain_view) => Some(shared_chain_view.write().await),
None => None,
};

Ok(self.chain.save().await?)
}
}

/// Helper type for handling cross-chain updates.
Expand Down
22 changes: 7 additions & 15 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use linera_storage::Storage;
use linera_views::views::ViewError;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::OwnedRwLockReadGuard;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -332,13 +333,8 @@ where
/// Obtains a `ChainStateView` for a given `ChainId`.
pub async fn chain_state_view(
&self,
) -> Result<Arc<ChainStateView<S::Context>>, LocalNodeError> {
let chain_state_view = self
.storage_client()
.await
.load_chain(self.chain_id)
.await?;
Ok(Arc::new(chain_state_view))
) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
Ok(self.node_client.chain_state_view(self.chain_id).await?)
}

/// Subscribes to notifications from this client's chain.
Expand Down Expand Up @@ -657,8 +653,7 @@ where
height: BlockHeight,
delivery: CrossChainMessageDelivery,
) -> Result<(), ChainClientError> {
let storage_client = self.storage_client().await;
let recent_hashed_blobs = self.node_client.recent_hashed_blobs().await;
let local_node = self.node_client.clone();
let chain_manager_pending_blobs = self.chain_managers_pending_blobs().await?;
let nodes: Vec<_> = self.validator_node_provider.make_nodes(committee)?;
communicate_with_quorum(
Expand All @@ -669,8 +664,7 @@ where
let mut updater = ValidatorUpdater {
name,
node,
storage: storage_client.clone(),
local_node_recent_hashed_blobs: recent_hashed_blobs.clone(),
local_node: local_node.clone(),
local_node_chain_managers_pending_blobs: chain_manager_pending_blobs.clone(),
};
Box::pin(async move {
Expand All @@ -695,8 +689,7 @@ where
action: CommunicateAction,
value: HashedCertificateValue,
) -> Result<Certificate, ChainClientError> {
let storage_client = self.storage_client().await;
let recent_hashed_blobs = self.node_client.recent_hashed_blobs().await;
let local_node = self.node_client.clone();
let chain_manager_pending_blobs = self.chain_managers_pending_blobs().await?;
let nodes: Vec<_> = self.validator_node_provider.make_nodes(committee)?;
let ((votes_hash, votes_round), votes) = communicate_with_quorum(
Expand All @@ -707,8 +700,7 @@ where
let mut updater = ValidatorUpdater {
name,
node,
storage: storage_client.clone(),
local_node_recent_hashed_blobs: recent_hashed_blobs.clone(),
local_node: local_node.clone(),
local_node_chain_managers_pending_blobs: chain_manager_pending_blobs.clone(),
};
let action = action.clone();
Expand Down
21 changes: 19 additions & 2 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use linera_base::{
data_types::{ArithmeticError, Blob, BlockHeight, HashedBlob},
identifiers::{BlobId, ChainId, MessageId},
};
use linera_chain::data_types::{
Block, BlockProposal, Certificate, ExecutedBlock, HashedCertificateValue, LiteCertificate,
use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, ExecutedBlock, HashedCertificateValue, LiteCertificate,
},
ChainStateView,
};
use linera_execution::{
committee::ValidatorName, BytecodeLocation, Query, Response, UserApplicationDescription,
Expand All @@ -20,6 +23,7 @@ use linera_storage::Storage;
use linera_views::views::ViewError;
use rand::prelude::SliceRandom;
use thiserror::Error;
use tokio::sync::OwnedRwLockReadGuard;

use crate::{
data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
Expand Down Expand Up @@ -296,6 +300,19 @@ where
info
}

/// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
/// [`ChainId`].
///
/// The returned view holds a lock on the chain state, which prevents the local node from
/// changing the state of that chain.
pub async fn chain_state_view(
&self,
chain_id: ChainId,
) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, WorkerError> {
let node = self.node.lock().await;
node.state.chain_state_view(chain_id).await
}

pub(crate) async fn local_chain_info(
&self,
chain_id: ChainId,
Expand Down
Loading

0 comments on commit cb95ff5

Please sign in to comment.