Skip to content

Commit

Permalink
[experiment] Generation to keep cache in sync
Browse files Browse the repository at this point in the history
  • Loading branch information
georgemitenkov committed Nov 2, 2024
1 parent a824ea1 commit 5d432d0
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 41 deletions.
4 changes: 1 addition & 3 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,7 @@ impl BlockAptosVM {
Arc::clone(&RAYON_EXEC_POOL),
signature_verified_block,
state_view,
// TODO(loader_v2): Remove me once replay is done. For debug only.
Arc::new(ImmutableModuleCache::empty()),
// Arc::clone(&GLOBAL_MODULE_CACHE),
Arc::clone(&GLOBAL_MODULE_CACHE),
config,
transaction_commit_listener,
)
Expand Down
48 changes: 37 additions & 11 deletions aptos-move/block-executor/src/code_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use move_core_types::{
use move_vm_runtime::{Module, RuntimeEnvironment, Script, WithRuntimeEnvironment};
use move_vm_types::code::{
ambassador_impl_ScriptCache, Code, ModuleCache, ModuleCode, ModuleCodeBuilder, ScriptCache,
WithBytes,
WithBytes, WithHash,
};
use std::sync::Arc;

Expand Down Expand Up @@ -145,17 +145,30 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
> {
// First, look up the module in the cross-block global module cache. Record the read for
// later validation in case the read module is republished.
if let Some(module) = self.global_module_cache.get(key) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
if let Some((module, needs_validation)) = self.global_module_cache.get(key) {
// If we do not need to validate global cache, return early.
if !needs_validation {
self.capture_global_cache_read(key);
return Ok(Some(module.clone()));
}

// Otherwise, this is the first time this module gets accessed in this block. We need
// to validate it is the same as in the state. We do it only once on the first access.
let is_valid = builder.build(key)?.is_some_and(|m| {
m.extension().hash() == module.extension().hash()
&& m.extension().state_value_metadata()
== module.extension().state_value_metadata()
});
if is_valid {
// This module is valid for this block, mark as so.
self.global_module_cache.set_generation(key);
self.capture_global_cache_read(key);
return Ok(Some(module.clone()));
}
return Ok(Some(module.clone()));

// Validation failed, global cache is not consistent with the state! Mark the entry as
// invalid and fall-back to slow path via sync module cache.
self.global_module_cache.mark_invalid(key);
}

// Global cache miss: check module cache in versioned/unsync maps.
Expand Down Expand Up @@ -241,4 +254,17 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> LatestView<
ViewState::Unsync(state) => state.unsync_map.module_cache(),
}
}

/// Captures the read from global module cache.
fn capture_global_cache_read(&self, key: &ModuleId) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
}
}
}
120 changes: 95 additions & 25 deletions aptos-move/block-executor/src/code_cache_global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ use std::{
hash::Hash,
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
};

/// Module code stored in cross-block module cache.
// TODO(loader_v2):
// We can move this to move-vm-types, but then we also need to have version generic or expose
// transaction index there, and define PanicError in Move (or convert from VMError).
struct ImmutableModuleCode<DC, VC, E> {
/// True if this code is "valid" within the block execution context (i.e, there has been no
/// republishing of this module so far). If false, executor needs to read the module from the
/// sync/unsync module caches.
valid: CachePadded<AtomicBool>,
/// Represents the generation of the module cache for which this entry has been validated
/// against the global state. For example, if the generation of the cache is 1, but the entry
/// has generation 0, it should be re-validated against the state and has its generation reset
/// accordingly.
generation: CachePadded<AtomicU32>,
/// Cached verified module. While [ModuleCode] type is used, the following invariants always
/// hold:
/// 1. Module's version is [None] (storage version).
Expand All @@ -38,7 +40,10 @@ where
{
/// Returns a new valid module. Returns a (panic) error if the module is not verified or has
/// non-storage version.
fn new(module: Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>) -> Result<Self, PanicError> {
fn new(
module: Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>,
generation: u32,
) -> Result<Self, PanicError> {
if !module.code().is_verified() || module.version().is_some() {
let msg = format!(
"Invariant violated for immutable module code : verified ({}), version({:?})",
Expand All @@ -50,6 +55,7 @@ where

Ok(Self {
valid: CachePadded::new(AtomicBool::new(true)),
generation: CachePadded::new(AtomicU32::new(generation)),
module: CachePadded::new(module),
})
}
Expand All @@ -59,6 +65,16 @@ where
self.valid.store(false, Ordering::Release)
}

/// Returns the generation of this module.
pub(crate) fn generation(&self) -> u32 {
self.generation.load(Ordering::Acquire)
}

/// Resets the generation of this module.
fn set_generation(&self, new_generation: u32) {
self.generation.store(new_generation, Ordering::Release);
}

/// Returns true if the module is valid.
pub(crate) fn is_valid(&self) -> bool {
self.valid.load(Ordering::Acquire)
Expand All @@ -78,6 +94,9 @@ pub struct ImmutableModuleCache<K, DC, VC, E> {
/// Maximum cache size. If the size is greater than this limit, the cache is flushed. Note that
/// this can only be done at block boundaries.
capacity: usize,

/// Represents the generation of this cache. Incremented for every block.
generation: ExplicitSyncWrapper<u32>,
}

impl<K, DC, VC, E> ImmutableModuleCache<K, DC, VC, E>
Expand All @@ -96,6 +115,7 @@ where
Self {
module_cache: ExplicitSyncWrapper::new(HashMap::new()),
capacity,
generation: ExplicitSyncWrapper::new(0),
}
}

Expand All @@ -116,12 +136,24 @@ where
}
}

/// Sets the generation of the module stored at associated key to the generation of the cache.
pub(crate) fn set_generation(&self, key: &K) {
if let Some(module) = self.module_cache.acquire().get(key) {
module.set_generation(*self.generation.acquire());
}
}

/// Returns the module stored in cache. If the module has not been cached, or it exists but is
/// not valid, [None] is returned.
pub(crate) fn get(&self, key: &K) -> Option<Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>> {
/// not valid, [None] is returned. Also returns a boolean flag to indicate if the cached module
/// needs validation (i.e., its generation is nto equal to the generation of the cache).
pub(crate) fn get(
&self,
key: &K,
) -> Option<(Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>, bool)> {
self.module_cache.acquire().get(key).and_then(|module| {
if module.is_valid() {
Some(module.inner().clone())
let needs_validation = *self.generation.acquire() != module.generation();
Some((module.inner().clone(), needs_validation))
} else {
None
}
Expand All @@ -133,8 +165,8 @@ where
self.module_cache.acquire().clear();
}

/// Inserts modules into the cache. Should never be called throughout block-execution. Use with
/// caution.
/// Inserts modules into the cache, and increments the generation counter of the cache. Should
/// never be called throughout block-execution. Use with caution.
///
/// Notes:
/// 1. Only verified modules are inserted.
Expand All @@ -143,12 +175,13 @@ where
/// these constraints are violated, a panic error is returned.
/// 4. If the cache size exceeds its capacity after all verified modules have been inserted,
/// the cache is flushed.
pub(crate) fn insert_verified_unchecked(
pub(crate) fn insert_verified_and_increment_generation_unchecked(
&self,
modules: impl Iterator<Item = (K, Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>)>,
) -> Result<(), PanicError> {
use hashbrown::hash_map::Entry::*;

let current_generation = *self.generation.acquire();
let mut guard = self.module_cache.acquire();
let module_cache = guard.dereference_mut();

Expand All @@ -167,18 +200,24 @@ where
if module.code().is_verified() {
let mut module = module.as_ref().clone();
module.set_version(None);
let prev =
module_cache.insert(key.clone(), ImmutableModuleCode::new(Arc::new(module))?);

let code = ImmutableModuleCode::new(Arc::new(module), current_generation)?;
let prev = module_cache.insert(key.clone(), code);

// At this point, we must have removed the entry, or returned a panic error.
assert!(prev.is_none())
}
}

// In case capacity is exceeded, flush the cache.
if module_cache.len() > self.capacity {
module_cache.clear();
}

// Increment generation counter to ensure we can later check that module cache is in sync
// with the state.
*self.generation.acquire() = current_generation.wrapping_add(1);

Ok(())
}

Expand All @@ -187,7 +226,7 @@ where
pub fn insert(&self, key: K, module: Arc<ModuleCode<DC, VC, E, Option<TxnIndex>>>) {
self.module_cache
.acquire()
.insert(key, ImmutableModuleCode::new(module).unwrap());
.insert(key, ImmutableModuleCode::new(module, 0).unwrap());
}

/// Removes the module from cache. Used for tests only.
Expand All @@ -201,6 +240,12 @@ where
pub fn size(&self) -> usize {
self.module_cache.acquire().len()
}

/// Returns the generation counter for the cache. Used for tests only.
#[cfg(any(test, feature = "testing"))]
pub fn generation(&self) -> u32 {
*self.generation.acquire()
}
}

#[cfg(test)]
Expand All @@ -211,21 +256,30 @@ mod test {

#[test]
fn test_immutable_module_code() {
assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None)).is_err());
assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22))).is_err());
assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22))).is_err());
assert!(ImmutableModuleCode::new(mock_verified_code(0, None)).is_ok());
assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None), 0).is_err());
assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22)), 0).is_err());
assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22)), 0).is_err());
assert!(ImmutableModuleCode::new(mock_verified_code(0, None), 0).is_ok());
}

#[test]
fn test_immutable_module_code_validity() {
let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None)));
let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 0));
assert!(module_code.is_valid());

module_code.mark_invalid();
assert!(!module_code.is_valid());
}

#[test]
fn test_immutable_module_code_generation() {
let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 7));
assert_eq!(module_code.generation(), 7);

module_code.set_generation(10);
assert_eq!(module_code.generation(), 10);
}

#[test]
fn test_global_module_cache() {
let global_cache = ImmutableModuleCache::empty();
Expand Down Expand Up @@ -254,37 +308,53 @@ mod test {
for i in 0..capacity {
new_modules.push((i, mock_verified_code(i, Some(i as TxnIndex))));
}
let result = global_cache.insert_verified_unchecked(new_modules.into_iter());
let result = global_cache
.insert_verified_and_increment_generation_unchecked(new_modules.into_iter());
assert!(result.is_ok());
assert_eq!(global_cache.size(), capacity);
assert_eq!(global_cache.generation(), 1);

// Versions should be set to storage.
for key in 0..capacity {
let code = assert_some!(global_cache.get(&key));
let (code, _) = assert_some!(global_cache.get(&key));
assert!(code.version().is_none())
}

// Too many modules added, the cache should be flushed.
let new_modules = vec![(11, mock_verified_code(11, None))];
let result = global_cache.insert_verified_unchecked(new_modules.into_iter());
let result = global_cache
.insert_verified_and_increment_generation_unchecked(new_modules.into_iter());
assert!(result.is_ok());
assert_eq!(global_cache.size(), 0);
assert_eq!(global_cache.generation(), 2);

// Should not add deserialized code.
let deserialized_modules = vec![(0, mock_deserialized_code(0, None))];
assert_ok!(global_cache.insert_verified_unchecked(deserialized_modules.into_iter()));
assert_ok!(global_cache
.insert_verified_and_increment_generation_unchecked(deserialized_modules.into_iter()));
assert_eq!(global_cache.size(), 0);
assert_eq!(global_cache.generation(), 3);

// Should not override valid modules.
global_cache.insert(0, mock_verified_code(0, None));
let new_modules = vec![(0, mock_verified_code(100, None))];
assert_err!(global_cache.insert_verified_unchecked(new_modules.into_iter()));
assert_err!(global_cache
.insert_verified_and_increment_generation_unchecked(new_modules.into_iter()));
assert_eq!(global_cache.generation(), 3);

// Can override invalid modules.
global_cache.mark_invalid(&0);
let new_modules = vec![(0, mock_verified_code(100, None))];
let result = global_cache.insert_verified_unchecked(new_modules.into_iter());
let result = global_cache
.insert_verified_and_increment_generation_unchecked(new_modules.into_iter());
assert!(result.is_ok());
assert_eq!(global_cache.size(), 1);
assert_eq!(global_cache.generation(), 4);

// Generation incremented even if there are no code publishes.
let result =
global_cache.insert_verified_and_increment_generation_unchecked(vec![].into_iter());
assert!(result.is_ok());
assert_eq!(global_cache.generation(), 5);
}
}
4 changes: 2 additions & 2 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ where

counters::update_state_counters(versioned_cache.stats(), true);
self.global_module_cache
.insert_verified_unchecked(versioned_cache.take_modules_iter())
.insert_verified_and_increment_generation_unchecked(versioned_cache.take_modules_iter())
.map_err(|err| {
alert!("[BlockSTM] Encountered panic error: {:?}", err);
})?;
Expand Down Expand Up @@ -1652,7 +1652,7 @@ where

counters::update_state_counters(unsync_map.stats(), false);
self.global_module_cache
.insert_verified_unchecked(unsync_map.into_modules_iter())?;
.insert_verified_and_increment_generation_unchecked(unsync_map.into_modules_iter())?;

let block_end_info = if self
.config
Expand Down

0 comments on commit 5d432d0

Please sign in to comment.