Skip to content

Commit

Permalink
introduce ChunkResultVerifier (#14912)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Oct 10, 2024
1 parent 543d75f commit 136058a
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 224 deletions.
61 changes: 0 additions & 61 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use aptos_storage_interface::cached_state_view::ShardedStateCache;
use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
proof::accumulator::InMemoryTransactionAccumulator,
state_store::{combine_or_add_sharded_state_updates, ShardedStateUpdates},
transaction::{
Expand Down Expand Up @@ -72,66 +71,6 @@ impl LedgerUpdateOutput {
Ok(())
}

pub fn maybe_select_chunk_ending_ledger_info(
&self,
verified_target_li: &LedgerInfoWithSignatures,
epoch_change_li: Option<&LedgerInfoWithSignatures>,
next_epoch_state: Option<&EpochState>,
) -> Result<Option<LedgerInfoWithSignatures>> {
if verified_target_li.ledger_info().version() + 1
== self.transaction_accumulator.num_leaves()
{
// If the chunk corresponds to the target LI, the target LI can be added to storage.
ensure!(
verified_target_li
.ledger_info()
.transaction_accumulator_hash()
== self.transaction_accumulator.root_hash(),
"Root hash in target ledger info does not match local computation. {:?} != {:?}",
verified_target_li,
self.transaction_accumulator,
);
Ok(Some(verified_target_li.clone()))
} else if let Some(epoch_change_li) = epoch_change_li {
// If the epoch change LI is present, it must match the version of the chunk:

// Verify that the given ledger info corresponds to the new accumulator.
ensure!(
epoch_change_li.ledger_info().transaction_accumulator_hash()
== self.transaction_accumulator.root_hash(),
"Root hash of a given epoch LI does not match local computation. {:?} vs {:?}",
epoch_change_li,
self.transaction_accumulator,
);
ensure!(
epoch_change_li.ledger_info().version() + 1
== self.transaction_accumulator.num_leaves(),
"Version of a given epoch LI does not match local computation. {:?} vs {:?}",
epoch_change_li,
self.transaction_accumulator,
);
ensure!(
epoch_change_li.ledger_info().ends_epoch(),
"Epoch change LI does not carry validator set. version:{}",
epoch_change_li.ledger_info().version(),
);
ensure!(
epoch_change_li.ledger_info().next_epoch_state() == next_epoch_state,
"New validator set of a given epoch LI does not match local computation. {:?} vs {:?}",
epoch_change_li.ledger_info().next_epoch_state(),
next_epoch_state,
);
Ok(Some(epoch_change_li.clone()))
} else {
ensure!(
next_epoch_state.is_none(),
"End of epoch chunk based on local computation but no EoE LedgerInfo provided. version: {:?}",
self.transaction_accumulator.num_leaves().checked_sub(1),
);
Ok(None)
}
}

pub fn ensure_transaction_infos_match(
&self,
transaction_infos: &[TransactionInfo],
Expand Down
174 changes: 88 additions & 86 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use crate::{
apply_chunk_output::{ensure_no_discard, ensure_no_retry, ApplyChunkOutput},
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, StateSyncChunkVerifier},
executed_chunk::ExecutedChunk,
transaction_chunk::TransactionChunkWithProof,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
},
logging::{LogEntry, LogSchema},
metrics::{APPLY_CHUNK, CHUNK_OTHER_TIMERS, COMMIT_CHUNK, CONCURRENCY_GAUGE, EXECUTE_CHUNK},
};
use anyhow::{ensure, Result};
use aptos_crypto::HashValue;
use anyhow::{anyhow, ensure, Result};
use aptos_drop_helper::DEFAULT_DROPPER;
use aptos_executor_types::{
ChunkCommitNotification, ChunkExecutorTrait, ParsedTransactionOutput, TransactionReplayer,
Expand All @@ -33,8 +33,7 @@ use aptos_storage_interface::{
use aptos_types::{
block_executor::config::BlockExecutorConfigFromOnchain,
contract_event::ContractEvent,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
proof::TransactionInfoListWithProof,
ledger_info::LedgerInfoWithSignatures,
state_store::StateViewId,
transaction::{
signature_verified_transaction::SignatureVerifiedTransaction, Transaction,
Expand Down Expand Up @@ -107,14 +106,36 @@ impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
let _timer = EXECUTE_CHUNK.start_timer();

self.maybe_initialize()?;
self.with_inner(|inner| {
inner.enqueue_chunk(
txn_list_with_proof,
verified_target_li,
epoch_change_li,
"execute",
)
})

// Verify input data.
// In consensus-only mode, txn_list_with_proof is fake.
if !cfg!(feature = "consensus-only-perf-test") {
txn_list_with_proof.verify(
verified_target_li.ledger_info(),
txn_list_with_proof.first_transaction_version,
)?;
}

// Compose enqueue_chunk parameters.
let TransactionListWithProof {
transactions,
events: _,
first_transaction_version: v,
proof: txn_infos_with_proof,
} = txn_list_with_proof;

let chunk = ChunkToExecute {
transactions,
first_version: v.ok_or_else(|| anyhow!("first version is None"))?,
};
let chunk_verifier = Arc::new(StateSyncChunkVerifier {
txn_infos_with_proof,
verified_target_li: verified_target_li.clone(),
epoch_change_li: epoch_change_li.cloned(),
});

// Call the shared implementation.
self.with_inner(|inner| inner.enqueue_chunk(chunk, chunk_verifier, "execute"))
}

fn enqueue_chunk_by_transaction_outputs(
Expand All @@ -126,14 +147,36 @@ impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
let _guard = CONCURRENCY_GAUGE.concurrency_with(&["chunk", "enqueue_by_outputs"]);
let _timer = APPLY_CHUNK.start_timer();

self.with_inner(|inner| {
inner.enqueue_chunk(
txn_output_list_with_proof,
verified_target_li,
epoch_change_li,
"apply",
// Verify input data.
THREAD_MANAGER.get_exe_cpu_pool().install(|| {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["apply_chunk__verify"]);
txn_output_list_with_proof.verify(
verified_target_li.ledger_info(),
txn_output_list_with_proof.first_transaction_output_version,
)
})
})?;

// Compose enqueue_chunk parameters.
let TransactionOutputListWithProof {
transactions_and_outputs,
first_transaction_output_version: v,
proof: txn_infos_with_proof,
} = txn_output_list_with_proof;
let (transactions, transaction_outputs) = transactions_and_outputs.into_iter().unzip();

let chunk = ChunkToApply {
transactions,
transaction_outputs,
first_version: v.ok_or_else(|| anyhow!("first version is None"))?,
};
let chunk_verifier = Arc::new(StateSyncChunkVerifier {
txn_infos_with_proof,
verified_target_li: verified_target_li.clone(),
epoch_change_li: epoch_change_li.cloned(),
});

// Call the shared implementation.
self.with_inner(|inner| inner.enqueue_chunk(chunk, chunk_verifier, "apply"))
}

fn update_ledger(&self) -> Result<()> {
Expand Down Expand Up @@ -197,25 +240,6 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
)?)
}

fn verify_extends_ledger(
&self,
proof: &TransactionInfoListWithProof,
first_version: Version,
my_root_hash: HashValue,
) -> Result<()> {
// In consensus-only mode, we cannot verify the proof against the executed output,
// because the proof returned by the remote peer is an empty one.
if cfg!(feature = "consensus-only-perf-test") {
return Ok(());
}

let num_overlap =
proof.verify_extends_ledger(first_version, my_root_hash, Some(first_version))?;
assert_eq!(num_overlap, 0, "overlapped chunks");

Ok(())
}

fn commit_chunk_impl(&self) -> Result<ExecutedChunk> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__total"]);
let (persisted_state, chunk) = {
Expand Down Expand Up @@ -255,50 +279,35 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
Ok(chunk)
}

fn verify_chunk<Chunk: TransactionChunkWithProof + Sync>(
chunk: &Chunk,
ledger_info: &LedgerInfo,
first_version: Version,
) -> Result<()> {
// In consensus-only mode, the [TransactionListWithProof](transaction list) is *not*
// verified against the proof and the [LedgerInfoWithSignatures](ledger info).
// This is because the [FakeAptosDB] from where these transactions come from
// returns an empty proof and not an actual proof, so proof verification will
// fail regardless. This function does not skip any transactions that may be
// already in the ledger, because it is not necessary as execution is disabled.
if cfg!(feature = "consensus-only-perf-test") {
return Ok(());
}

THREAD_MANAGER
.get_exe_cpu_pool()
.install(|| chunk.verify_chunk(ledger_info, first_version))
}

// ************************* Chunk Executor Implementation *************************
fn enqueue_chunk<Chunk: TransactionChunkWithProof + Sync>(
fn enqueue_chunk<Chunk: TransactionChunk + Sync>(
&self,
chunk: Chunk,
verified_target_li: &LedgerInfoWithSignatures,
epoch_change_li: Option<&LedgerInfoWithSignatures>,
chunk_verifier: Arc<dyn ChunkResultVerifier + Send + Sync>,
mode_for_log: &'static str,
) -> Result<()> {
let parent_state = self.commit_queue.lock().latest_state();

let first_version = parent_state.next_version();
Self::verify_chunk(&chunk, verified_target_li.ledger_info(), first_version)?;
ensure!(
chunk.first_version() == parent_state.next_version(),
"Chunk carries unexpected first version. Expected: {}, got: {}",
parent_state.next_version(),
chunk.first_version(),
);

let num_txns = chunk.len();

let state_view = self.latest_state_view(&parent_state)?;
let (chunk_output, txn_infos_with_proof) = chunk.into_chunk_output::<V>(state_view)?;
let chunk_output = chunk.into_output::<V>(state_view)?;

// Calculate state snapshot
let (result_state, next_epoch_state, state_checkpoint_output) =
ApplyChunkOutput::calculate_state_checkpoint(
chunk_output,
&self.commit_queue.lock().latest_state(),
None, // append_state_checkpoint_to_block
Some(txn_infos_with_proof.state_checkpoint_hashes()),
Some(chunk_verifier.state_checkpoint_hashes()),
false, // is_block
)?;

Expand All @@ -309,9 +318,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
result_state,
state_checkpoint_output,
next_epoch_state,
verified_target_li: verified_target_li.clone(),
epoch_change_li: epoch_change_li.cloned(),
txn_infos_with_proof,
chunk_verifier,
})?;

info!(
Expand All @@ -336,29 +343,24 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
result_state,
state_checkpoint_output,
next_epoch_state,
verified_target_li,
epoch_change_li,
txn_infos_with_proof,
chunk_verifier,
} = chunk;

let first_version = parent_accumulator.num_leaves();
self.verify_extends_ledger(
&txn_infos_with_proof,
first_version,
parent_accumulator.root_hash(),
)?;

let (ledger_update_output, to_discard, to_retry) = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]);
ApplyChunkOutput::calculate_ledger_update(state_checkpoint_output, parent_accumulator)?
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)?
};

ensure!(to_discard.is_empty(), "Unexpected discard.");
ensure!(to_retry.is_empty(), "Unexpected retry.");
ledger_update_output
.ensure_transaction_infos_match(&txn_infos_with_proof.transaction_infos)?;
let ledger_info_opt = ledger_update_output.maybe_select_chunk_ending_ledger_info(
&verified_target_li,
epoch_change_li.as_ref(),
chunk_verifier.verify_chunk_result(&parent_accumulator, &ledger_update_output)?;

let ledger_info_opt = chunk_verifier.maybe_select_chunk_ending_ledger_info(
&ledger_update_output,
next_epoch_state.as_ref(),
)?;

Expand Down Expand Up @@ -648,7 +650,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
) -> Result<()> {
let num_txns = (end_version - begin_version) as usize;
let txn_infos: Vec<_> = transaction_infos.drain(..num_txns).collect();
let txns_and_outputs = multizip((
let (txns, txn_outs) = multizip((
transactions.drain(..num_txns),
txn_infos.iter(),
write_sets.drain(..num_txns),
Expand All @@ -666,10 +668,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
),
)
})
.collect();
.unzip();

let state_view = self.latest_state_view(latest_view.state())?;
let chunk_output = ChunkOutput::by_transaction_output(txns_and_outputs, state_view)?;
let chunk_output = ChunkOutput::by_transaction_output(txns, txn_outs, state_view)?;
let (executed_batch, to_discard, to_retry) = chunk_output.apply_to_ledger(
latest_view,
Some(
Expand Down
14 changes: 6 additions & 8 deletions execution/executor/src/components/chunk_commit_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

#![forbid(unsafe_code)]

use crate::components::executed_chunk::ExecutedChunk;
use crate::components::{
chunk_result_verifier::ChunkResultVerifier, executed_chunk::ExecutedChunk,
};
use anyhow::{anyhow, ensure, Result};
use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput;
use aptos_storage_interface::{state_delta::StateDelta, DbReader, ExecutedTrees};
use aptos_types::{
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
proof::{accumulator::InMemoryTransactionAccumulator, TransactionInfoListWithProof},
epoch_state::EpochState, proof::accumulator::InMemoryTransactionAccumulator,
transaction::Version,
};
use std::{collections::VecDeque, sync::Arc};
Expand All @@ -23,11 +23,9 @@ pub(crate) struct ChunkToUpdateLedger {
/// If set, this is the new epoch info that should be changed to if this is committed.
pub next_epoch_state: Option<EpochState>,

/// the below are from the input -- can be checked / used only after the transaction accumulator
/// from the input -- can be checked / used only after the transaction accumulator
/// is updated.
pub verified_target_li: LedgerInfoWithSignatures,
pub epoch_change_li: Option<LedgerInfoWithSignatures>,
pub txn_infos_with_proof: TransactionInfoListWithProof,
pub chunk_verifier: Arc<dyn ChunkResultVerifier + Send + Sync>,
}

/// It's a two stage pipeline:
Expand Down
Loading

0 comments on commit 136058a

Please sign in to comment.