From 108a23d8a786026041c731c4bec913f3b89bc665 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Sat, 2 Nov 2024 15:00:31 +0000 Subject: [PATCH] [experiment] Generation to keep cache in sync --- aptos-move/aptos-vm/src/block_executor/mod.rs | 4 +- aptos-move/block-executor/src/code_cache.rs | 48 +++++-- .../block-executor/src/code_cache_global.rs | 120 ++++++++++++++---- aptos-move/block-executor/src/executor.rs | 4 +- 4 files changed, 135 insertions(+), 41 deletions(-) diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index c7764b2a6160f9..de16c676676b69 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -554,9 +554,7 @@ impl BlockAptosVM { Arc::clone(&RAYON_EXEC_POOL), signature_verified_block, state_view, - // TODO(loader_v2): Remove me once replay is done. For debug only. - Arc::new(ImmutableModuleCache::empty()), - // Arc::clone(&GLOBAL_MODULE_CACHE), + Arc::clone(&GLOBAL_MODULE_CACHE), config, transaction_commit_listener, ) diff --git a/aptos-move/block-executor/src/code_cache.rs b/aptos-move/block-executor/src/code_cache.rs index 5f4864621b1f18..560560999293a5 100644 --- a/aptos-move/block-executor/src/code_cache.rs +++ b/aptos-move/block-executor/src/code_cache.rs @@ -25,7 +25,7 @@ use move_core_types::{ use move_vm_runtime::{Module, RuntimeEnvironment, Script, WithRuntimeEnvironment}; use move_vm_types::code::{ ambassador_impl_ScriptCache, Code, ModuleCache, ModuleCode, ModuleCodeBuilder, ScriptCache, - WithBytes, + WithBytes, WithHash, }; use std::sync::Arc; @@ -145,17 +145,30 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> ModuleCache > { // First, look up the module in the cross-block global module cache. Record the read for // later validation in case the read module is republished. - if let Some(module) = self.global_module_cache.get(key) { - match &self.latest_view { - ViewState::Sync(state) => state - .captured_reads - .borrow_mut() - .capture_global_cache_read(key.clone()), - ViewState::Unsync(state) => { - state.read_set.borrow_mut().capture_module_read(key.clone()) - }, + if let Some((module, needs_validation)) = self.global_module_cache.get(key) { + // If we do not need to validate global cache, return early. + if !needs_validation { + self.capture_global_cache_read(key); + return Ok(Some(module.clone())); + } + + // Otherwise, this is the first time this module gets accessed in this block. We need + // to validate it is the same as in the state. We do it only once on the first access. + let is_valid = builder.build(key)?.is_some_and(|m| { + m.extension().hash() == module.extension().hash() + && m.extension().state_value_metadata() + == module.extension().state_value_metadata() + }); + if is_valid { + // This module is valid for this block, mark as so. + self.global_module_cache.set_generation(key); + self.capture_global_cache_read(key); + return Ok(Some(module.clone())); } - return Ok(Some(module.clone())); + + // Validation failed, global cache is not consistent with the state! Mark the entry as + // invalid and fall-back to slow path via sync module cache. + self.global_module_cache.mark_invalid(key); } // Global cache miss: check module cache in versioned/unsync maps. @@ -241,4 +254,17 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< ViewState::Unsync(state) => state.unsync_map.module_cache(), } } + + /// Captures the read from global module cache. + fn capture_global_cache_read(&self, key: &ModuleId) { + match &self.latest_view { + ViewState::Sync(state) => state + .captured_reads + .borrow_mut() + .capture_global_cache_read(key.clone()), + ViewState::Unsync(state) => { + state.read_set.borrow_mut().capture_module_read(key.clone()) + }, + } + } } diff --git a/aptos-move/block-executor/src/code_cache_global.rs b/aptos-move/block-executor/src/code_cache_global.rs index 6ff31da656e793..6a5ada11f4edff 100644 --- a/aptos-move/block-executor/src/code_cache_global.rs +++ b/aptos-move/block-executor/src/code_cache_global.rs @@ -11,20 +11,22 @@ use std::{ hash::Hash, ops::Deref, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, }; /// Module code stored in cross-block module cache. -// TODO(loader_v2): -// We can move this to move-vm-types, but then we also need to have version generic or expose -// transaction index there, and define PanicError in Move (or convert from VMError). struct ImmutableModuleCode { /// True if this code is "valid" within the block execution context (i.e, there has been no /// republishing of this module so far). If false, executor needs to read the module from the /// sync/unsync module caches. valid: CachePadded, + /// Represents the generation of the module cache for which this entry has been validated + /// against the global state. For example, if the generation of the cache is 1, but the entry + /// has generation 0, it should be re-validated against the state and has its generation reset + /// accordingly. + generation: CachePadded, /// Cached verified module. While [ModuleCode] type is used, the following invariants always /// hold: /// 1. Module's version is [None] (storage version). @@ -38,7 +40,10 @@ where { /// Returns a new valid module. Returns a (panic) error if the module is not verified or has /// non-storage version. - fn new(module: Arc>>) -> Result { + fn new( + module: Arc>>, + generation: u32, + ) -> Result { if !module.code().is_verified() || module.version().is_some() { let msg = format!( "Invariant violated for immutable module code : verified ({}), version({:?})", @@ -50,6 +55,7 @@ where Ok(Self { valid: CachePadded::new(AtomicBool::new(true)), + generation: CachePadded::new(AtomicU32::new(generation)), module: CachePadded::new(module), }) } @@ -59,6 +65,16 @@ where self.valid.store(false, Ordering::Release) } + /// Returns the generation of this module. + pub(crate) fn generation(&self) -> u32 { + self.generation.load(Ordering::Acquire) + } + + /// Resets the generation of this module. + fn set_generation(&self, new_generation: u32) { + self.generation.store(new_generation, Ordering::Release); + } + /// Returns true if the module is valid. pub(crate) fn is_valid(&self) -> bool { self.valid.load(Ordering::Acquire) @@ -78,6 +94,9 @@ pub struct ImmutableModuleCache { /// Maximum cache size. If the size is greater than this limit, the cache is flushed. Note that /// this can only be done at block boundaries. capacity: usize, + + /// Represents the generation of this cache. Incremented for every block. + generation: ExplicitSyncWrapper, } impl ImmutableModuleCache @@ -96,6 +115,7 @@ where Self { module_cache: ExplicitSyncWrapper::new(HashMap::new()), capacity, + generation: ExplicitSyncWrapper::new(0), } } @@ -116,12 +136,24 @@ where } } + /// Sets the generation of the module stored at associated key to the generation of the cache. + pub(crate) fn set_generation(&self, key: &K) { + if let Some(module) = self.module_cache.acquire().get(key) { + module.set_generation(*self.generation.acquire()); + } + } + /// Returns the module stored in cache. If the module has not been cached, or it exists but is - /// not valid, [None] is returned. - pub(crate) fn get(&self, key: &K) -> Option>>> { + /// not valid, [None] is returned. Also returns a boolean flag to indicate if the cached module + /// needs validation (i.e., its generation is not equal to the generation of the cache). + pub(crate) fn get( + &self, + key: &K, + ) -> Option<(Arc>>, bool)> { self.module_cache.acquire().get(key).and_then(|module| { if module.is_valid() { - Some(module.inner().clone()) + let needs_validation = *self.generation.acquire() != module.generation(); + Some((module.inner().clone(), needs_validation)) } else { None } @@ -133,8 +165,8 @@ where self.module_cache.acquire().clear(); } - /// Inserts modules into the cache. Should never be called throughout block-execution. Use with - /// caution. + /// Inserts modules into the cache, and increments the generation counter of the cache. Should + /// never be called throughout block-execution. Use with caution. /// /// Notes: /// 1. Only verified modules are inserted. @@ -143,12 +175,13 @@ where /// these constraints are violated, a panic error is returned. /// 4. If the cache size exceeds its capacity after all verified modules have been inserted, /// the cache is flushed. - pub(crate) fn insert_verified_unchecked( + pub(crate) fn insert_verified_and_increment_generation_unchecked( &self, modules: impl Iterator>>)>, ) -> Result<(), PanicError> { use hashbrown::hash_map::Entry::*; + let current_generation = *self.generation.acquire(); let mut guard = self.module_cache.acquire(); let module_cache = guard.dereference_mut(); @@ -167,18 +200,24 @@ where if module.code().is_verified() { let mut module = module.as_ref().clone(); module.set_version(None); - let prev = - module_cache.insert(key.clone(), ImmutableModuleCode::new(Arc::new(module))?); + + let code = ImmutableModuleCode::new(Arc::new(module), current_generation)?; + let prev = module_cache.insert(key.clone(), code); // At this point, we must have removed the entry, or returned a panic error. assert!(prev.is_none()) } } + // In case capacity is exceeded, flush the cache. if module_cache.len() > self.capacity { module_cache.clear(); } + // Increment generation counter to ensure we can later check that module cache is in sync + // with the state. + *self.generation.acquire() = current_generation.wrapping_add(1); + Ok(()) } @@ -187,7 +226,7 @@ where pub fn insert(&self, key: K, module: Arc>>) { self.module_cache .acquire() - .insert(key, ImmutableModuleCode::new(module).unwrap()); + .insert(key, ImmutableModuleCode::new(module, 0).unwrap()); } /// Removes the module from cache. Used for tests only. @@ -201,6 +240,12 @@ where pub fn size(&self) -> usize { self.module_cache.acquire().len() } + + /// Returns the generation counter for the cache. Used for tests only. + #[cfg(any(test, feature = "testing"))] + pub fn generation(&self) -> u32 { + *self.generation.acquire() + } } #[cfg(test)] @@ -211,21 +256,30 @@ mod test { #[test] fn test_immutable_module_code() { - assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None)).is_err()); - assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22))).is_err()); - assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22))).is_err()); - assert!(ImmutableModuleCode::new(mock_verified_code(0, None)).is_ok()); + assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22)), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22)), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_verified_code(0, None), 0).is_ok()); } #[test] fn test_immutable_module_code_validity() { - let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None))); + let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 0)); assert!(module_code.is_valid()); module_code.mark_invalid(); assert!(!module_code.is_valid()); } + #[test] + fn test_immutable_module_code_generation() { + let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 7)); + assert_eq!(module_code.generation(), 7); + + module_code.set_generation(10); + assert_eq!(module_code.generation(), 10); + } + #[test] fn test_global_module_cache() { let global_cache = ImmutableModuleCache::empty(); @@ -254,37 +308,53 @@ mod test { for i in 0..capacity { new_modules.push((i, mock_verified_code(i, Some(i as TxnIndex)))); } - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), capacity); + assert_eq!(global_cache.generation(), 1); // Versions should be set to storage. for key in 0..capacity { - let code = assert_some!(global_cache.get(&key)); + let (code, _) = assert_some!(global_cache.get(&key)); assert!(code.version().is_none()) } // Too many modules added, the cache should be flushed. let new_modules = vec![(11, mock_verified_code(11, None))]; - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), 0); + assert_eq!(global_cache.generation(), 2); // Should not add deserialized code. let deserialized_modules = vec![(0, mock_deserialized_code(0, None))]; - assert_ok!(global_cache.insert_verified_unchecked(deserialized_modules.into_iter())); + assert_ok!(global_cache + .insert_verified_and_increment_generation_unchecked(deserialized_modules.into_iter())); assert_eq!(global_cache.size(), 0); + assert_eq!(global_cache.generation(), 3); // Should not override valid modules. global_cache.insert(0, mock_verified_code(0, None)); let new_modules = vec![(0, mock_verified_code(100, None))]; - assert_err!(global_cache.insert_verified_unchecked(new_modules.into_iter())); + assert_err!(global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter())); + assert_eq!(global_cache.generation(), 3); // Can override invalid modules. global_cache.mark_invalid(&0); let new_modules = vec![(0, mock_verified_code(100, None))]; - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), 1); + assert_eq!(global_cache.generation(), 4); + + // Generation incremented even if there are no code publishes. + let result = + global_cache.insert_verified_and_increment_generation_unchecked(vec![].into_iter()); + assert!(result.is_ok()); + assert_eq!(global_cache.generation(), 5); } } diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index eb7965ace5adba..d800e03b87aafd 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -1169,7 +1169,7 @@ where counters::update_state_counters(versioned_cache.stats(), true); self.global_module_cache - .insert_verified_unchecked(versioned_cache.take_modules_iter()) + .insert_verified_and_increment_generation_unchecked(versioned_cache.take_modules_iter()) .map_err(|err| { alert!("[BlockSTM] Encountered panic error: {:?}", err); })?; @@ -1652,7 +1652,7 @@ where counters::update_state_counters(unsync_map.stats(), false); self.global_module_cache - .insert_verified_unchecked(unsync_map.into_modules_iter())?; + .insert_verified_and_increment_generation_unchecked(unsync_map.into_modules_iter())?; let block_end_info = if self .config