diff --git a/Cargo.lock b/Cargo.lock index bd80614bb7da1..a61a62bab4508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,6 +682,7 @@ dependencies = [ "aptos-metrics-core", "aptos-mvhashmap", "aptos-types", + "aptos-vm-environment", "aptos-vm-logging", "aptos-vm-types", "arc-swap", @@ -811,7 +812,6 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-framework", - "aptos-global-cache-manager", "aptos-language-e2e-tests", "aptos-rest-client", "aptos-types", @@ -1264,7 +1264,6 @@ dependencies = [ "aptos-executor", "aptos-executor-test-helpers", "aptos-executor-types", - "aptos-global-cache-manager", "aptos-logger", "aptos-storage-interface", "aptos-temppath", @@ -1463,7 +1462,6 @@ dependencies = [ "aptos-executor-types", "aptos-experimental-runtimes", "aptos-genesis", - "aptos-global-cache-manager", "aptos-indexer-grpc-table-info", "aptos-infallible", "aptos-logger", @@ -1505,7 +1503,6 @@ dependencies = [ "aptos-experimental-ptx-executor", "aptos-experimental-runtimes", "aptos-genesis", - "aptos-global-cache-manager", "aptos-jellyfish-merkle", "aptos-logger", "aptos-metrics-core", @@ -1544,7 +1541,6 @@ version = "0.1.0" dependencies = [ "aptos-block-partitioner", "aptos-config", - "aptos-global-cache-manager", "aptos-infallible", "aptos-language-e2e-tests", "aptos-logger", @@ -1651,7 +1647,6 @@ name = "aptos-experimental-ptx-executor" version = "0.1.0" dependencies = [ "aptos-experimental-runtimes", - "aptos-global-cache-manager", "aptos-infallible", "aptos-logger", "aptos-metrics-core", @@ -2055,24 +2050,6 @@ dependencies = [ "ureq", ] -[[package]] -name = "aptos-global-cache-manager" -version = "0.0.1" -dependencies = [ - "aptos-crypto", - "aptos-types", - "aptos-vm-environment", - "aptos-vm-types", - "bcs 0.1.4", - "claims", - "move-binary-format", - "move-core-types", - "move-vm-runtime", - "move-vm-types", - "parking_lot 0.12.1", - "test-case", -] - [[package]] name = "aptos-global-constants" version = "0.1.0" @@ -2857,7 +2834,6 @@ dependencies = [ "aptos-consensus", "aptos-crypto", "aptos-gas-profiling", - "aptos-global-cache-manager", "aptos-logger", "aptos-rest-client", "aptos-types", @@ -4209,7 +4185,6 @@ dependencies = [ "aptos-block-executor", "aptos-block-partitioner", "aptos-crypto", - "aptos-global-cache-manager", "aptos-language-e2e-tests", "aptos-logger", "aptos-metrics-core", @@ -4314,7 +4289,6 @@ dependencies = [ "aptos-crypto", "aptos-framework", "aptos-gas-schedule", - "aptos-global-cache-manager", "aptos-language-e2e-tests", "aptos-resource-viewer", "aptos-storage-interface", @@ -4373,7 +4347,6 @@ dependencies = [ "claims", "coset", "criterion", - "crossbeam", "dashmap", "derivative", "fixed", @@ -4488,7 +4461,6 @@ dependencies = [ "aptos-gas-algebra", "aptos-gas-meter", "aptos-gas-schedule", - "aptos-global-cache-manager", "aptos-infallible", "aptos-language-e2e-tests", "aptos-logger", @@ -4612,7 +4584,6 @@ dependencies = [ "anyhow", "aptos-cached-packages", "aptos-gas-schedule", - "aptos-global-cache-manager", "aptos-language-e2e-tests", "aptos-move-stdlib", "aptos-native-interface", diff --git a/Cargo.toml b/Cargo.toml index 0c6a40ff966b9..64384fd8c1c8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ members = [ "aptos-move/aptos-gas-profiling", "aptos-move/aptos-gas-schedule", "aptos-move/aptos-gas-schedule-updator", - "aptos-move/aptos-global-cache-manager", "aptos-move/aptos-memory-usage-tracker", "aptos-move/aptos-native-interface", "aptos-move/aptos-release-builder", @@ -358,7 +357,6 @@ aptos-gas-schedule = { path = "aptos-move/aptos-gas-schedule" } aptos-gas-schedule-updator = { path = "aptos-move/aptos-gas-schedule-updator" } aptos-genesis = { path = "crates/aptos-genesis" } aptos-github-client = { path = "crates/aptos-github-client" } -aptos-global-cache-manager = { path = "aptos-move/aptos-global-cache-manager" } aptos-global-constants = { path = "config/global-constants" } aptos-id-generator = { path = "crates/aptos-id-generator" } aptos-indexer = { path = "crates/indexer" } diff --git a/aptos-move/aptos-debugger/Cargo.toml b/aptos-move/aptos-debugger/Cargo.toml index d8bfc76cd3e60..8e83673767603 100644 --- a/aptos-move/aptos-debugger/Cargo.toml +++ b/aptos-move/aptos-debugger/Cargo.toml @@ -18,7 +18,6 @@ aptos-block-executor = { workspace = true } aptos-consensus = { workspace = true } aptos-crypto = { workspace = true } aptos-gas-profiling = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-logger = { workspace = true } aptos-rest-client = { workspace = true } aptos-types = { workspace = true } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 81e95438fb8d1..0b4946fce95dc 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -4,7 +4,6 @@ use anyhow::{bail, format_err, Result}; use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; -use aptos_global_cache_manager::GlobalCacheManager; use aptos_rest_client::Client; use aptos_types::{ account_address::AccountAddress, @@ -429,26 +428,15 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - global_cache_manager.mark_block_execution_start(state_view, None)?; - let result = BlockAptosVM::execute_block::< - _, - NoOpTransactionCommitHook, - >( + BlockAptosVM::execute_block::<_, NoOpTransactionCommitHook>( sig_verified_txns, state_view, - &global_cache_manager, + None, BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level, - allow_fallback: true, - discard_failed_blocks: false, - }, + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), }, None, ) - .map(BlockOutput::into_transaction_outputs_forced); - global_cache_manager.mark_block_execution_end(None)?; - result + .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-e2e-comparison-testing/Cargo.toml b/aptos-move/aptos-e2e-comparison-testing/Cargo.toml index 71af9ffdc387d..30c851670b4df 100644 --- a/aptos-move/aptos-e2e-comparison-testing/Cargo.toml +++ b/aptos-move/aptos-e2e-comparison-testing/Cargo.toml @@ -13,7 +13,6 @@ default-run = "aptos-comparison-testing" [dependencies] anyhow = { workspace = true } aptos-framework = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-rest-client = { workspace = true } aptos-types = { workspace = true } diff --git a/aptos-move/aptos-global-cache-manager/Cargo.toml b/aptos-move/aptos-global-cache-manager/Cargo.toml deleted file mode 100644 index 203c76ef0927e..0000000000000 --- a/aptos-move/aptos-global-cache-manager/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "aptos-global-cache-manager" -description = "Aptos global module and environement cache manager" -version = "0.0.1" - -# Workspace inherited keys -authors = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -repository = { workspace = true } -rust-version = { workspace = true } - -[dependencies] -aptos-crypto = { workspace = true } -aptos-types = { workspace = true } -aptos-vm-environment = { workspace = true } -aptos-vm-types = { workspace = true } -move-binary-format = { workspace = true } -move-core-types = { workspace = true } -move-vm-runtime = { workspace = true } -move-vm-types = { workspace = true } -parking_lot = { workspace = true } - -[dev-dependencies] -aptos-crypto = { workspace = true, features = ["fuzzing"] } -aptos-types = { workspace = true, features = ["testing"] } -bcs = { workspace = true } -claims = { workspace = true } -move-vm-types = { workspace = true, features = ["testing"] } -test-case = { workspace = true } diff --git a/aptos-move/aptos-global-cache-manager/src/config.rs b/aptos-move/aptos-global-cache-manager/src/config.rs deleted file mode 100644 index 8427f087ffa66..0000000000000 --- a/aptos-move/aptos-global-cache-manager/src/config.rs +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -/// Configuration used for global caches. -pub struct GlobalCacheConfig { - /// If true, when global caches are empty, Aptos framework is prefetched into module cache. - pub prefetch_framework_code: bool, - /// The maximum size serialized modules can take in module cache. - pub max_module_cache_size_in_bytes: usize, - /// The maximum size (in terms of entries) of struct name re-indexing map stored in runtime - /// environment. - pub max_struct_name_index_map_size: usize, -} - -impl Default for GlobalCacheConfig { - fn default() -> Self { - // TODO(loader_v2): - // Right now these are hardcoded here, we probably want to add them to gas schedule or - // some on-chain config. - Self { - prefetch_framework_code: true, - // Use 50 Mb for now, should be large enough to cache many modules. - max_module_cache_size_in_bytes: 50 * 1024 * 1024, - max_struct_name_index_map_size: 100_000, - } - } -} diff --git a/aptos-move/aptos-global-cache-manager/src/lib.rs b/aptos-move/aptos-global-cache-manager/src/lib.rs deleted file mode 100644 index a5844418599cc..0000000000000 --- a/aptos-move/aptos-global-cache-manager/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -pub(crate) mod config; -mod manager; - -pub use manager::GlobalCacheManager; diff --git a/aptos-move/aptos-global-cache-manager/src/manager.rs b/aptos-move/aptos-global-cache-manager/src/manager.rs deleted file mode 100644 index 5f0b96d4aa665..0000000000000 --- a/aptos-move/aptos-global-cache-manager/src/manager.rs +++ /dev/null @@ -1,640 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::config::GlobalCacheConfig; -use aptos_crypto::HashValue; -use aptos_types::{ - read_only_module_cache::ReadOnlyModuleCache, state_store::StateView, - vm::modules::AptosModuleExtension, -}; -use aptos_vm_environment::environment::AptosEnvironment; -use aptos_vm_types::module_and_script_storage::AsAptosCodeStorage; -use move_binary_format::CompiledModule; -use move_core_types::{ - account_address::AccountAddress, - ident_str, - language_storage::ModuleId, - vm_status::{StatusCode, VMStatus}, -}; -use move_vm_runtime::{Module, ModuleStorage, WithRuntimeEnvironment}; -use move_vm_types::code::WithSize; -use parking_lot::Mutex; -use std::{ - hash::Hash, - ops::Deref, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; - -/// Returns an invariant violation [VMStatus]. -fn invariant_violation(msg: &str) -> VMStatus { - VMStatus::error( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - Some(msg.to_string()), - ) -} - -/// Represents previously executed block, recorded by [GlobalCacheManager]. -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] -enum BlockId { - /// No block has been executed yet. - Unset, - /// Block of transactions has been executed, with known or unknown hash. Usually, the hash is - /// [None] in tests, replay, etc. - Set(Option), -} - -impl BlockId { - /// Returns true if the ID corresponds to no executed blocks. - fn is_unset(&self) -> bool { - matches!(self, Self::Unset) - } -} - -/// Manages global caches, e.g., modules or execution environment. Should not be used concurrently. -struct GlobalCacheManagerInner { - config: GlobalCacheConfig, - - /// Cache for modules. It is read-only for any concurrent execution, and can only be mutated - /// when it is known that there are no concurrent accesses, e.g., at block boundaries. - /// [GlobalCacheManagerInner] must ensure that these invariants always hold. - module_cache: Arc>, - /// Identifies previously executed block, initially [BlockId::Unset]. - previous_block_id: Mutex, - /// Identifies the previously used execution environment, initially [None]. The environment, as - /// long as it does not change, it maintained across multiple block executions. - previous_environment: Mutex>, - - /// A marker that indicates that the state of global caches is ready for block execution. Used - /// to prevent concurrent block executions. - ready_for_next_block: AtomicBool, -} - -impl GlobalCacheManagerInner -where - K: Hash + Eq + Clone, - VC: Deref>, - E: WithSize, -{ - /// Returns a new instance of [GlobalCacheManagerInner] with default [GlobalCacheConfig]. - fn new_with_default_config() -> Self { - Self::new_with_config(GlobalCacheConfig::default()) - } - - /// Returns a new instance of [GlobalCacheManagerInner] with the provided [GlobalCacheConfig]. - fn new_with_config(config: GlobalCacheConfig) -> Self { - Self { - config, - module_cache: Arc::new(ReadOnlyModuleCache::empty()), - previous_block_id: Mutex::new(BlockId::Unset), - previous_environment: Mutex::new(None), - ready_for_next_block: AtomicBool::new(true), - } - } - - /// See the documentation for [GlobalCacheManager::mark_block_execution_start]. The only - /// difference here is that there is no framework prefetching. - fn mark_block_execution_start( - &self, - state_view: &impl StateView, - previous_block_id: Option, - ) -> Result<(), VMStatus> { - let recorded_previous_block_id = { - // Acquire a lock, and check if we are ready to execute the next block. - let previous_block_id = self.previous_block_id.lock(); - if !self.ready_for_next_block() { - let msg = "Trying to execute blocks concurrently over shared global state"; - return Err(invariant_violation(msg)); - } - - // Prepare for execution. Set the flag as not ready to ensure that blocks are not - // executed concurrently using the same cache. - self.mark_not_ready_for_next_block(); - *previous_block_id - }; - - // From here, we perform checks if we need to flush the global caches. If so, this variable - // is set to true. - let mut flush_all_caches = false; - - // Check 1: We must be executing on top of the state we have seen just before. - use BlockId::*; - match (recorded_previous_block_id, previous_block_id) { - // We execute on top of empty state, everything is ok. - (Unset, None) | (Unset, Some(_)) => {}, - - // We execute on top of different (maybe also unspecified) state. In this case, caches - // need to be reset. - (Set(None), None) | (Set(None), Some(_)) | (Set(Some(_)), None) => { - flush_all_caches = true; - }, - - // Otherwise, just check if block hashes do not match. - (Set(Some(recorded_hash)), Some(hash)) => { - if recorded_hash != hash { - flush_all_caches = true; - }; - }, - }; - - // Check 2: Reset global environment if it has changed. If so, caches needs to be flushed. - let new_environment = - AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); - let mut previous_environment = self.previous_environment.lock(); - match previous_environment.as_ref() { - Some(environment) => { - if environment != &new_environment { - *previous_environment = Some(new_environment); - flush_all_caches = true; - } - }, - None => { - // If the environment is not yet set, set it. - debug_assert!(self.previous_block_id.lock().is_unset()); - *previous_environment = Some(new_environment); - }, - } - - // Check 3: At this point, environment is set to the most-up-to-date value. Check the size - // of caches is within bounds. - let runtime_environment = previous_environment - .as_ref() - .expect("Environment has to be set") - .runtime_environment(); - let struct_name_index_map_size = match runtime_environment.struct_name_index_map_size() { - Err(err) => { - // Unlock the cache, reset all states, and return. - drop(previous_environment); - let err = self.reset_and_return_invariant_violation(&format!( - "Error when getting struct name index map size: {:?}", - err - )); - return Err(err); - }, - Ok(size) => size, - }; - - if struct_name_index_map_size > self.config.max_struct_name_index_map_size { - flush_all_caches = true; - } - if self.module_cache.size_in_bytes() > self.config.max_module_cache_size_in_bytes { - // Technically, if we flush modules we do not need to flush type caches, but we unify - // flushing logic for easier reasoning. - flush_all_caches = true; - } - - // Finally, if flag is set, flush the caches. - if flush_all_caches { - runtime_environment.flush_struct_name_and_info_caches(); - self.module_cache.flush_unchecked(); - } - - Ok(()) - } - - /// See the documentation for [GlobalCacheManager::mark_block_execution_end]. - fn mark_block_execution_end( - &self, - executed_block_id: Option, - ) -> Result<(), VMStatus> { - // We are done executing a block, reset the previous block id. Do everything under lock to - // ensure it is not possible to execute blocks concurrently. - let mut previous_block_id = self.previous_block_id.lock(); - if self.ready_for_next_block() { - // This means we are executing concurrently. If so, all-but-one thread will return an - // error. Note that the caches are still consistent for that one thread. - let msg = "Should not be possible to mark block execution end for execution-ready \ - global cache, check if blocks are executed concurrently"; - return Err(invariant_violation(msg)); - } - *previous_block_id = BlockId::Set(executed_block_id); - - // Set the flag that the global cache is ready for next execution. - self.mark_ready_for_next_block(); - - Ok(()) - } - - /// Returns true of a next block is ready be executed. This is the case only when: - /// 1. the global caches have just been created, or - /// 2. [GlobalCacheManagerInner::mark_block_execution_end] was called indicating that - /// previous block execution has finished. - fn ready_for_next_block(&self) -> bool { - self.ready_for_next_block.load(Ordering::SeqCst) - } - - /// Marks caches as ready for next block execution. - fn mark_ready_for_next_block(&self) { - self.ready_for_next_block.store(true, Ordering::SeqCst); - } - - /// Marks caches as not ready for next block execution. - fn mark_not_ready_for_next_block(&self) { - self.ready_for_next_block.store(false, Ordering::SeqCst); - } - - /// Resets all states (under a lock) as if global caches are empty and no blocks have been - /// executed so far. Returns an invariant violation error. - fn reset_and_return_invariant_violation(&self, msg: &str) -> VMStatus { - // Lock to reset the state under lock. - let mut previous_block_id = self.previous_block_id.lock(); - - // 1. Should be ready for next execution. - self.mark_not_ready_for_next_block(); - // 2. Should contain no environment. - *self.previous_environment.lock() = None; - // 3. Module cache is empty. - self.module_cache.flush_unchecked(); - // 4. Block ID is unset. - *previous_block_id = BlockId::Unset; - - // State reset, unlock. - drop(previous_block_id); - - invariant_violation(msg) - } -} - -/// Same as [GlobalCacheManagerInner], but uses concrete types used by execution on Aptos instead -/// of generics. Allows us not to propagate generic type parameters everywhere (for now), but be -/// able to mock and test. -pub struct GlobalCacheManager { - inner: GlobalCacheManagerInner, -} - -impl GlobalCacheManager { - /// Returns a new instance of [GlobalCacheManager] with default [GlobalCacheConfig]. - pub fn new_with_default_config() -> Self { - Self { - inner: GlobalCacheManagerInner::new_with_default_config(), - } - } - - /// Sets the state of global caches prior to block execution on top of the provided state (with - /// the block ID). Should always sbe called prior to block execution. - /// - /// The caches stored globally (modules, struct name re-indexing map and type caches) are all - /// flushed if: - /// 1. Previously executed block ID does not match the provided value. - /// 2. The environment has changed for this state. - /// 3. The size of the struct name re-indexing map is too large. - /// 4. The size (in bytes) of the module cache is too large. - /// - /// Additionally, if cache is empty, prefetches the framework code into it. - /// - /// Marks [GlobalCacheManager] as not ready for next block execution. If called concurrently, - /// only a single invocation ever succeeds and other calls return an error. - pub fn mark_block_execution_start( - &self, - state_view: &impl StateView, - previous_block_id: Option, - ) -> Result<(), VMStatus> { - self.inner - .mark_block_execution_start(state_view, previous_block_id)?; - - if self.inner.config.prefetch_framework_code && self.module_cache().num_modules() == 0 { - let code_storage = state_view.as_aptos_code_storage(self.environment()?); - - // If framework code exists in storage, the transitive closure will be verified and - // cached. - let result = code_storage - .fetch_verified_module(&AccountAddress::ONE, ident_str!("transaction_validation")); - - match result { - Ok(Some(_)) => { - // Framework must have been loaded. Drain verified modules from local cache - // into global cache. - let verified_module_code_iter = code_storage - .into_verified_module_code_iter() - .map_err(|err| { - let msg = format!( - "Unable to convert cached modules into verified code: {:?}", - err - ); - self.inner.reset_and_return_invariant_violation(&msg) - })?; - self.inner - .module_cache - .insert_verified_unchecked(verified_module_code_iter) - .map_err(|err| { - let msg = format!("Unable to cache verified framework: {:?}", err); - self.inner.reset_and_return_invariant_violation(&msg) - })?; - }, - Ok(None) => { - // No framework in the state, do nothing. - }, - Err(err) => { - // There should be no errors when pre-fetching the framework, if there are, we - // better return an error here. - let msg = format!("Error when pre-fetching the framework: {:?}", err); - return Err(self.inner.reset_and_return_invariant_violation(&msg)); - }, - } - } - Ok(()) - } - - /// Should always be called after block execution. Sets the [GlobalCacheManager] to be ready - /// for execution (and if it is already execution-ready, returns an error). Sets the ID for the - /// executed block so that the next execution can check it. - pub fn mark_block_execution_end( - &self, - executed_block_id: Option, - ) -> Result<(), VMStatus> { - self.inner.mark_block_execution_end(executed_block_id) - } - - /// Returns the cached environment set by [GlobalCacheManager::mark_block_execution_start]. If - /// it has not been set, an invariant violation error is returned. - pub fn environment(&self) -> Result { - self.inner - .previous_environment - .lock() - .clone() - .ok_or_else(|| { - // Note: we do not expect this to happen (this is really more of an unreachable). - invariant_violation("Environment must always be set at block execution start") - }) - } - - /// Returns the global module cache. - pub fn module_cache( - &self, - ) -> Arc> { - self.inner.module_cache.clone() - } -} - -#[cfg(test)] -mod test { - use super::*; - use aptos_types::{ - on_chain_config::{FeatureFlag, Features, OnChainConfig}, - state_store::{state_key::StateKey, state_value::StateValue, MockStateView}, - }; - use claims::{assert_err, assert_ok}; - use move_vm_types::code::{ - mock_verified_code, MockDeserializedCode, MockExtension, MockVerifiedCode, - }; - use std::{collections::HashMap, thread, thread::JoinHandle}; - use test_case::test_case; - - /// Joins threads. Succeeds only if a single handle evaluates to [Ok] and the rest are [Err]s. - fn join_and_assert_single_ok(handles: Vec>>) { - let mut num_oks = 0; - let mut num_errs = 0; - - let num_handles = handles.len(); - for handle in handles { - let result = handle.join().unwrap(); - if result.is_ok() { - num_oks += 1; - } else { - num_errs += 1; - } - } - assert_eq!(num_oks, 1); - assert_eq!(num_errs, num_handles - 1); - } - - #[test] - fn environment_should_always_be_set() { - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - assert!(global_cache_manager.environment().is_err()); - - let state_view = MockStateView::empty(); - assert_ok!(global_cache_manager.mark_block_execution_start(&state_view, None)); - assert_ok!(global_cache_manager.environment()); - } - - #[test] - fn mark_ready() { - let global_cache_manager = GlobalCacheManagerInner::< - u32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new_with_default_config(); - assert!(global_cache_manager.ready_for_next_block()); - - global_cache_manager.mark_not_ready_for_next_block(); - assert!(!global_cache_manager.ready_for_next_block()); - - global_cache_manager.mark_ready_for_next_block(); - assert!(global_cache_manager.ready_for_next_block()); - } - - #[test] - fn mark_execution_start_when_different_environment() { - let state_view = MockStateView::empty(); - let global_cache_manager = GlobalCacheManagerInner::new_with_default_config(); - - global_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - global_cache_manager - .module_cache - .insert(1, mock_verified_code(1, MockExtension::new(8))); - assert_eq!(global_cache_manager.module_cache.num_modules(), 2); - - assert_ok!(global_cache_manager.mark_block_execution_start(&state_view, None)); - let old_environment = global_cache_manager - .previous_environment - .lock() - .clone() - .unwrap(); - assert_ok!(global_cache_manager.mark_block_execution_end(Some(HashValue::zero()))); - assert_eq!(global_cache_manager.module_cache.num_modules(), 2); - - // Tweak feature flags to force a different config. - let mut features = old_environment.features().clone(); - assert!(features.is_enabled(FeatureFlag::LIMIT_VM_TYPE_SIZE)); - features.disable(FeatureFlag::LIMIT_VM_TYPE_SIZE); - let bytes = bcs::to_bytes(&features).unwrap(); - let state_key = StateKey::resource(Features::address(), &Features::struct_tag()).unwrap(); - - let state_view = MockStateView::new(HashMap::from([( - state_key, - StateValue::new_legacy(bytes.into()), - )])); - - // We use the same previous ID, but the cache is still flushed: the environment changed. - assert_ok!( - global_cache_manager.mark_block_execution_start(&state_view, Some(HashValue::zero())) - ); - assert_eq!(global_cache_manager.module_cache.num_modules(), 0); - - let new_environment = global_cache_manager - .previous_environment - .lock() - .clone() - .unwrap(); - assert!(old_environment != new_environment); - } - - #[test] - fn mark_execution_start_when_too_many_types() { - // TODO(loader_v2): - // Propagate type caches/struct name index map APIs to here so we can mock & test. - } - - #[test] - fn mark_execution_start_when_module_cache_is_too_large() { - let state_view = MockStateView::empty(); - - let config = GlobalCacheConfig { - max_module_cache_size_in_bytes: 8, - ..Default::default() - }; - let global_cache_manager = GlobalCacheManagerInner::new_with_config(config); - - global_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - global_cache_manager - .module_cache - .insert(1, mock_verified_code(1, MockExtension::new(24))); - assert_eq!(global_cache_manager.module_cache.num_modules(), 2); - assert_eq!(global_cache_manager.module_cache.size_in_bytes(), 32); - - // Cache is too large, should be flushed for next block. - assert_ok!( - global_cache_manager.mark_block_execution_start(&state_view, Some(HashValue::random())) - ); - assert_eq!(global_cache_manager.module_cache.num_modules(), 0); - assert_eq!(global_cache_manager.module_cache.size_in_bytes(), 0); - } - - #[test_case(None)] - #[test_case(Some(HashValue::zero()))] - fn mark_execution_start_when_unset(previous_block_id: Option) { - let state_view = MockStateView::empty(); - let global_cache_manager = GlobalCacheManagerInner::new_with_default_config(); - - global_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - assert_eq!(global_cache_manager.module_cache.num_modules(), 1); - - // If executed on top of unset state, or the state with matching previous hash, the cache - // is not flushed. - assert_ok!(global_cache_manager.mark_block_execution_start(&state_view, previous_block_id)); - assert_eq!(global_cache_manager.module_cache.num_modules(), 1); - assert!(!global_cache_manager.ready_for_next_block()); - } - - #[test_case(None, None)] - #[test_case(None, Some(HashValue::zero()))] - #[test_case(Some(HashValue::zero()), None)] - #[test_case(Some(HashValue::zero()), Some(HashValue::zero()))] - #[test_case(Some(HashValue::from_u64(0)), Some(HashValue::from_u64(1)))] - fn mark_execution_start_when_set( - recorded_previous_block_id: Option, - previous_block_id: Option, - ) { - let state_view = MockStateView::empty(); - let global_cache_manager = GlobalCacheManagerInner::new_with_default_config(); - - assert_ok!( - global_cache_manager.mark_block_execution_start(&state_view, Some(HashValue::random())) - ); - assert_ok!(global_cache_manager.mark_block_execution_end(recorded_previous_block_id)); - - global_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - assert_eq!(global_cache_manager.module_cache.num_modules(), 1); - - assert_ok!(global_cache_manager.mark_block_execution_start(&state_view, previous_block_id)); - assert!(!global_cache_manager.ready_for_next_block()); - - if recorded_previous_block_id.is_some() && recorded_previous_block_id == previous_block_id { - // In this case both IDs match, no cache flushing. - assert_eq!(global_cache_manager.module_cache.num_modules(), 1); - } else { - // If previous block IDs do not match, or are unknown, caches must be flushed! - assert_eq!(global_cache_manager.module_cache.num_modules(), 0); - } - } - - #[test] - fn mark_execution_start_concurrent() { - let state_view = Box::new(MockStateView::empty()); - let state_view: &'static _ = Box::leak(state_view); - - let global_cache_manager = Arc::new(GlobalCacheManagerInner::< - u32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new_with_default_config()); - assert!(global_cache_manager.ready_for_next_block()); - - let mut handles = vec![]; - for _ in 0..32 { - let handle = thread::spawn({ - let global_cache_manager = global_cache_manager.clone(); - move || global_cache_manager.mark_block_execution_start(state_view, None) - }); - handles.push(handle); - } - join_and_assert_single_ok(handles); - } - - #[test_case(None)] - #[test_case(Some(HashValue::from_u64(0)))] - fn mark_block_execution_end(block_id: Option) { - let global_cache_manager = GlobalCacheManagerInner::< - u32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new_with_default_config(); - assert!(global_cache_manager.previous_block_id.lock().is_unset()); - - // The global cache is ready, so we cannot mark execution end. - assert_err!(global_cache_manager.mark_block_execution_end(block_id)); - - global_cache_manager.mark_not_ready_for_next_block(); - let previous_block_id = *global_cache_manager.previous_block_id.lock(); - assert!(previous_block_id.is_unset()); - assert_ok!(global_cache_manager.mark_block_execution_end(block_id)); - - // The previous block ID should be set now, and the state is ready. - let new_block_id = *global_cache_manager.previous_block_id.lock(); - assert_eq!(new_block_id, BlockId::Set(block_id)); - assert!(global_cache_manager.ready_for_next_block()); - - global_cache_manager.mark_not_ready_for_next_block(); - let next_block_id = Some(HashValue::from_u64(1)); - assert_ok!(global_cache_manager.mark_block_execution_end(next_block_id)); - - // Previous block ID is again reset. - let new_block_id = *global_cache_manager.previous_block_id.lock(); - assert_eq!(new_block_id, BlockId::Set(next_block_id)); - } - - #[test] - fn mark_block_execution_end_concurrent() { - let global_cache_manager = Arc::new(GlobalCacheManagerInner::< - u32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new_with_default_config()); - global_cache_manager.mark_not_ready_for_next_block(); - - let mut handles = vec![]; - for _ in 0..32 { - let handle = thread::spawn({ - let global_cache_manager = global_cache_manager.clone(); - move || global_cache_manager.mark_block_execution_end(None) - }); - handles.push(handle); - } - join_and_assert_single_ok(handles); - } -} diff --git a/aptos-move/aptos-transaction-benchmarks/Cargo.toml b/aptos-move/aptos-transaction-benchmarks/Cargo.toml index 43674345b0732..3fe147d12d47a 100644 --- a/aptos-move/aptos-transaction-benchmarks/Cargo.toml +++ b/aptos-move/aptos-transaction-benchmarks/Cargo.toml @@ -17,7 +17,6 @@ aptos-bitvec = { workspace = true } aptos-block-executor = { workspace = true } aptos-block-partitioner = { workspace = true } aptos-crypto = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index 1e177095ef345..a6f4b68a9ce1d 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -8,7 +8,6 @@ use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, }; use aptos_crypto::HashValue; -use aptos_global_cache_manager::GlobalCacheManager; use aptos_language_e2e_tests::{ account_universe::{AUTransactionGen, AccountPickStyle, AccountUniverse, AccountUniverseGen}, data_store::FakeDataStore, @@ -212,11 +211,6 @@ where maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = transactions.len(); - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - global_cache_manager - .mark_block_execution_start(self.state_view.as_ref(), None) - .unwrap(); - let timer = Instant::now(); let output = BlockAptosVM::execute_block::< _, @@ -224,14 +218,13 @@ where >( transactions, self.state_view.as_ref(), - &global_cache_manager, + None, BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), None, ) .expect("VM should not fail to start") .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); - global_cache_manager.mark_block_execution_end(None).unwrap(); (output, block_size * 1000 / exec_time as usize) } @@ -267,11 +260,6 @@ where maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = transactions.len(); - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - global_cache_manager - .mark_block_execution_start(self.state_view.as_ref(), None) - .unwrap(); - let timer = Instant::now(); let output = BlockAptosVM::execute_block::< _, @@ -279,7 +267,7 @@ where >( transactions, self.state_view.as_ref(), - &global_cache_manager, + None, BlockExecutorConfig::new_maybe_block_limit( concurrency_level_per_shard, maybe_block_gas_limit, @@ -290,8 +278,6 @@ where .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); - global_cache_manager.mark_block_execution_end(None).unwrap(); - (output, block_size * 1000 / exec_time as usize) } @@ -301,7 +287,7 @@ where partitioned_txns: Option, run_par: bool, run_seq: bool, - conurrency_level_per_shard: usize, + concurrency_level_per_shard: usize, maybe_block_gas_limit: Option, ) -> (usize, usize) { let (output, par_tps) = if run_par { @@ -309,13 +295,13 @@ where let (output, tps) = if self.is_shareded() { self.execute_benchmark_sharded( partitioned_txns.unwrap(), - conurrency_level_per_shard, + concurrency_level_per_shard, maybe_block_gas_limit, ) } else { self.execute_benchmark_parallel( &transactions, - conurrency_level_per_shard, + concurrency_level_per_shard, maybe_block_gas_limit, ) }; diff --git a/aptos-move/aptos-transactional-test-harness/Cargo.toml b/aptos-move/aptos-transactional-test-harness/Cargo.toml index af96620411278..c0e44b746718d 100644 --- a/aptos-move/aptos-transactional-test-harness/Cargo.toml +++ b/aptos-move/aptos-transactional-test-harness/Cargo.toml @@ -19,7 +19,6 @@ aptos-cached-packages = { workspace = true } aptos-crypto = { workspace = true } aptos-framework = { workspace = true } aptos-gas-schedule = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-resource-viewer = { workspace = true } aptos-storage-interface = { workspace = true } diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index e07aec25a6fc5..a96506db49b47 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -515,25 +515,9 @@ impl<'a> AptosTestAdapter<'a> { fn run_transaction(&mut self, txn: Transaction) -> Result { let txn_block = vec![txn]; let sig_verified_block = into_signature_verified_block(txn_block); - let onchain_config = BlockExecutorConfigFromOnchain { - // TODO fetch values from state? - // Or should we just use execute_block_no_limit ? - block_gas_limit_type: BlockGasLimitType::Limit(30000), - }; - let (mut outputs, _) = AptosVMBlockExecutor::new() - .execute_block(&sig_verified_block, &self.storage.clone(), onchain_config)? - .into_inner(); - - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - global_cache_manager.mark_block_execution_start(&state_view, None)?; - let result = AptosVM::execute_block_no_limit( - &sig_verified_block, - &state_view, - &global_cache_manager, - ); - global_cache_manager.mark_block_execution_end(None)?; + let mut outputs = AptosVMBlockExecutor::new() + .execute_block_no_limit(&sig_verified_block, &self.storage.clone())?; - let mut outputs = result?; assert_eq!(outputs.len(), 1); let output = outputs.pop().unwrap(); diff --git a/aptos-move/aptos-vm-profiling/Cargo.toml b/aptos-move/aptos-vm-profiling/Cargo.toml index dcb4c5701e4c5..68bfadfcd39ea 100644 --- a/aptos-move/aptos-vm-profiling/Cargo.toml +++ b/aptos-move/aptos-vm-profiling/Cargo.toml @@ -19,7 +19,6 @@ smallvec = { workspace = true } aptos-cached-packages = { workspace = true } aptos-gas-schedule = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-move-stdlib = { workspace = true } aptos-native-interface = { workspace = true } diff --git a/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs b/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs index 3ad2a722eab4a..02a5ed3c37a8c 100644 --- a/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs +++ b/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs @@ -48,13 +48,9 @@ fn main() -> Result<()> { }) .collect(); - let res = AptosVMBlockExecutor::new().execute_block_no_limit(&txns, &state_store)?; + let outputs = AptosVMBlockExecutor::new().execute_block_no_limit(&txns, &state_store)?; for i in 0..NUM_TXNS { - assert!(result.as_ref().unwrap()[i as usize] - .status() - .status() - .unwrap() - .is_success()); + assert!(outputs[i as usize].status().status().unwrap().is_success()); } Ok(()) diff --git a/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs b/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs index 0e78bfce3d1b9..9a40888290199 100644 --- a/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs +++ b/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs @@ -84,8 +84,7 @@ impl<'s, S: StateView, E: WithRuntimeEnvironment> AptosCodeStorageAdapter<'s, S, } /// Drains cached verified modules from the code storage, transforming them into format used by - /// global caches (i.e., with extension and no versioning). Should only be called when the code - /// storage borrows [StateView]. + /// global caches. Should only be called when the code storage borrows [StateView]. pub fn into_verified_module_code_iter( self, ) -> Result< @@ -134,7 +133,6 @@ impl<'s, S: StateView, E: WithRuntimeEnvironment> AptosCodeStorageAdapter<'s, S, PanicError::CodeInvariantError(msg) })?; - // We are using storage version here. let module = ModuleCode::from_verified_ref(verified_code, Arc::new(extension)); modules_to_add.push((key, Arc::new(module))) } diff --git a/aptos-move/aptos-vm/Cargo.toml b/aptos-move/aptos-vm/Cargo.toml index abae28d9434ef..a3458288716d2 100644 --- a/aptos-move/aptos-vm/Cargo.toml +++ b/aptos-move/aptos-vm/Cargo.toml @@ -24,7 +24,6 @@ aptos-framework = { workspace = true } aptos-gas-algebra = { workspace = true } aptos-gas-meter = { workspace = true } aptos-gas-schedule = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } aptos-memory-usage-tracker = { workspace = true } diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 8f5f411416101..e85934e3fbaa3 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -27,7 +27,9 @@ use crate::{ VMBlockExecutor, VMValidator, }; use anyhow::anyhow; -use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook; +use aptos_block_executor::{ + code_cache_global_manager::ModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, +}; use aptos_crypto::HashValue; use aptos_framework::{ natives::{code::PublishRequest, randomness::RandomnessContext}, @@ -36,7 +38,6 @@ use aptos_framework::{ use aptos_gas_algebra::{Gas, GasQuantity, NumBytes, Octa}; use aptos_gas_meter::{AptosGasMeter, GasAlgebra}; use aptos_gas_schedule::{AptosGasParameters, VMGasParameters}; -use aptos_global_cache_manager::GlobalCacheManager; use aptos_logger::{enabled, prelude::*, Level}; use aptos_metrics_core::TimerHelper; #[cfg(any(test, feature = "testing"))] @@ -44,7 +45,10 @@ use aptos_types::state_store::StateViewId; use aptos_types::{ account_config::{self, new_block_event_key, AccountResource}, block_executor::{ - config::{BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig}, + config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, partitioner::PartitionedTransactions, }, block_metadata::BlockMetadata, @@ -66,6 +70,7 @@ use aptos_types::{ TransactionAuxiliaryData, TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult, ViewFunctionOutput, WriteSetPayload, }, + vm::modules::AptosModuleExtension, vm_status::{AbortLocation, StatusCode, VMStatus}, }; use aptos_utils::aptos_try; @@ -112,7 +117,7 @@ use move_vm_metrics::{Timer, VM_TIMER}; use move_vm_runtime::{ logging::expect_no_verification_errors, module_traversal::{TraversalContext, TraversalStorage}, - RuntimeEnvironment, WithRuntimeEnvironment, + Module, RuntimeEnvironment, WithRuntimeEnvironment, }; use move_vm_types::gas::{GasMeter, UnmeteredGasMeter}; use num_cpus; @@ -2774,29 +2779,32 @@ impl AptosVM { /// Transaction execution: AptosVM /// Executing conflicts: in the input order, via BlockSTM, /// State: BlockSTM-provided MVHashMap-based view with caching -pub struct AptosVMBlockExecutor; +pub struct AptosVMBlockExecutor { + /// Manages module cache and execution environment of this block executor. Users of executor + /// must use manager's API to ensure the correct state of caches. + module_cache_manager: + ModuleCacheManager, +} -// Executor external API impl VMBlockExecutor for AptosVMBlockExecutor { - // NOTE: At the moment there are no persistent caches that live past the end of a block (that's - // why AptosVMBlockExecutor has no state) - // There are some cache invalidation issues around transactions publishing code that need to be - // sorted out before that's possible. - fn new() -> Self { - Self + Self { + module_cache_manager: ModuleCacheManager::new(), + } + } + + fn module_cache_manager( + &self, + ) -> Option< + &ModuleCacheManager, + > { + Some(&self.module_cache_manager) } - /// Execute a block of `transactions`. The output vector will have the exact same length as the - /// input vector. The discarded transactions will be marked as `TransactionStatus::Discard` and - /// have an empty `WriteSet`. Also `state_view` is immutable, and does not have interior - /// mutability. Writes to be applied to the data view are encoded in the write set part of a - /// transaction output. fn execute_block( &self, transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), - global_cache_manager: &GlobalCacheManager, onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { fail_point!("move_adapter::execute_block", |_| { @@ -2819,12 +2827,13 @@ impl VMBlockExecutor for AptosVMBlockExecutor { >( transactions, state_view, - global_cache_manager, + Some(&self.module_cache_manager), BlockExecutorConfig { local: BlockExecutorLocalConfig { concurrency_level: AptosVM::get_concurrency_level(), allow_fallback: true, discard_failed_blocks: AptosVM::get_discard_failed_blocks(), + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), }, onchain: onchain_config, }, diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 5fae5f7faec66..5e503344faaa3 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -12,11 +12,12 @@ use aptos_aggregator::{ delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View, }; use aptos_block_executor::{ + code_cache_global::GlobalModuleCache, code_cache_global_manager::ModuleCacheManager, errors::BlockExecutionError, executor::BlockExecutor, task::TransactionOutput as BlockExecutorTransactionOutput, txn_commit_hook::TransactionCommitHook, types::InputOutputKey, }; -use aptos_global_cache_manager::GlobalCacheManager; +use aptos_crypto::HashValue; use aptos_infallible::Mutex; use aptos_types::{ block_executor::config::BlockExecutorConfig, @@ -29,18 +30,22 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, TransactionOutput, TransactionStatus, }, + vm::modules::AptosModuleExtension, write_set::WriteOp, }; +use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::{flush_speculative_logs, init_speculative_logs}; use aptos_vm_types::{ abstract_write_op::AbstractResourceWriteOp, module_write_set::ModuleWrite, output::VMOutput, resolver::ResourceGroupSize, }; +use move_binary_format::{errors::Location, CompiledModule}; use move_core_types::{ - language_storage::StructTag, + language_storage::{ModuleId, StructTag}, value::MoveTypeLayout, vm_status::{StatusCode, VMStatus}, }; +use move_vm_runtime::{Module, WithRuntimeEnvironment}; use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use once_cell::sync::{Lazy, OnceCell}; use std::{ @@ -386,21 +391,62 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { } } +// fn prefetch_aptos_framework(&self, state_view: &impl StateView) -> anyhow::Result<()> { +// let environment = self +// .environment() +// .ok_or_else(|| anyhow!("Environment must be set before fetching the framework"))?; +// let code_storage = state_view.as_aptos_code_storage(environment); +// +// // If framework code exists in storage, the transitive closure will be verified and cached. +// let result = code_storage +// .fetch_verified_module(&AccountAddress::ONE, ident_str!("transaction_validation")); +// +// match result { +// Ok(Some(_)) => { +// // Framework must have been loaded. Drain verified modules from local cache into +// // global cache. +// let verified_module_code_iter = code_storage +// .into_verified_module_code_iter() +// .map_err(|err| { +// anyhow!( +// "Unable to convert cached modules into verified code: {:?}", +// err +// ) +// })?; +// self.module_cache +// .insert_verified_unchecked(verified_module_code_iter) +// .map_err(|err| anyhow!("Unable to cache verified framework: {:?}", err))?; +// }, +// Ok(None) => { +// // No framework in the state, do nothing. +// }, +// Err(err) => { +// // There should be no errors when pre-fetching the framework, if there are, we +// // better return an error here. +// bail!("Error when pre-fetching the framework: {:?}", err); +// }, +// } +// Ok(()) +// } + pub struct BlockAptosVM; impl BlockAptosVM { - fn execute_block_on_thread_pool< + pub fn execute_block_on_thread_pool< S: StateView + Sync, L: TransactionCommitHook, >( executor_thread_pool: Arc, signature_verified_block: &[SignatureVerifiedTransaction], state_view: &S, - global_cache_manager: &GlobalCacheManager, + module_cache_manager: Option< + &ModuleCacheManager, + >, config: BlockExecutorConfig, transaction_commit_listener: Option, ) -> Result, VMStatus> { let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer(); + let num_txns = signature_verified_block.len(); if state_view.id() != StateViewId::Miscellaneous { // Speculation is disabled in Miscellaneous context, which is used by testing and @@ -410,6 +456,51 @@ impl BlockAptosVM { BLOCK_EXECUTOR_CONCURRENCY.set(config.local.concurrency_level as i64); + // We should be checking different module cache configurations here. + let module_cache_config = &config.local.module_cache_config; + + let (environment, module_cache) = match module_cache_manager { + Some(module_cache_manager) if !module_cache_manager.mark_executing() => { + let environment = + module_cache_manager.get_or_initialize_environment_unchecked(state_view); + let module_cache = module_cache_manager.module_cache(); + (environment, module_cache) + }, + _ => { + // Either we do not have global caches , in which case we can create new ones, or + // something went wrong, and we were not able to mark the state as executing. In + // this case, fallback to empty caches. Note that the alert should have been raised + // during marking. + let environment = + AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); + let module_cache = Arc::new(GlobalModuleCache::empty()); + (environment, module_cache) + }, + }; + + // Check 1: struct re-indexing map is not too large. If it is, we flush the cache. Also, we + // need to flush modules because they store indices into re-indexing map. + let runtime_environment = environment.runtime_environment(); + let struct_name_index_map_size = runtime_environment + .struct_name_index_map_size() + .map_err(|err| err.finish(Location::Undefined).into_vm_status())?; + if struct_name_index_map_size > module_cache_config.max_struct_name_index_map_size { + module_cache.flush_unchecked(); + runtime_environment.flush_struct_name_and_info_caches(); + } + + // Check 2: If the module cache is too big, flush it. + if module_cache + .size_in_bytes_is_greater_than(module_cache_config.max_module_cache_size_in_bytes) + { + module_cache.flush_unchecked(); + } + + // Finally, to avoid cold starts, fetch the framework code prior to block execution. + if module_cache.num_modules() == 0 && module_cache_config.prefetch_framework_code { + // TODO: prefetch. + } + let executor = BlockExecutor::< SignatureVerifiedTransaction, AptosExecutorTask, @@ -419,12 +510,22 @@ impl BlockAptosVM { >::new( config, executor_thread_pool, - global_cache_manager.module_cache(), + module_cache, transaction_commit_listener, ); - let environment = global_cache_manager.environment()?; let ret = executor.execute_block(environment, signature_verified_block, state_view); + if let Some(module_cache_manager) = module_cache_manager { + if !module_cache_manager.mark_done() { + // Something is wrong as we were not able to mark execution as done. Return an + // error. + return Err(VMStatus::error( + StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, + Some("Unable to mark block execution as done".to_string()), + )); + } + } + match ret { Ok(block_output) => { let (transaction_outputs, block_end_info) = block_output.into_inner(); @@ -455,31 +556,6 @@ impl BlockAptosVM { } } - pub fn execute_block_on_thread_pool_without_global_caches< - S: StateView + Sync, - L: TransactionCommitHook, - >( - executor_thread_pool: Arc, - signature_verified_block: &[SignatureVerifiedTransaction], - state_view: &S, - config: BlockExecutorConfig, - transaction_commit_listener: Option, - ) -> Result, VMStatus> { - let global_cache_manager = GlobalCacheManager::new_with_default_config(); - global_cache_manager.mark_block_execution_start(state_view, None)?; - - let result = Self::execute_block_on_thread_pool::( - executor_thread_pool, - signature_verified_block, - state_view, - &global_cache_manager, - config, - transaction_commit_listener, - ); - global_cache_manager.mark_block_execution_end(None)?; - result - } - /// Uses shared thread pool to execute blocks. pub fn execute_block< S: StateView + Sync, @@ -487,7 +563,9 @@ impl BlockAptosVM { >( signature_verified_block: &[SignatureVerifiedTransaction], state_view: &S, - global_cache_manager: &GlobalCacheManager, + module_cache_manager: Option< + &ModuleCacheManager, + >, config: BlockExecutorConfig, transaction_commit_listener: Option, ) -> Result, VMStatus> { @@ -495,7 +573,7 @@ impl BlockAptosVM { Arc::clone(&RAYON_EXEC_POOL), signature_verified_block, state_view, - global_cache_manager, + module_cache_manager, config, transaction_commit_listener, ) diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index e99784753a552..977287304b10c 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -126,6 +126,8 @@ pub mod verifier; pub use crate::aptos_vm::{AptosSimulationVM, AptosVM}; use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}; +use aptos_block_executor::code_cache_global_manager::ModuleCacheManager; +use aptos_crypto::HashValue; use aptos_types::{ block_executor::{ config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, @@ -135,9 +137,13 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, SignedTransaction, TransactionOutput, VMValidatorResult, }, + vm::modules::AptosModuleExtension, vm_status::VMStatus, }; use aptos_vm_types::module_and_script_storage::code_storage::AptosCodeStorage; +use move_binary_format::CompiledModule; +use move_core_types::language_storage::ModuleId; +use move_vm_runtime::Module; use std::{marker::Sync, sync::Arc}; pub use verifier::view_function::determine_is_view; @@ -152,13 +158,24 @@ pub trait VMValidator { ) -> VMValidatorResult; } -/// This trait describes the VM's execution interface. +/// This trait describes the block executor interface. pub trait VMBlockExecutor: Send + Sync { - /// Be careful if any state is kept in VMBlockExecutor, as all validations are implementers responsibility - /// (and state_view passed in execute_block can go both backwards and forwards in time). - /// TODO: Currently, production uses new() on every block, and only executor-benchmark reuses across. + /// Be careful if any state (such as caches) is kept in [VMBlockExecutor]. It is the + /// responsibility of the implementation to ensure the state is valid across multiple + /// executions. For example, the same executor may be used to run on a new state, and then on + /// an old one. fn new() -> Self; + /// Returns the cache manager responsible for keeping module caches in sync. By default, is + /// [None]. + fn module_cache_manager( + &self, + ) -> Option< + &ModuleCacheManager, + > { + None + } + /// Executes a block of transactions and returns output for each one of them. fn execute_block( &self, @@ -173,7 +190,6 @@ pub trait VMBlockExecutor: Send + Sync { &self, transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), - global_cache_manager: &GlobalCacheManager, ) -> Result, VMStatus> { self.execute_block( transactions, diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/global_executor.rs b/aptos-move/aptos-vm/src/sharded_block_executor/global_executor.rs index 90a12556deb57..239bbaba7bb24 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/global_executor.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/global_executor.rs @@ -60,11 +60,9 @@ impl GlobalExecutor { GLOBAL_ROUND_ID, state_view, BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: self.concurrency_level, - allow_fallback: true, - discard_failed_blocks: false, - }, + local: BlockExecutorLocalConfig::default_with_concurrency_level( + self.concurrency_level, + ), onchain: onchain_config, }, ) diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index 5ad1dc5e602f6..193302692725d 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -135,10 +135,12 @@ impl ShardedExecutorService { ); }); s.spawn(move |_| { - let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_caches( + let ret = BlockAptosVM::execute_block_on_thread_pool( executor_thread_pool, &signature_verified_transactions, aggr_overridden_state_view.as_ref(), + // Since we execute blocks in parallel, we cannot share module caches. + None, config, cross_shard_commit_sender, ) @@ -230,11 +232,9 @@ impl ShardedExecutorService { transactions, state_view.as_ref(), BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: concurrency_level_per_shard, - allow_fallback: true, - discard_failed_blocks: false, - }, + local: BlockExecutorLocalConfig::default_with_concurrency_level( + concurrency_level_per_shard, + ), onchain: onchain_config, }, ); diff --git a/aptos-move/aptos-vm/tests/sharded_block_executor.rs b/aptos-move/aptos-vm/tests/sharded_block_executor.rs index 2f5c08eaa86a1..5968d0a495ab9 100644 --- a/aptos-move/aptos-vm/tests/sharded_block_executor.rs +++ b/aptos-move/aptos-vm/tests/sharded_block_executor.rs @@ -187,7 +187,6 @@ fn test_partitioner_v2_connected_component_sharded_block_executor_with_random_tr mod test_utils { use aptos_block_partitioner::BlockPartitioner; - use aptos_global_cache_manager::GlobalCacheManager; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, executor::FakeExecutor, diff --git a/aptos-move/block-executor/Cargo.toml b/aptos-move/block-executor/Cargo.toml index aefd0ae6f13cc..d0eedea80dcfb 100644 --- a/aptos-move/block-executor/Cargo.toml +++ b/aptos-move/block-executor/Cargo.toml @@ -23,6 +23,7 @@ aptos-metrics-core = { workspace = true } aptos-mvhashmap = { workspace = true } aptos-types = { workspace = true } aptos-vm-logging = { workspace = true } +aptos-vm-environment = { workspace = true } aptos-vm-types = { workspace = true } arc-swap = { workspace = true } bcs = { workspace = true } diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs index 7abe1da817a9a..c1214baa633db 100644 --- a/aptos-move/block-executor/src/captured_reads.rs +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -1,7 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{types::InputOutputKey, value_exchange::filter_value_for_exchange}; +use crate::{ + code_cache_global::GlobalModuleCache, types::InputOutputKey, + value_exchange::filter_value_for_exchange, +}; use anyhow::bail; use aptos_aggregator::{ delta_math::DeltaHistory, @@ -19,7 +22,6 @@ use aptos_mvhashmap::{ use aptos_types::{ error::{code_invariant_error, PanicError, PanicOr}, executable::ModulePath, - read_only_module_cache::ReadOnlyModuleCache, state_store::state_value::StateValueMetadata, transaction::BlockExecutableTransaction as Transaction, write_set::TransactionWrite, @@ -651,7 +653,7 @@ where /// 3. Entries that were in per-block cache have the same commit index. pub(crate) fn validate_module_reads( &self, - global_module_cache: &ReadOnlyModuleCache, + global_module_cache: &GlobalModuleCache, per_block_module_cache: &SyncModuleCache>, ) -> bool { if self.non_delayed_field_speculative_failure { @@ -872,7 +874,10 @@ where #[cfg(test)] mod test { use super::*; - use crate::proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType}; + use crate::{ + code_cache_global::GlobalModuleCache, + proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType}, + }; use aptos_mvhashmap::{types::StorageVersion, MVHashMap}; use aptos_types::executable::ExecutableTestType; use claims::{ @@ -1520,7 +1525,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = ReadOnlyModuleCache::empty(); + let global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache)); @@ -1555,7 +1560,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = ReadOnlyModuleCache::empty(); + let global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8))); @@ -1632,7 +1637,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = ReadOnlyModuleCache::empty(); + let global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); let a = mock_deserialized_code(0, MockExtension::new(8)); @@ -1692,7 +1697,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = ReadOnlyModuleCache::empty(); + let global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); // Module exists in global cache. diff --git a/aptos-move/block-executor/src/code_cache.rs b/aptos-move/block-executor/src/code_cache.rs index 7ffe0d78b2ef4..8d875dca1ce2d 100644 --- a/aptos-move/block-executor/src/code_cache.rs +++ b/aptos-move/block-executor/src/code_cache.rs @@ -54,6 +54,8 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> ModuleCodeB .map_err(|err| err.finish(Location::Undefined))? .map(|state_value| { let extension = Arc::new(AptosModuleExtension::new(state_value)); + + // TODO(loader_v2): This recomputes module hash twice, we should avoid it. let (compiled_module, _, _) = self .runtime_environment() .deserialize_into_compiled_module(extension.bytes())?; diff --git a/types/src/read_only_module_cache.rs b/aptos-move/block-executor/src/code_cache_global.rs similarity index 91% rename from types/src/read_only_module_cache.rs rename to aptos-move/block-executor/src/code_cache_global.rs index a7d70138822aa..d8fa927a60008 100644 --- a/types/src/read_only_module_cache.rs +++ b/aptos-move/block-executor/src/code_cache_global.rs @@ -1,7 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{error::PanicError, explicit_sync_wrapper::ExplicitSyncWrapper}; +use crate::explicit_sync_wrapper::ExplicitSyncWrapper; +use aptos_types::error::PanicError; use crossbeam::utils::CachePadded; use hashbrown::HashMap; use move_vm_types::code::{ModuleCode, WithSize}; @@ -14,7 +15,7 @@ use std::{ }, }; -/// Entry stored in [ReadOnlyModuleCache]. +/// Entry stored in [GlobalModuleCache]. struct Entry { /// True if this code is "valid" within the block execution context (i.e., there has been no /// republishing of this module so far). If false, executor needs to read the module from the @@ -61,14 +62,14 @@ where /// A read-only module cache for verified code, that can be accessed concurrently within the block. /// Can only be modified safely at block boundaries. -pub struct ReadOnlyModuleCache { +pub struct GlobalModuleCache { /// Module cache containing the verified code. module_cache: ExplicitSyncWrapper>>, /// Sum of serialized sizes (in bytes) of all cached modules. size: AtomicUsize, } -impl ReadOnlyModuleCache +impl GlobalModuleCache where K: Hash + Eq + Clone, VC: Deref>, @@ -124,6 +125,12 @@ where self.size.load(Ordering::Relaxed) } + /// Returns true if the sum of serialized sizes of modules stored in cache is greater than the + /// specified value. + pub fn size_in_bytes_is_greater_than(&self, size: usize) -> bool { + self.size_in_bytes() > size + } + /// Inserts modules into the cache. Should never be called throughout block-execution. Use with /// caution. /// @@ -214,7 +221,7 @@ mod test { #[test] fn test_cache_contains_valid_and_get() { - let cache = ReadOnlyModuleCache::empty(); + let cache = GlobalModuleCache::empty(); // Set the state. cache.insert(0, mock_verified_code(0, MockExtension::new(8))); @@ -233,8 +240,8 @@ mod test { } #[test] - fn test_num_modules_and_flush_unchecked() { - let cache = ReadOnlyModuleCache::empty(); + fn test_cache_sizes_and_flush_unchecked() { + let cache = GlobalModuleCache::empty(); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); @@ -248,6 +255,10 @@ mod test { assert_eq!(cache.num_modules(), 2); assert_eq!(cache.size_in_bytes(), 24); + assert!(cache.size_in_bytes_is_greater_than(23)); + assert!(!cache.size_in_bytes_is_greater_than(24)); + assert!(!cache.size_in_bytes_is_greater_than(25)); + cache.flush_unchecked(); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); @@ -255,7 +266,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked() { - let cache = ReadOnlyModuleCache::empty(); + let cache = GlobalModuleCache::empty(); let mut new_modules = vec![]; for i in 0..10 { @@ -271,7 +282,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_does_not_add_deserialized_code() { - let cache = ReadOnlyModuleCache::empty(); + let cache = GlobalModuleCache::empty(); let deserialized_modules = vec![(0, mock_deserialized_code(0, MockExtension::new(8)))]; assert_ok!(cache.insert_verified_unchecked(deserialized_modules.into_iter())); @@ -282,7 +293,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_does_not_override_valid_modules() { - let cache = ReadOnlyModuleCache::empty(); + let cache = GlobalModuleCache::empty(); cache.insert(0, mock_verified_code(0, MockExtension::new(8))); assert_eq!(cache.num_modules(), 1); @@ -294,7 +305,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_overrides_invalid_modules() { - let cache = ReadOnlyModuleCache::empty(); + let cache = GlobalModuleCache::empty(); cache.insert(0, mock_verified_code(0, MockExtension::new(8))); cache.mark_invalid_if_contains(&0); diff --git a/aptos-move/block-executor/src/code_cache_global_manager.rs b/aptos-move/block-executor/src/code_cache_global_manager.rs new file mode 100644 index 0000000000000..65905e165bf96 --- /dev/null +++ b/aptos-move/block-executor/src/code_cache_global_manager.rs @@ -0,0 +1,431 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::code_cache_global::GlobalModuleCache; +use aptos_types::state_store::StateView; +use aptos_vm_environment::environment::AptosEnvironment; +use move_vm_types::code::WithSize; +use parking_lot::Mutex; +use std::{fmt::Debug, hash::Hash, ops::Deref, sync::Arc}; + +/// Raises an alert with the specified message. In case we run in testing mode, instead prints the +/// message to standard output. +macro_rules! alert_or_println { + ($($arg:tt)*) => { + if cfg!(any(test, feature = "testing")) { + println!($($arg)*) + } else { + use aptos_vm_logging::{alert, prelude::CRITICAL_ERRORS}; + use aptos_logger::error; + alert!($($arg)*); + } + }; +} + +/// Represents the state of [GlobalModuleCache]. The following transitions are allowed: +/// 1. [State::Clean] --> [State::Ready]. +/// 2. [State::Ready] --> [State::Executing]. +/// 3. [State::Executing] --> [State::Done]. +/// 4. [State::Done] --> [State::Ready]. +#[derive(Clone, Debug, Eq, PartialEq)] +enum State { + Clean, + Ready(T), + Executing(T), + Done(T), +} + +impl State { + /// If the state is [State::Clean] returns true, and false otherwise. + fn is_clean(&self) -> bool { + match self { + State::Clean => true, + State::Ready(_) | State::Executing(_) | State::Done(_) => false, + } + } + + /// If the state is [State::Done], returns true. + fn is_done(&self) -> bool { + match self { + State::Done(_) => true, + State::Clean | State::Ready(_) | State::Executing(_) => false, + } + } + + /// If the state is [State::Done] and its value equals the one provided, returns true. In other + /// cases, returns false. + fn is_done_with_value(&self, value: &T) -> bool { + match self { + State::Done(v) => v == value, + State::Clean | State::Executing(_) | State::Ready(_) => false, + } + } + + /// If the state is [State::Ready], returns its value. Otherwise, returns [None]. + fn value_from_ready(&self) -> Option { + match self { + State::Ready(v) => Some(v.clone()), + State::Clean | State::Executing(_) | State::Done(_) => None, + } + } + + /// If the state is [State::Executing], returns its value. Otherwise, returns [None]. + fn value_from_executing(&self) -> Option { + match self { + State::Executing(v) => Some(v.clone()), + State::Clean | State::Ready(_) | State::Done(_) => None, + } + } + + /// Sets the current state to [State::Ready]. + fn set_ready(&mut self, value: T) { + *self = Self::Ready(value); + } + + /// Sets the current state to [State::Executing]. + fn set_executing(&mut self, value: T) { + *self = Self::Executing(value); + } + + /// Sets the current state to [State::Done]. + fn set_done(&mut self, value: T) { + *self = Self::Done(value); + } +} + +/// Manages module caches and the execution environment, possible across multiple blocks. +pub struct ModuleCacheManager { + /// The state of global caches. + state: Mutex>, + + /// During concurrent executions, this module cache is read-only. However, it can be mutated + /// when it is known that there are no concurrent accesses. [ModuleCacheManager] must ensure + /// the safety. + module_cache: Arc>, + /// The execution environment, initially set to [None]. The environment, as long as it does not + /// change, can be kept for multiple block executions. + environment: Mutex>, +} + +impl ModuleCacheManager +where + T: Clone + Debug + Eq, + K: Hash + Eq + Clone, + VC: Deref>, + E: WithSize, +{ + /// Returns a new instance of [ModuleCacheManager]. + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + state: Mutex::new(State::Clean), + module_cache: Arc::new(GlobalModuleCache::empty()), + environment: Mutex::new(None), + } + } + + /// If state is [State::Clean], or [State::Ready] with matching previous value, sets the state + /// to [State::Ready] with the current value and returns true. Otherwise, raises an alert and + /// returns false. + pub fn mark_ready(&self, previous: &T, current: T) -> bool { + let mut state = self.state.lock(); + + if state.is_clean() || state.is_done_with_value(previous) { + state.set_ready(current); + return true; + } + + if state.is_done() { + // If the state is done, but the values to not match, we still set the state as ready, + // but also flush module caches because they execute not on top of the previous state. + self.module_cache.flush_unchecked(); + state.set_ready(current); + return true; + } + + alert_or_println!( + "Unable to mark ready, state: {:?}, previous: {:?}, current: {:?}", + state, + previous, + current + ); + false + } + + /// If state is [State::Ready], changes it to [State::Executing] with the same value, returning + /// true. Otherwise, returns false indicating that state transition failed, also raising an + /// alert. + pub fn mark_executing(&self) -> bool { + let mut state = self.state.lock(); + if let Some(value) = state.value_from_ready() { + state.set_executing(value); + return true; + } + + alert_or_println!("Unable to mark executing, state: {:?}", state); + false + } + + /// If state is [State::Executing], changes it to [State::Done] with the same value, returning + /// true. Otherwise, returns false indicating that state transition failed, also raising an + /// alert. + pub fn mark_done(&self) -> bool { + let mut state = self.state.lock(); + if let Some(value) = state.value_from_executing() { + state.set_done(value); + return true; + } + + alert_or_println!("Unable to mark done, state: {:?}", state); + false + } + + /// Returns the cached global environment if it already exists, and matches the one in storage. + /// If it does not exist, or does not match, the new environment is initialized from the given + /// state, cached, and returned. + pub fn get_or_initialize_environment_unchecked( + &self, + state_view: &impl StateView, + ) -> AptosEnvironment { + let new_environment = + AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); + let mut existing_environment = self.environment.lock(); + + let (environment, is_new) = match existing_environment.as_ref() { + None => { + *existing_environment = Some(new_environment.clone()); + (new_environment, true) + }, + Some(environment) => { + if environment == &new_environment { + (environment.clone(), false) + } else { + *existing_environment = Some(new_environment.clone()); + (new_environment, true) + } + }, + }; + + // If this environment has been (re-)initialized, we need to flush the module cache because + // it can contain now out-dated code. + if is_new { + self.module_cache.flush_unchecked(); + } + + environment + } + + /// Returns the global module cache. + pub fn module_cache(&self) -> Arc> { + self.module_cache.clone() + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::thread::JoinHandle; + use claims::assert_matches; + use aptos_types::on_chain_config::{FeatureFlag, Features, OnChainConfig}; + use aptos_types::state_store::MockStateView; + use aptos_types::state_store::state_key::StateKey; + use aptos_types::state_store::state_value::StateValue; + use move_vm_types::code::{mock_verified_code, MockExtension}; + use super::*; + + #[test] + fn test_clean_state() { + let state = State::Clean; + + assert!(state.is_clean()); + assert!(!state.is_done()); + assert!(!state.is_done_with_value(&0)); + assert!(state.value_from_ready().is_none()); + assert!(state.value_from_executing().is_none()); + } + + #[test] + fn test_ready_state() { + let state = State::Ready(0); + + assert!(!state.is_clean()); + assert!(!state.is_done()); + assert!(!state.is_done_with_value(&0)); + assert_eq!(state.value_from_ready(), Some(0)); + assert!(state.value_from_executing().is_none()); + } + + #[test] + fn test_executing_state() { + let state = State::Executing(0); + + assert!(!state.is_clean()); + assert!(!state.is_done()); + assert!(!state.is_done_with_value(&0)); + assert!(state.value_from_ready().is_none()); + assert!(state.value_from_executing().is_none()); + } + + #[test] + fn test_done_state() { + let state = State::Done(0); + + assert!(!state.is_clean()); + assert!(state.is_done()); + assert!(state.is_done_with_value(&0)); + assert!(!state.is_done_with_value(&10)); + assert!(state.value_from_ready().is_none()); + assert!(state.value_from_executing().is_none()); + } + + #[test] + fn test_set_state() { + let mut state = State::Clean; + + state.set_ready(0); + assert_matches!(state, State::Ready(0)); + + state.set_executing(10); + assert_matches!(state, State::Executing(10)); + + state.set_done(100); + assert_matches!(state, State::Done(100)); + } + + #[test] + fn test_marking() { + let module_cache_manager = ModuleCacheManager::new(); + assert!(module_cache_manager.state.lock().is_clean()); + + // Pre-populate module cache to test flushing. + module_cache_manager + .module_cache + .insert(0, mock_verified_code(0, MockExtension::new(8))); + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + + // Can only go to ready state from clean state. + assert!(!module_cache_manager.mark_executing()); + assert!(!module_cache_manager.mark_done()); + assert!(module_cache_manager.mark_ready(&0, 1)); + + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + + // Can only go to executing state from ready state. + assert!(!module_cache_manager.mark_done()); + assert!(!module_cache_manager.mark_ready(&0, 1)); + assert!(module_cache_manager.mark_executing()); + + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + + // Can only go to done state from executing state. + assert!(!module_cache_manager.mark_executing()); + assert!(!module_cache_manager.mark_ready(&0, 1)); + assert!(module_cache_manager.mark_done()); + + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + + // Can only go to ready state from done state. + assert!(!module_cache_manager.mark_executing()); + assert!(!module_cache_manager.mark_done()); + } + + /// Joins threads. Succeeds only if a single handle evaluates to [Ok] and the rest are [Err]s. + fn join_and_assert_single_true(handles: Vec>) { + let mut num_true = 0; + let mut num_false = 0; + + let num_handles = handles.len(); + for handle in handles { + let result = handle.join().unwrap(); + if result.is_ok() { + num_true += 1; + } else { + num_false += 1; + } + } + assert_eq!(num_true, 1); + assert_eq!(num_false, num_handles - 1); + } + + // #[test] + // fn mark_block_execution_end_concurrent() { + // let global_cache_manager = Arc::new(GlobalCacheManagerInner::< + // u32, + // MockDeserializedCode, + // MockVerifiedCode, + // MockExtension, + // >::new_with_default_config()); + // global_cache_manager.mark_not_ready_for_next_block(); + // + // let mut handles = vec![]; + // for _ in 0..32 { + // let handle = thread::spawn({ + // let global_cache_manager = global_cache_manager.clone(); + // move || global_cache_manager.mark_block_execution_end(None) + // }); + // handles.push(handle); + // } + // join_and_assert_single_ok(handles); + // } + + fn state_view_with_changed_feature_flag(feature_flag: Option) -> MockStateView { + // Tweak feature flags to force a different config. + let mut features = Features::default(); + + if let Some(feature_flag) = feature_flag { + if features.is_enabled(feature_flag) { + features.disable(feature_flag); + } else { + features.enable(feature_flag); + } + } + + MockStateView::new(HashMap::from([( + StateKey::resource(Features::address(), &Features::struct_tag()).unwrap(), + StateValue::new_legacy(bcs::to_bytes(&features).unwrap().into()), + )])) + } + + #[test] + fn mark_execution_start_when_different_environment() { + let module_cache_manager = ModuleCacheManager::new(); + + module_cache_manager + .module_cache + .insert(0, mock_verified_code(0, MockExtension::new(8))); + module_cache_manager + .module_cache + .insert(1, mock_verified_code(1, MockExtension::new(8))); + assert_eq!(module_cache_manager.module_cache.num_modules(), 2); + assert!(module_cache_manager.environment.lock().is_none()); + + // Environment has to be set to the same value, cache flushed. + let state_view = state_view_with_changed_feature_flag(None); + let environment = module_cache_manager.get_or_initialize_environment_unchecked(&state_view); + assert_eq!(module_cache_manager.module_cache.num_modules(), 0); + assert!(module_cache_manager.environment.lock().as_ref().is_some_and(|cached_environment| cached_environment == &environment)); + + module_cache_manager + .module_cache + .insert(2, mock_verified_code(2, MockExtension::new(8))); + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + assert!(module_cache_manager.environment.lock().is_some()); + + // Environment has to be re-set to the new value, cache flushed. + let state_view = state_view_with_changed_feature_flag(Some(FeatureFlag::CODE_DEPENDENCY_CHECK)); + let environment = module_cache_manager.get_or_initialize_environment_unchecked(&state_view); + assert_eq!(module_cache_manager.module_cache.num_modules(), 0); + assert!(module_cache_manager.environment.lock().as_ref().is_some_and(|cached_environment| cached_environment == &environment)); + + module_cache_manager + .module_cache + .insert(3, mock_verified_code(3, MockExtension::new(8))); + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + assert!(module_cache_manager.environment.lock().is_some()); + + // Environment is kept, and module caches are not flushed. + let environment = module_cache_manager.get_or_initialize_environment_unchecked(&state_view); + assert_eq!(module_cache_manager.module_cache.num_modules(), 1); + assert!(module_cache_manager.environment.lock().as_ref().is_some_and(|cached_environment| cached_environment == &environment)); + } +} diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 05521aea84c5f..151dc89522013 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + code_cache_global::GlobalModuleCache, counters::{ self, BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, PARALLEL_EXECUTION_SECONDS, RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, @@ -10,6 +11,7 @@ use crate::{ }, errors::*, executor_utilities::*, + explicit_sync_wrapper::ExplicitSyncWrapper, limit_processor::BlockGasLimitProcessor, scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave}, task::{ExecutionStatus, ExecutorTask, TransactionOutput}, @@ -34,9 +36,7 @@ use aptos_types::{ block_executor::config::BlockExecutorConfig, error::{code_invariant_error, expect_ok, PanicError, PanicOr}, executable::Executable, - explicit_sync_wrapper::ExplicitSyncWrapper, on_chain_config::BlockGasLimitType, - read_only_module_cache::ReadOnlyModuleCache, state_store::{state_value::StateValue, TStateView}, transaction::{ block_epilogue::BlockEndInfo, BlockExecutableTransaction as Transaction, BlockOutput, @@ -75,7 +75,7 @@ pub struct BlockExecutor { config: BlockExecutorConfig, executor_thread_pool: Arc, global_module_cache: - Arc>, + Arc>, transaction_commit_hook: Option, phantom: PhantomData<(T, E, S, L, X)>, } @@ -94,7 +94,7 @@ where config: BlockExecutorConfig, executor_thread_pool: Arc, global_module_cache: Arc< - ReadOnlyModuleCache, + GlobalModuleCache, >, transaction_commit_hook: Option, ) -> Self { @@ -120,7 +120,7 @@ where versioned_cache: &MVHashMap, executor: &E, base_view: &S, - global_module_cache: &ReadOnlyModuleCache< + global_module_cache: &GlobalModuleCache< ModuleId, CompiledModule, Module, @@ -402,7 +402,7 @@ where fn validate( idx_to_validate: TxnIndex, last_input_output: &TxnLastInputOutput, - global_module_cache: &ReadOnlyModuleCache< + global_module_cache: &GlobalModuleCache< ModuleId, CompiledModule, Module, @@ -755,7 +755,7 @@ where fn publish_module_writes( txn_idx: TxnIndex, module_write_set: BTreeMap>, - global_module_cache: &ReadOnlyModuleCache< + global_module_cache: &GlobalModuleCache< ModuleId, CompiledModule, Module, @@ -1198,7 +1198,7 @@ where write: ModuleWrite, txn_idx: TxnIndex, runtime_environment: &RuntimeEnvironment, - global_module_cache: &ReadOnlyModuleCache< + global_module_cache: &GlobalModuleCache< ModuleId, CompiledModule, Module, @@ -1247,7 +1247,7 @@ where fn apply_output_sequential( txn_idx: TxnIndex, runtime_environment: &RuntimeEnvironment, - global_module_cache: &ReadOnlyModuleCache< + global_module_cache: &GlobalModuleCache< ModuleId, CompiledModule, Module, diff --git a/types/src/explicit_sync_wrapper.rs b/aptos-move/block-executor/src/explicit_sync_wrapper.rs similarity index 98% rename from types/src/explicit_sync_wrapper.rs rename to aptos-move/block-executor/src/explicit_sync_wrapper.rs index 7f57b2ae81c00..c088c71f88958 100644 --- a/types/src/explicit_sync_wrapper.rs +++ b/aptos-move/block-executor/src/explicit_sync_wrapper.rs @@ -1,8 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -#![allow(unsafe_code)] - use std::{ cell::UnsafeCell, ops::{Deref, DerefMut}, diff --git a/aptos-move/block-executor/src/lib.rs b/aptos-move/block-executor/src/lib.rs index 902fe9caa0876..f2d388f769008 100644 --- a/aptos-move/block-executor/src/lib.rs +++ b/aptos-move/block-executor/src/lib.rs @@ -141,10 +141,13 @@ extern crate scopeguard; mod captured_reads; mod code_cache; +pub mod code_cache_global; +pub mod code_cache_global_manager; pub mod counters; pub mod errors; pub mod executor; mod executor_utilities; +pub mod explicit_sync_wrapper; mod limit_processor; #[cfg(any(test, feature = "fuzzing"))] pub mod proptest_types; diff --git a/aptos-move/block-executor/src/proptest_types/bencher.rs b/aptos-move/block-executor/src/proptest_types/bencher.rs index d4b989c041c98..527c926f1b636 100644 --- a/aptos-move/block-executor/src/proptest_types/bencher.rs +++ b/aptos-move/block-executor/src/proptest_types/bencher.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + code_cache_global::GlobalModuleCache, executor::BlockExecutor, proptest_types::{ baseline::BaselineOutput, @@ -15,8 +16,7 @@ use crate::{ }; use aptos_types::{ block_executor::config::BlockExecutorConfig, contract_event::TransactionEvent, - executable::ExecutableTestType, read_only_module_cache::ReadOnlyModuleCache, - state_store::MockStateView, + executable::ExecutableTestType, state_store::MockStateView, }; use criterion::{BatchSize, Bencher as CBencher}; use num_cpus; @@ -126,7 +126,7 @@ where .build() .unwrap(), ); - let global_module_cache = Arc::new(ReadOnlyModuleCache::empty()); + let global_module_cache = Arc::new(GlobalModuleCache::empty()); let config = BlockExecutorConfig::new_no_block_limit(num_cpus::get()); let env = MockEnvironment::new(); diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index f065ae3e307b1..5d83c2fe50578 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + code_cache_global::GlobalModuleCache, errors::SequentialBlockExecutionError, executor::BlockExecutor, proptest_types::{ @@ -17,8 +18,7 @@ use crate::{ }; use aptos_types::{ block_executor::config::BlockExecutorConfig, contract_event::TransactionEvent, - executable::ExecutableTestType, read_only_module_cache::ReadOnlyModuleCache, - state_store::MockStateView, + executable::ExecutableTestType, state_store::MockStateView, }; use claims::{assert_matches, assert_ok}; use num_cpus; @@ -80,7 +80,7 @@ fn run_transactions( >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &state_view); @@ -217,7 +217,7 @@ fn deltas_writes_mixed_with_block_gas_limit(num_txns: usize, maybe_block_gas_lim >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &data_view); @@ -270,7 +270,7 @@ fn deltas_resolver_with_block_gas_limit(num_txns: usize, maybe_block_gas_limit: >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &data_view); @@ -428,7 +428,7 @@ fn publishing_fixed_params_with_block_gas_limit( >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &data_view); @@ -475,7 +475,7 @@ fn publishing_fixed_params_with_block_gas_limit( Some(max(w_index, r_index) as u64 * MAX_GAS_PER_TXN + 1), ), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) // Ensure enough gas limit to commit the module txns (4 is maximum gas per txn) .execute_transactions_parallel(&env, &transactions, &data_view); @@ -556,7 +556,7 @@ fn non_empty_group( >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &data_view); @@ -575,7 +575,7 @@ fn non_empty_group( >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool.clone(), - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_sequential(&env, &transactions, &data_view, false); diff --git a/aptos-move/block-executor/src/scheduler.rs b/aptos-move/block-executor/src/scheduler.rs index 796d024619075..0d27eb94ba8f5 100644 --- a/aptos-move/block-executor/src/scheduler.rs +++ b/aptos-move/block-executor/src/scheduler.rs @@ -2,12 +2,10 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::explicit_sync_wrapper::ExplicitSyncWrapper; use aptos_infallible::Mutex; use aptos_mvhashmap::types::{Incarnation, TxnIndex}; -use aptos_types::{ - error::{code_invariant_error, PanicError}, - explicit_sync_wrapper::ExplicitSyncWrapper, -}; +use aptos_types::error::{code_invariant_error, PanicError}; use concurrent_queue::{ConcurrentQueue, PopError}; use crossbeam::utils::CachePadded; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index e8391b3ea6986..fcfbde7452aa7 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -4,6 +4,7 @@ use crate::{ captured_reads::CapturedReads, errors::ParallelBlockExecutionError, + explicit_sync_wrapper::ExplicitSyncWrapper, task::{ExecutionStatus, TransactionOutput}, types::{InputOutputKey, ReadWriteSummary}, }; @@ -11,7 +12,6 @@ use aptos_logger::error; use aptos_mvhashmap::types::TxnIndex; use aptos_types::{ error::{code_invariant_error, PanicError}, - explicit_sync_wrapper::ExplicitSyncWrapper, fee_statement::FeeStatement, state_store::state_value::StateValueMetadata, transaction::BlockExecutableTransaction as Transaction, diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index 39dca409555c5..0cb4a946dbc27 100644 --- a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -5,6 +5,7 @@ mod code_cache_tests; use crate::{ + code_cache_global::GlobalModuleCache, errors::SequentialBlockExecutionError, executor::BlockExecutor, proptest_types::{ @@ -29,7 +30,6 @@ use aptos_types::{ block_executor::config::BlockExecutorConfig, contract_event::TransactionEvent, executable::{ExecutableTestType, ModulePath}, - read_only_module_cache::ReadOnlyModuleCache, state_store::state_value::StateValueMetadata, write_set::WriteOpKind, }; @@ -87,7 +87,7 @@ fn test_resource_group_deletion() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ); @@ -154,7 +154,7 @@ fn resource_group_bcs_fallback() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ); @@ -254,7 +254,7 @@ fn block_output_err_precedence() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ); @@ -294,7 +294,7 @@ fn skip_rest_gas_limit() { >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), Some(5)), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ); @@ -330,7 +330,7 @@ where >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(ReadOnlyModuleCache::empty()), + Arc::new(GlobalModuleCache::empty()), None, ) .execute_transactions_parallel(&env, &transactions, &data_view); diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 96956b91d6886..0659662c09e89 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -8,6 +8,7 @@ use crate::{ CapturedReads, DataRead, DelayedFieldRead, DelayedFieldReadKind, GroupRead, ReadKind, UnsyncReadSet, }, + code_cache_global::GlobalModuleCache, counters, scheduler::{DependencyResult, DependencyStatus, Scheduler, TWaitForDependency}, value_exchange::{ @@ -35,7 +36,6 @@ use aptos_mvhashmap::{ use aptos_types::{ error::{code_invariant_error, expect_ok, PanicError, PanicOr}, executable::{Executable, ModulePath}, - read_only_module_cache::ReadOnlyModuleCache, state_store::{ errors::StateviewError, state_storage_usage::StateStorageUsage, @@ -991,7 +991,7 @@ impl<'a, T: Transaction, X: Executable> ViewState<'a, T, X> { pub(crate) struct LatestView<'a, T: Transaction, S: TStateView, X: Executable> { base_view: &'a S, pub(crate) global_module_cache: - &'a ReadOnlyModuleCache, + &'a GlobalModuleCache, pub(crate) runtime_environment: &'a RuntimeEnvironment, pub(crate) latest_view: ViewState<'a, T, X>, pub(crate) txn_idx: TxnIndex, @@ -1000,7 +1000,7 @@ pub(crate) struct LatestView<'a, T: Transaction, S: TStateView, X: impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView<'a, T, S, X> { pub(crate) fn new( base_view: &'a S, - global_module_cache: &'a ReadOnlyModuleCache< + global_module_cache: &'a GlobalModuleCache< ModuleId, CompiledModule, Module, @@ -2491,7 +2491,7 @@ mod test { let base_view = MockStateView::empty(); let start_counter = 5; let runtime_environment = RuntimeEnvironment::new(vec![]); - let global_module_cache = ReadOnlyModuleCache::empty(); + let global_module_cache = GlobalModuleCache::empty(); let latest_view = LatestView::>, MockExecutable>::new( @@ -2761,7 +2761,7 @@ mod test { counter: RefCell, base_view: MockStateView>, empty_global_module_cache: - ReadOnlyModuleCache, + GlobalModuleCache, runtime_environment: RuntimeEnvironment, } @@ -2775,7 +2775,7 @@ mod test { unsync_map, counter, base_view, - empty_global_module_cache: ReadOnlyModuleCache::empty(), + empty_global_module_cache: GlobalModuleCache::empty(), runtime_environment, } } diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index 6c64a10fffa30..c98c4e352006e 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -28,6 +28,7 @@ use aptos_types::{ }, block_executor::config::{ BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, }, block_metadata::BlockMetadata, chain_id::ChainId, @@ -633,16 +634,19 @@ impl FakeExecutor { }, allow_fallback: self.allow_block_executor_fallback, discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), }, onchain: onchain_config, }; - BlockAptosVM::execute_block_on_thread_pool_without_global_caches::< + BlockAptosVM::execute_block_on_thread_pool::< _, NoOpTransactionCommitHook, >( self.executor_thread_pool.clone(), txn_block, &state_view, + // Do not use module caches in tests. + None, config, None, ) diff --git a/execution/executor-benchmark/Cargo.toml b/execution/executor-benchmark/Cargo.toml index 6c572c0b7d329..4f99fe0a77268 100644 --- a/execution/executor-benchmark/Cargo.toml +++ b/execution/executor-benchmark/Cargo.toml @@ -25,7 +25,6 @@ aptos-executor-types = { workspace = true } aptos-experimental-ptx-executor = { workspace = true } aptos-experimental-runtimes = { workspace = true } aptos-genesis = { workspace = true, features = ["testing"] } -aptos-global-cache-manager = { workspace = true } aptos-jellyfish-merkle = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index e25bb063d2fbc..f90eb1498f79e 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -358,7 +358,6 @@ impl VMBlockExecutor for NativeExecutor { &self, transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), - _global_cache_manager: &GlobalCacheManager, _onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { let transaction_outputs = NATIVE_EXECUTOR_POOL diff --git a/execution/executor-service/Cargo.toml b/execution/executor-service/Cargo.toml index 5d631700f1af2..e590b54912c69 100644 --- a/execution/executor-service/Cargo.toml +++ b/execution/executor-service/Cargo.toml @@ -15,7 +15,6 @@ rust-version = { workspace = true } [dependencies] aptos-block-partitioner = { workspace = true } aptos-config = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-infallible = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-logger = { workspace = true } diff --git a/execution/executor/Cargo.toml b/execution/executor/Cargo.toml index 39ab296d4f35e..7fc27599e38fd 100644 --- a/execution/executor/Cargo.toml +++ b/execution/executor/Cargo.toml @@ -20,7 +20,6 @@ aptos-drop-helper = { workspace = true } aptos-executor-service = { workspace = true } aptos-executor-types = { workspace = true } aptos-experimental-runtimes = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-indexer-grpc-table-info = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 553804df81d68..6599ee4a0af3b 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -232,6 +232,16 @@ where ))) }); + // In case block executor has a cache manager, we need to mark it as ready for + // execution. If for some reason this fails, return an error. + if let Some(module_cache_manager) = self.block_executor.module_cache_manager() { + if !module_cache_manager.mark_ready(&parent_block_id, block_id) { + return Err(ExecutorError::internal_err( + "Unable to mark module cache manager as ready", + )); + } + } + DoGetExecutionOutput::by_transaction_execution( &self.block_executor, transactions, diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index db8c2052dc288..26391ae28acd6 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -605,7 +605,6 @@ impl ChunkExecutorInner { BlockExecutorConfigFromOnchain::new_no_block_limit(), None, )?; - // not `zip_eq`, deliberately for (version, txn_out, txn_info, write_set, events) in multizip(( begin_version..end_version, diff --git a/execution/executor/src/chunk_executor/transaction_chunk.rs b/execution/executor/src/chunk_executor/transaction_chunk.rs index aca1d387e2ddc..26f873162eb61 100644 --- a/execution/executor/src/chunk_executor/transaction_chunk.rs +++ b/execution/executor/src/chunk_executor/transaction_chunk.rs @@ -87,7 +87,6 @@ impl TransactionChunk for ChunkToExecute { &V::new(), sig_verified_txns.into(), state_view, - &global_cache_manager, BlockExecutorConfigFromOnchain::new_no_block_limit(), None, ) diff --git a/execution/executor/src/db_bootstrapper/mod.rs b/execution/executor/src/db_bootstrapper/mod.rs index 0445db5a6a2ff..1d9a3c2742cc8 100644 --- a/execution/executor/src/db_bootstrapper/mod.rs +++ b/execution/executor/src/db_bootstrapper/mod.rs @@ -138,7 +138,6 @@ pub fn calculate_genesis( BlockExecutorConfigFromOnchain::new_no_block_limit(), None, )?; - ensure!( execution_output.num_transactions_to_commit() != 0, "Genesis txn execution failed." diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 2ea96225c51ed..67639b5831107 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -78,7 +78,6 @@ impl VMBlockExecutor for FakeVM { &self, _transactions: &[SignatureVerifiedTransaction], _state_view: &impl StateView, - _global_cache_manager: &GlobalCacheManager, _onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { Ok(BlockOutput::new(vec![], None)) diff --git a/execution/executor/src/tests/mock_vm/mock_vm_test.rs b/execution/executor/src/tests/mock_vm/mock_vm_test.rs index fcf97bb230bad..4df0ef06d0665 100644 --- a/execution/executor/src/tests/mock_vm/mock_vm_test.rs +++ b/execution/executor/src/tests/mock_vm/mock_vm_test.rs @@ -25,8 +25,11 @@ fn test_mock_vm_different_senders() { txns.push(encode_mint_transaction(gen_address(i), amount)); } - let outputs = MockVM - .execute_block_no_limit(&into_signature_verified_block(txns.clone()), &MockStateView::empty()) + let outputs = MockVM::new() + .execute_block_no_limit( + &into_signature_verified_block(txns.clone()), + &MockStateView::empty(), + ) .expect("MockVM should not fail to start"); for (output, txn) in itertools::zip_eq(outputs.iter(), txns.iter()) { @@ -62,8 +65,11 @@ fn test_mock_vm_same_sender() { txns.push(encode_mint_transaction(sender, amount)); } - let outputs = MockVM - .execute_block_no_limit(&into_signature_verified_block(txns), &MockStateView::empty()) + let outputs = MockVM::new() + .execute_block_no_limit( + &into_signature_verified_block(txns), + &MockStateView::empty(), + ) .expect("MockVM should not fail to start"); for (i, output) in outputs.iter().enumerate() { @@ -97,8 +103,11 @@ fn test_mock_vm_payment() { encode_transfer_transaction(gen_address(0), gen_address(1), 50), ]; - let output = MockVM - .execute_block_no_limit(&into_signature_verified_block(txns), &MockStateView::empty()) + let output = MockVM::new() + .execute_block_no_limit( + &into_signature_verified_block(txns), + &MockStateView::empty(), + ) .expect("MockVM should not fail to start"); let mut output_iter = output.iter(); diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs index 608c45076976e..bb9ea70a99393 100644 --- a/execution/executor/src/tests/mock_vm/mod.rs +++ b/execution/executor/src/tests/mock_vm/mod.rs @@ -52,7 +52,6 @@ enum MockVMTransaction { pub static KEEP_STATUS: Lazy = Lazy::new(|| TransactionStatus::Keep(ExecutionStatus::Success)); -// We use 10 as the assertion error code for insufficient balance within the Aptos coin contract. pub static DISCARD_STATUS: Lazy = Lazy::new(|| TransactionStatus::Discard(StatusCode::INSUFFICIENT_BALANCE_FOR_TRANSACTION_FEE)); @@ -67,7 +66,6 @@ impl VMBlockExecutor for MockVM { &self, transactions: &[SignatureVerifiedTransaction], state_view: &impl StateView, - _global_cache_manager: &GlobalCacheManager, _onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { // output_cache is used to store the output of transactions so they are visible to later diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index ba76568c38f7e..8ca28af1a83ff 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -675,20 +675,23 @@ fn run_transactions_naive( ) -> HashValue { let executor = TestExecutor::new(); let db = &executor.db; - let global_cache_manager = GlobalCacheManager::new_with_default_config(); for txn in transactions { let ledger_view: ExecutedTrees = db.reader.get_latest_executed_trees().unwrap(); let out = DoGetExecutionOutput::by_transaction_execution( &MockVM::new(), vec![txn].into(), - state_view, - &global_cache_manager, + ledger_view + .verified_state_view( + StateViewId::Miscellaneous, + Arc::clone(&db.reader), + Arc::new(AsyncProofFetcher::new(db.reader.clone())), + ) + .unwrap(), block_executor_onchain_config.clone(), None, ) .unwrap(); - let output = ApplyExecutionOutput::run(out, &ledger_view).unwrap(); db.writer .save_transactions( diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index cb1c76c96b284..4fad63d2e8261 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -48,7 +48,6 @@ impl DoGetExecutionOutput { executor: &V, transactions: ExecutableTransactions, state_view: CachedStateView, - global_cache_manager: &GlobalCacheManager, onchain_config: BlockExecutorConfigFromOnchain, append_state_checkpoint_to_block: Option, ) -> Result { @@ -58,7 +57,6 @@ impl DoGetExecutionOutput { executor, txns, state_view, - global_cache_manager, onchain_config, append_state_checkpoint_to_block, )? @@ -90,7 +88,6 @@ impl DoGetExecutionOutput { executor: &V, transactions: Vec, state_view: CachedStateView, - global_cache_manager: &GlobalCacheManager, onchain_config: BlockExecutorConfigFromOnchain, append_state_checkpoint_to_block: Option, ) -> Result { @@ -204,7 +201,6 @@ impl DoGetExecutionOutput { executor: &V, transactions: &[SignatureVerifiedTransaction], state_view: &CachedStateView, - global_cache_manager: &GlobalCacheManager, onchain_config: BlockExecutorConfigFromOnchain, ) -> Result> { let _timer = OTHER_TIMERS.timer_with(&["vm_execute_block"]); @@ -220,7 +216,6 @@ impl DoGetExecutionOutput { executor: &V, transactions: &[SignatureVerifiedTransaction], state_view: &CachedStateView, - global_cache_manager: &GlobalCacheManager, onchain_config: BlockExecutorConfigFromOnchain, ) -> Result> { use aptos_types::{ diff --git a/experimental/execution/ptx-executor/Cargo.toml b/experimental/execution/ptx-executor/Cargo.toml index 20b9e6a3a28e3..0da896d31500a 100644 --- a/experimental/execution/ptx-executor/Cargo.toml +++ b/experimental/execution/ptx-executor/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } [dependencies] aptos-experimental-runtimes = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } diff --git a/experimental/execution/ptx-executor/src/lib.rs b/experimental/execution/ptx-executor/src/lib.rs index a080223eb7159..9ed83b7d7a3f6 100644 --- a/experimental/execution/ptx-executor/src/lib.rs +++ b/experimental/execution/ptx-executor/src/lib.rs @@ -22,7 +22,6 @@ use crate::{ scheduler::PtxScheduler, sorter::PtxSorter, state_reader::PtxStateReader, }; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; -use aptos_global_cache_manager::GlobalCacheManager; use aptos_infallible::Mutex; use aptos_metrics_core::TimerHelper; use aptos_types::{ @@ -53,7 +52,6 @@ impl VMBlockExecutor for PtxBlockExecutor { &self, transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), - _global_cache_manager: &GlobalCacheManager, _onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { let _timer = TIMER.timer_with(&["block_total"]); diff --git a/storage/db-tool/Cargo.toml b/storage/db-tool/Cargo.toml index 0f9f772776d00..b2feeb952750d 100644 --- a/storage/db-tool/Cargo.toml +++ b/storage/db-tool/Cargo.toml @@ -18,7 +18,6 @@ aptos-config = { workspace = true } aptos-db = { workspace = true, features = ["db-debugger"] } aptos-executor = { workspace = true } aptos-executor-types = { workspace = true } -aptos-global-cache-manager = { workspace = true } aptos-logger = { workspace = true } aptos-storage-interface = { workspace = true } aptos-temppath = { workspace = true } diff --git a/storage/db-tool/src/replay_on_archive.rs b/storage/db-tool/src/replay_on_archive.rs index 6153dab12de28..188cb5a37f3c0 100644 --- a/storage/db-tool/src/replay_on_archive.rs +++ b/storage/db-tool/src/replay_on_archive.rs @@ -29,7 +29,6 @@ use std::{ sync::{atomic::AtomicU64, Arc}, time::Instant, }; - // Replay Verify controller is responsible for providing legit range with start and end versions. #[derive(Parser)] pub struct Opt { @@ -277,7 +276,9 @@ impl Verifier { .map(|txn| SignatureVerifiedTransaction::from(txn.clone())) .collect::>() .as_slice(), - &state_view, + &self + .arc_db + .state_view_at_version(start_version.checked_sub(1))?, )?; let mut failed_txns = Vec::new(); diff --git a/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs b/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs index 578b3690d3cb6..8138bac66e02c 100644 --- a/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs +++ b/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs @@ -170,10 +170,11 @@ impl<'s, S: ModuleBytesStorage, E: WithRuntimeEnvironment> UnsyncModuleStorage<' .module_cache .into_modules_iter() .flat_map(|(key, module)| { - module - .code() - .is_verified() - .then(|| (key, module.code().verified().clone())) + module.code().is_verified().then(|| { + // TODO(loader_v2): + // We should be able to take ownership here, instead of clones. + (key, module.code().verified().clone()) + }) }) } diff --git a/types/Cargo.toml b/types/Cargo.toml index 79cb83ee9e9cc..088b34699ac06 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -32,7 +32,6 @@ arr_macro = { workspace = true } base64 = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } -crossbeam = { workspace = true } dashmap = { workspace = true } derivative = { workspace = true } fixed = { workspace = true } diff --git a/types/src/block_executor/config.rs b/types/src/block_executor/config.rs index 586b76e8c7942..95e66d1609850 100644 --- a/types/src/block_executor/config.rs +++ b/types/src/block_executor/config.rs @@ -4,6 +4,31 @@ use crate::on_chain_config::BlockGasLimitType; use serde::{Deserialize, Serialize}; +/// Local, per-node configurations for module cache. While caches can be persisted across multiple +/// block executions, these configurations allow to specify cache sizes, etc. +#[derive(Clone, Debug)] +pub struct BlockExecutorModuleCacheLocalConfig { + /// If true, when global caches are empty, Aptos framework is prefetched into module cache. + pub prefetch_framework_code: bool, + /// The maximum size of module cache (the sum of serialized sizes of all cached modules in + /// bytes). + pub max_module_cache_size_in_bytes: usize, + /// The maximum size (in terms of entries) of struct name re-indexing map stored in the runtime + /// environment. + pub max_struct_name_index_map_size: usize, +} + +impl Default for BlockExecutorModuleCacheLocalConfig { + fn default() -> Self { + Self { + prefetch_framework_code: true, + // Use 50 Mb for now, should be large enough to cache many modules. + max_module_cache_size_in_bytes: 50 * 1024 * 1024, + max_struct_name_index_map_size: 100_000, + } + } +} + /// Local, per-node configuration. #[derive(Clone, Debug)] pub struct BlockExecutorLocalConfig { @@ -14,6 +39,24 @@ pub struct BlockExecutorLocalConfig { // If true, we will discard the failed blocks and continue with the next block. // (allow_fallback needs to be set) pub discard_failed_blocks: bool, + + /// Various cache configurations, see [BlockExecutorModuleCacheLocalConfig] for more details. + pub module_cache_config: BlockExecutorModuleCacheLocalConfig, +} + +impl BlockExecutorLocalConfig { + /// Returns a new config with specified concurrency level and: + /// - Allowed fallback to sequential execution from parallel. + /// - Not allowed discards of failed blocks. + /// - Default module cache configs. + pub fn default_with_concurrency_level(concurrency_level: usize) -> Self { + Self { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + } + } } /// Configuration from on-chain configuration, that is @@ -40,7 +83,7 @@ impl BlockExecutorConfigFromOnchain { pub const fn on_but_large_for_test() -> Self { Self { block_gas_limit_type: - // present, so code is excercised, but large to not limit blocks + // present, so code is exercised, but large to not limit blocks BlockGasLimitType::ComplexLimitV1 { effective_block_gas_limit: 1_000_000_000, execution_gas_effective_multiplier: 1, @@ -69,11 +112,7 @@ pub struct BlockExecutorConfig { impl BlockExecutorConfig { pub fn new_no_block_limit(concurrency_level: usize) -> Self { Self { - local: BlockExecutorLocalConfig { - concurrency_level, - allow_fallback: true, - discard_failed_blocks: false, - }, + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), } } @@ -83,11 +122,7 @@ impl BlockExecutorConfig { maybe_block_gas_limit: Option, ) -> Self { Self { - local: BlockExecutorLocalConfig { - concurrency_level, - allow_fallback: true, - discard_failed_blocks: false, - }, + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), onchain: BlockExecutorConfigFromOnchain::new_maybe_block_limit(maybe_block_gas_limit), } } diff --git a/types/src/lib.rs b/types/src/lib.rs index bec1414ad4104..9081b6c0f0a4d 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -2,7 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -#![deny(unsafe_code)] +#![forbid(unsafe_code)] pub mod access_path; pub mod account_address; @@ -18,7 +18,6 @@ pub mod epoch_state; pub mod error; pub mod event; pub mod executable; -pub mod explicit_sync_wrapper; pub mod fee_statement; pub mod governance; pub mod indexer; @@ -36,7 +35,6 @@ pub mod proof; #[cfg(any(test, feature = "fuzzing"))] pub mod proptest_types; pub mod randomness; -pub mod read_only_module_cache; pub mod serde_helper; pub mod stake_pool; pub mod staking_contract;