Skip to content

Commit

Permalink
Move ready checks to cache manager, better encapsulation by passing h…
Browse files Browse the repository at this point in the history
…ash values throughout
  • Loading branch information
georgemitenkov committed Nov 12, 2024
1 parent 28116bc commit d45df1f
Show file tree
Hide file tree
Showing 29 changed files with 375 additions and 244 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions aptos-move/aptos-debugger/src/aptos_debugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ fn execute_block_no_limit(
onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(),
},
None,
None,
None,
)
.map(BlockOutput::into_transaction_outputs_forced)
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl DataCollection {
let val = debugger_state_view.get_state_value(TOTAL_SUPPLY_STATE_KEY.deref());
assert!(val.is_ok() && val.unwrap().is_some());
AptosVMBlockExecutor::new()
.execute_block_no_limit(&sig_verified_txns, debugger_state_view)
.execute_block_no_limit(&sig_verified_txns, debugger_state_view, None, None)
.map_err(|err| format_err!("Unexpected VM Error: {:?}", err))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ where
&ModuleCacheManager::new(),
BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit),
None,
None,
None,
)
.expect("VM should not fail to start")
.into_transaction_outputs_forced();
Expand Down Expand Up @@ -275,6 +277,8 @@ where
maybe_block_gas_limit,
),
None,
None,
None,
)
.expect("VM should not fail to start")
.into_transaction_outputs_forced();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,14 +515,12 @@ impl<'a> AptosTestAdapter<'a> {
fn run_transaction(&mut self, txn: Transaction) -> Result<TransactionOutput> {
let txn_block = vec![txn];
let sig_verified_block = into_signature_verified_block(txn_block);

let executor = AptosVMBlockExecutor::new();
if let Some(module_cache_manager) = executor.module_cache_manager() {
module_cache_manager.mark_ready(None, None);
}

let mut outputs =
executor.execute_block_no_limit(&sig_verified_block, &self.storage.clone())?;
let mut outputs = AptosVMBlockExecutor::new().execute_block_no_limit(
&sig_verified_block,
&self.storage.clone(),
None,
None,
)?;

assert_eq!(outputs.len(), 1);

Expand Down
3 changes: 2 additions & 1 deletion aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ fn main() -> Result<()> {
})
.collect();

let outputs = AptosVMBlockExecutor::new().execute_block_no_limit(&txns, &state_store)?;
let outputs =
AptosVMBlockExecutor::new().execute_block_no_limit(&txns, &state_store, None, None)?;
for i in 0..NUM_TXNS {
assert!(outputs[i as usize].status().status().unwrap().is_success());
}
Expand Down
12 changes: 4 additions & 8 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2793,19 +2793,13 @@ impl VMBlockExecutor for AptosVMBlockExecutor {
}
}

fn module_cache_manager(
&self,
) -> Option<
&ModuleCacheManager<HashValue, ModuleId, CompiledModule, Module, AptosModuleExtension>,
> {
Some(&self.module_cache_manager)
}

fn execute_block(
&self,
transactions: &[SignatureVerifiedTransaction],
state_view: &(impl StateView + Sync),
onchain_config: BlockExecutorConfigFromOnchain,
parent_block: Option<&HashValue>,
current_block: Option<HashValue>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
fail_point!("move_adapter::execute_block", |_| {
Err(VMStatus::error(
Expand Down Expand Up @@ -2837,6 +2831,8 @@ impl VMBlockExecutor for AptosVMBlockExecutor {
},
onchain: onchain_config,
},
parent_block,
current_block,
None,
);
if ret.is_ok() {
Expand Down
62 changes: 24 additions & 38 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,15 @@ use aptos_vm_types::{
output::VMOutput,
resolver::ResourceGroupSize,
};
use move_binary_format::{
errors::{Location, VMError},
CompiledModule,
};
use move_binary_format::{errors::VMError, CompiledModule};
use move_core_types::{
account_address::AccountAddress,
ident_str,
language_storage::{ModuleId, StructTag},
value::MoveTypeLayout,
vm_status::{StatusCode, VMStatus},
};
use move_vm_runtime::{Module, ModuleStorage, WithRuntimeEnvironment};
use move_vm_runtime::{Module, ModuleStorage};
use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID;
use once_cell::sync::{Lazy, OnceCell};
use std::{
Expand Down Expand Up @@ -420,6 +417,8 @@ impl BlockAptosVM {
AptosModuleExtension,
>,
config: BlockExecutorConfig,
parent_block: Option<&HashValue>,
current_block: Option<HashValue>,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer();
Expand All @@ -433,42 +432,19 @@ impl BlockAptosVM {

BLOCK_EXECUTOR_CONCURRENCY.set(config.local.concurrency_level as i64);

let (environment, module_cache) = if module_cache_manager.mark_executing() {
let environment = module_cache_manager.get_or_initialize_environment(state_view);
let module_cache = module_cache_manager.module_cache();
(environment, module_cache)
} else {
// Either we do not have global caches , in which case we can create new ones, or
// something went wrong, and we were not able to mark the state as executing. In
// this case, fallback to empty caches. Note that the alert should have been raised
// during marking.
let environment =
AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view);
let module_cache = Arc::new(GlobalModuleCache::empty());
(environment, module_cache)
};

// We should be checking different module cache configurations here.
let module_cache_config = &config.local.module_cache_config;

// Check 1: struct re-indexing map is not too large. If it is, we flush the cache. Also, we
// need to flush modules because they store indices into re-indexing map.
let runtime_environment = environment.runtime_environment();
let struct_name_index_map_size = runtime_environment
.struct_name_index_map_size()
.map_err(|err| err.finish(Location::Undefined).into_vm_status())?;
if struct_name_index_map_size > module_cache_config.max_struct_name_index_map_num_entries {
module_cache.flush_unsync();
runtime_environment.flush_struct_name_and_info_caches();
}

// Check 2: If the module cache is too big, flush it.
if module_cache.size_in_bytes() > module_cache_config.max_module_cache_size_in_bytes {
module_cache.flush_unsync();
if !module_cache_manager.mark_ready(parent_block, current_block) {
return Err(VMStatus::error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
Some("Unable to mark module caches for block execution as ready".to_string()),
));
}
let (environment, module_cache) = module_cache_manager
.check_ready_and_get_caches(state_view, &config.local.module_cache_config)?;

// Finally, to avoid cold starts, fetch the framework code prior to block execution.
if module_cache.num_modules() == 0 && module_cache_config.prefetch_framework_code {
if module_cache.num_modules() == 0
&& config.local.module_cache_config.prefetch_framework_code
{
let code_storage = state_view.as_aptos_code_storage(environment.clone());
prefetch_aptos_framework(code_storage, &module_cache).map_err(|err| {
alert!("Failed to load Aptos framework to module cache: {:?}", err);
Expand All @@ -489,6 +465,12 @@ impl BlockAptosVM {
transaction_commit_listener,
);

if !module_cache_manager.mark_executing() {
return Err(VMStatus::error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
Some("Unable to mark block execution start".to_string()),
));
}
let ret = executor.execute_block(environment, signature_verified_block, state_view);
if !module_cache_manager.mark_done() {
return Err(VMStatus::error(
Expand Down Expand Up @@ -542,6 +524,8 @@ impl BlockAptosVM {
AptosModuleExtension,
>,
config: BlockExecutorConfig,
parent_block: Option<&HashValue>,
current_block: Option<HashValue>,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Self::execute_block_on_thread_pool::<S, L>(
Expand All @@ -550,6 +534,8 @@ impl BlockAptosVM {
state_view,
module_cache_manager,
config,
parent_block,
current_block,
transaction_commit_listener,
)
}
Expand Down
21 changes: 6 additions & 15 deletions aptos-move/aptos-vm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ pub mod verifier;

pub use crate::aptos_vm::{AptosSimulationVM, AptosVM};
use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor};
use aptos_block_executor::code_cache_global_manager::ModuleCacheManager;
use aptos_crypto::HashValue;
use aptos_types::{
block_executor::{
Expand All @@ -137,13 +136,9 @@ use aptos_types::{
signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput,
SignedTransaction, TransactionOutput, VMValidatorResult,
},
vm::modules::AptosModuleExtension,
vm_status::VMStatus,
};
use aptos_vm_types::module_and_script_storage::code_storage::AptosCodeStorage;
use move_binary_format::CompiledModule;
use move_core_types::language_storage::ModuleId;
use move_vm_runtime::Module;
use std::{marker::Sync, sync::Arc};
pub use verifier::view_function::determine_is_view;

Expand All @@ -166,22 +161,14 @@ pub trait VMBlockExecutor: Send + Sync {
/// an old one.
fn new() -> Self;

/// Returns the cache manager responsible for keeping module caches in sync. By default, is
/// [None].
fn module_cache_manager(
&self,
) -> Option<
&ModuleCacheManager<HashValue, ModuleId, CompiledModule, Module, AptosModuleExtension>,
> {
None
}

/// Executes a block of transactions and returns output for each one of them.
fn execute_block(
&self,
transactions: &[SignatureVerifiedTransaction],
state_view: &(impl StateView + Sync),
onchain_config: BlockExecutorConfigFromOnchain,
parent_block: Option<&HashValue>,
current_block: Option<HashValue>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus>;

/// Executes a block of transactions and returns output for each one of them, without applying
Expand All @@ -190,11 +177,15 @@ pub trait VMBlockExecutor: Send + Sync {
&self,
transactions: &[SignatureVerifiedTransaction],
state_view: &(impl StateView + Sync),
parent_block: Option<&HashValue>,
current_block: Option<HashValue>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
self.execute_block(
transactions,
state_view,
BlockExecutorConfigFromOnchain::new_no_block_limit(),
parent_block,
current_block,
)
.map(BlockOutput::into_transaction_outputs_forced)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,16 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
);
});
s.spawn(move |_| {
// Since we execute blocks in parallel, we cannot share module caches, so each
// thread has its own caches.
let module_cache_manager = ModuleCacheManager::new();
module_cache_manager.mark_ready(None, None);

let ret = BlockAptosVM::execute_block_on_thread_pool(
executor_thread_pool,
&signature_verified_transactions,
aggr_overridden_state_view.as_ref(),
&module_cache_manager,
// Since we execute blocks in parallel, we cannot share module caches, so each
// thread has its own caches.
&ModuleCacheManager::new(),
config,
None,
None,
cross_shard_commit_sender,
)
.map(BlockOutput::into_transaction_outputs_forced);
Expand Down
24 changes: 6 additions & 18 deletions aptos-move/aptos-vm/tests/sharded_block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,8 @@ mod test_utils {
.into_iter()
.map(|t| t.into_txn())
.collect();
let block_executor = AptosVMBlockExecutor::new();
if let Some(module_cache_manager) = block_executor.module_cache_manager() {
module_cache_manager.mark_ready(None, None);
}
let unsharded_txn_output = block_executor
.execute_block_no_limit(&ordered_txns, executor.data_store())
let unsharded_txn_output = AptosVMBlockExecutor::new()
.execute_block_no_limit(&ordered_txns, executor.data_store(), None, None)
.unwrap();
compare_txn_outputs(unsharded_txn_output, sharded_txn_output);
}
Expand Down Expand Up @@ -362,12 +358,8 @@ mod test_utils {
)
.unwrap();

let block_executor = AptosVMBlockExecutor::new();
if let Some(module_cache_manager) = block_executor.module_cache_manager() {
module_cache_manager.mark_ready(None, None);
}
let unsharded_txn_output = block_executor
.execute_block_no_limit(&execution_ordered_txns, executor.data_store())
let unsharded_txn_output = AptosVMBlockExecutor::new()
.execute_block_no_limit(&execution_ordered_txns, executor.data_store(), None, None)
.unwrap();
compare_txn_outputs(unsharded_txn_output, sharded_txn_output);
}
Expand Down Expand Up @@ -420,12 +412,8 @@ mod test_utils {
)
.unwrap();

let block_executor = AptosVMBlockExecutor::new();
if let Some(module_cache_manager) = block_executor.module_cache_manager() {
module_cache_manager.mark_ready(None, None);
}
let unsharded_txn_output = block_executor
.execute_block_no_limit(&execution_ordered_txns, executor.data_store())
let unsharded_txn_output = AptosVMBlockExecutor::new()
.execute_block_no_limit(&execution_ordered_txns, executor.data_store(), None, None)
.unwrap();
compare_txn_outputs(unsharded_txn_output, sharded_txn_output);
}
Expand Down
1 change: 1 addition & 0 deletions aptos-move/block-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ aptos-types = { workspace = true, features = ["testing"] }
criterion = { workspace = true }
fail = { workspace = true, features = ["failpoints"] }
itertools = { workspace = true }
move-vm-runtime = { workspace = true, features = ["testing"] }
move-vm-types = { workspace = true, features = ["testing"] }
proptest = { workspace = true }
proptest-derive = { workspace = true }
Expand Down
Loading

0 comments on commit d45df1f

Please sign in to comment.