Skip to content

Commit

Permalink
Some mempool code
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Nov 7, 2024
1 parent 1f983df commit 46b25d9
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 60 deletions.
45 changes: 31 additions & 14 deletions mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aptos_logger::prelude::*;
use aptos_types::{
account_address::AccountAddress,
mempool_status::{MempoolStatus, MempoolStatusCode},
transaction::{use_case::UseCaseKey, SignedTransaction},
transaction::{use_case::UseCaseKey, ReplayProtector, SignedTransaction},
vm_status::DiscardedVMStatus,
};
use std::{
Expand All @@ -33,6 +33,8 @@ use std::{
time::{Duration, Instant, SystemTime},
};

use super::AccountSequenceNumberInfo;

pub struct Mempool {
// Stores the metadata of all transactions in mempool (of all states).
transactions: TransactionStore,
Expand Down Expand Up @@ -276,7 +278,7 @@ impl Mempool {
&mut self,
txn: SignedTransaction,
ranking_score: u64,
db_sequence_number: u64,
account_sequence_number: AccountSequenceNumberInfo,
timeline_state: TimelineState,
client_submitted: bool,
// The time at which the transaction was inserted into the mempool of the
Expand All @@ -287,19 +289,34 @@ impl Mempool {
) -> MempoolStatus {
trace!(
LogSchema::new(LogEntry::AddTxn)
.txns(TxnsLog::new_txn(txn.sender(), txn.sequence_number())),
committed_seq_number = db_sequence_number
.txns(TxnsLog::new_txn(txn.sender(), txn.replay_protector())),
committed_seq_number = account_sequence_number
);

// don't accept old transactions (e.g. seq is less than account's current seq_number)
if txn.sequence_number() < db_sequence_number {
return MempoolStatus::new(MempoolStatusCode::InvalidSeqNumber).with_message(format!(
"transaction sequence number is {}, current sequence number is {}",
txn.sequence_number(),
db_sequence_number,
));
}

if let ReplayProtector::SequenceNumber(txn_seq_num) = txn.replay_protector() {
// don't accept old transactions (e.g. seq is less than account's current seq_number)
match &account_sequence_number {
AccountSequenceNumberInfo::Required(account_sequence_number) => {
if txn_seq_num < account_sequence_number {
return MempoolStatus::new(MempoolStatusCode::InvalidSeqNumber).with_message(
format!(
"transaction sequence number is {}, current sequence number is {}",
txn_seq_num, account_sequence_number,
),
);
}
},
AccountSequenceNumberInfo::NotRequired => {
return MempoolStatus::new(MempoolStatusCode::InvalidSeqNumber).with_message(
format!(
"transaction has sequence number {}, but not sequence number provided for sender's account",
txn_seq_num,
),
);
}
}
};

let now = SystemTime::now();
let expiration_time =
aptos_infallible::duration_since_epoch_at(&now) + self.system_transaction_timeout;
Expand All @@ -310,7 +327,7 @@ impl Mempool {
expiration_time,
ranking_score,
timeline_state,
db_sequence_number,
account_sequence_number,
now,
client_submitted,
priority.clone(),
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/core_mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod transaction;
mod transaction_store;

pub use self::{
mempool::Mempool as CoreMempool, transaction::TimelineState,
mempool::Mempool as CoreMempool, transaction::{AccountSequenceNumberInfo, TimelineState},
transaction_store::TXN_INDEX_ESTIMATED_BYTES,
};
#[cfg(test)]
Expand Down
21 changes: 15 additions & 6 deletions mempool/src/core_mempool/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{core_mempool::TXN_INDEX_ESTIMATED_BYTES, counters, network::BroadcastPeerPriority};
use aptos_crypto::HashValue;
use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction};
use aptos_types::{account_address::AccountAddress, transaction::{ReplayProtector, SignedTransaction}};
use serde::{Deserialize, Serialize};
use std::{
mem::size_of,
Expand All @@ -15,6 +15,15 @@ use std::{
/// Estimated per-txn size minus the raw transaction
pub const TXN_FIXED_ESTIMATED_BYTES: usize = size_of::<MempoolTransaction>();

// This is the sequence number for an account.
// For the sender of regular transactions, the sequence number is required.
// For the sender of orderless transactions, we don't calculate the sequence number.
pub enum AccountSequenceNumberInfo {
// Question: Please suggest some better names.
Required(u64),
NotRequired,
}

#[derive(Clone, Debug)]
pub struct MempoolTransaction {
pub txn: SignedTransaction,
Expand All @@ -35,15 +44,15 @@ impl MempoolTransaction {
expiration_time: Duration,
ranking_score: u64,
timeline_state: TimelineState,
seqno: u64,
account_sequence_number: AccountSequenceNumberInfo,
insertion_time: SystemTime,
client_submitted: bool,
priority_of_sender: Option<BroadcastPeerPriority>,
) -> Self {
Self {
sequence_info: SequenceInfo {
transaction_sequence_number: txn.sequence_number(),
account_sequence_number: seqno,
transaction_replay_protector: txn.replay_protector(),
account_sequence_number,
},
txn,
expiration_time,
Expand Down Expand Up @@ -86,8 +95,8 @@ pub enum TimelineState {

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct SequenceInfo {
pub transaction_sequence_number: u64,
pub account_sequence_number: u64,
pub transaction_replay_protector: ReplayProtector,
pub account_sequence_number: AccountSequenceNumberInfo,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
Expand Down
24 changes: 12 additions & 12 deletions mempool/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use anyhow::Error;
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_logger::Schema;
use aptos_mempool_notifications::MempoolCommitNotification;
use aptos_types::account_address::AccountAddress;
use aptos_types::{account_address::AccountAddress, transaction::ReplayProtector};
use serde::Serialize;
use std::{fmt, fmt::Write, time::SystemTime};

#[derive(Default)]
pub struct TxnsLog {
txns: Vec<(AccountAddress, u64, Option<String>, Option<SystemTime>)>,
txns: Vec<(AccountAddress, ReplayProtector, Option<String>, Option<SystemTime>)>,
len: usize,
max_displayed: usize,
}
Expand All @@ -31,39 +31,39 @@ impl TxnsLog {
}
}

pub fn new_txn(account: AccountAddress, seq_num: u64) -> Self {
pub fn new_txn(account: AccountAddress, replay_protector: ReplayProtector) -> Self {
Self {
txns: vec![(account, seq_num, None, None)],
txns: vec![(account, replay_protector, None, None)],
len: 0,
max_displayed: usize::MAX,
}
}

pub fn add(&mut self, account: AccountAddress, seq_num: u64) {
pub fn add(&mut self, account: AccountAddress, replay_protector: ReplayProtector) {
if self.txns.len() < self.max_displayed {
self.txns.push((account, seq_num, None, None));
self.txns.push((account, replay_protector, None, None));
}
self.len += 1;
}

pub fn add_with_status(&mut self, account: AccountAddress, seq_num: u64, status: &str) {
pub fn add_with_status(&mut self, account: AccountAddress, replay_protector: ReplayProtector, status: &str) {
if self.txns.len() < self.max_displayed {
self.txns
.push((account, seq_num, Some(status.to_string()), None));
.push((account, replay_protector, Some(status.to_string()), None));
}
self.len += 1;
}

pub fn add_full_metadata(
&mut self,
account: AccountAddress,
seq_num: u64,
replay_protector: ReplayProtector,
status: &str,
timestamp: SystemTime,
) {
if self.txns.len() < self.max_displayed {
self.txns
.push((account, seq_num, Some(status.to_string()), Some(timestamp)));
.push((account, replay_protector, Some(status.to_string()), Some(timestamp)));
}
self.len += 1;
}
Expand All @@ -77,8 +77,8 @@ impl fmt::Display for TxnsLog {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut txns = "".to_string();

for (account, seq_num, status, timestamp) in self.txns.iter() {
let mut txn = format!("{}:{}", account, seq_num);
for (account, replay_protector, status, timestamp) in self.txns.iter() {
let mut txn = format!("{}:{}", account, replay_protector);
if let Some(status) = status {
write!(txn, ":{}", status)?;
}
Expand Down
67 changes: 40 additions & 27 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! Tasks that are executed by coordinators (short-lived compared to coordinators)
use super::types::MempoolMessageId;
use crate::{
core_mempool::{CoreMempool, TimelineState},
core_mempool::{AccountSequenceNumberInfo, CoreMempool, TimelineState},
counters,
logging::{LogEntry, LogEvent, LogSchema},
network::{BroadcastError, BroadcastPeerPriority, MempoolSyncMsg},
Expand All @@ -30,11 +30,7 @@ use aptos_metrics_core::HistogramTimer;
use aptos_network::application::interface::NetworkClientInterface;
use aptos_storage_interface::state_view::LatestDbStateCheckpointView;
use aptos_types::{
account_address::AccountAddress,
mempool_status::{MempoolStatus, MempoolStatusCode},
on_chain_config::{OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig},
transaction::SignedTransaction,
vm_status::{DiscardedVMStatus, StatusCode},
account_address::AccountAddress, account_config::account, mempool_status::{MempoolStatus, MempoolStatusCode}, on_chain_config::{OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig}, transaction::{ReplayProtector, SignedTransaction}, vm_status::{DiscardedVMStatus, StatusCode}
};
use aptos_vm_validator::vm_validator::{get_account_sequence_number, TransactionValidation};
use futures::{channel::oneshot, stream::FuturesUnordered};
Expand Down Expand Up @@ -312,39 +308,56 @@ where
.expect("Failed to get latest state checkpoint view.");

// Track latency: fetching seq number
let seq_numbers = IO_POOL.install(|| {
let account_seq_numbers = IO_POOL.install(|| {
transactions
.par_iter()
.map(|(t, _, _)| {
get_account_sequence_number(&state_view, t.sender()).map_err(|e| {
error!(LogSchema::new(LogEntry::DBError).error(&e));
counters::DB_ERROR.inc();
e
})
match t.replay_protector() {
ReplayProtector::Nonce(_) => Ok(AccountSequenceNumberInfo::NotRequired),
ReplayProtector::SequenceNumber(_) => {
get_account_sequence_number(&state_view, t.sender())
.map(|sequence_number| AccountSequenceNumberInfo::Required(sequence_number))
.map_err(|e| {
error!(LogSchema::new(LogEntry::DBError).error(&e));
counters::DB_ERROR.inc();
e
})
},
}
})
.collect::<Vec<_>>()
});

// Track latency for storage read fetching sequence number
let storage_read_latency = start_storage_read.elapsed();
counters::PROCESS_TXN_BREAKDOWN_LATENCY
.with_label_values(&[counters::FETCH_SEQ_NUM_LABEL])
.observe(storage_read_latency.as_secs_f64() / transactions.len() as f64);



let transactions: Vec<_> = transactions
.into_iter()
.enumerate()
.filter_map(|(idx, (t, ready_time_at_sender, priority))| {
if let Ok(sequence_num) = seq_numbers[idx] {
if t.sequence_number() >= sequence_num {
return Some((t, sequence_num, ready_time_at_sender, priority));
} else {
statuses.push((
t,
(
MempoolStatus::new(MempoolStatusCode::VmError),
Some(DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD),
),
));
if let Ok(account_sequence_num) = account_seq_numbers[idx] {
match account_sequence_num {
AccountSequenceNumberInfo::Required(sequence_num) => {
if t.sequence_number() >= sequence_num {
return Some((t, sequence_num, ready_time_at_sender, priority));
} else {
statuses.push((
t,
(
MempoolStatus::new(MempoolStatusCode::VmError),
Some(DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD),
),
));
}
Some((t, AccountSequenceNumberInfo::Required(sequence_num), ready_time_at_sender, priority))
},
AccountSequenceNumberInfo::NotRequired => {
Some((t, AccountSequenceNumberInfo::NotRequired, ready_time_at_sender, priority))
},
}
} else {
// Failed to get transaction
Expand Down Expand Up @@ -377,7 +390,7 @@ where
fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
transactions: Vec<(
SignedTransaction,
u64,
AccountSequenceNumberInfo,
Option<u64>,
Option<BroadcastPeerPriority>,
)>,
Expand Down Expand Up @@ -408,7 +421,7 @@ fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
vm_validation_timer.stop_and_record();
{
let mut mempool = smp.mempool.lock();
for (idx, (transaction, sequence_info, ready_time_at_sender, priority)) in
for (idx, (transaction, account_sequence_info, ready_time_at_sender, priority)) in
transactions.into_iter().enumerate()
{
if let Ok(validation_result) = &validation_results[idx] {
Expand All @@ -418,7 +431,7 @@ fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
let mempool_status = mempool.add_txn(
transaction.clone(),
ranking_score,
sequence_info,
account_sequence_info,
timeline_state,
client_submitted,
ready_time_at_sender,
Expand Down

0 comments on commit 46b25d9

Please sign in to comment.