Skip to content

Commit

Permalink
mempool: allow capping number of txs in mempool
Browse files Browse the repository at this point in the history
Updating transaction proofs is a very expensive procedure. Capping
the number of transactions in the mempool addresses this problem.
This cap allows miners to work on the block proof without waiting too
long for the updating of the transaction proofs.
  • Loading branch information
Sword-Smith committed Oct 15, 2024
1 parent 1f570ec commit 12c2a87
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/config_models/cli_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ pub struct Args {
#[clap(long, default_value = "1G", value_name = "SIZE")]
pub max_mempool_size: ByteSize,

/// Maximum number of transactions permitted in the mempool.
///
/// If too much time is spent updating transaction proofs, this
/// value can be capped.
///
/// E.g. --max-mempool-num-tx=4
#[clap(long)]
pub max_mempool_num_tx: Option<usize>,

/// Port on which to listen for peer connections.
#[clap(long, default_value = "9798", value_name = "PORT")]
pub peer_port: u16,
Expand Down Expand Up @@ -160,6 +169,7 @@ mod cli_args_tests {
IpAddr::from(Ipv6Addr::UNSPECIFIED),
default_args.listen_addr
);
assert_eq!(None, default_args.max_mempool_num_tx);
}

#[test]
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
archival_state,
};
let blockchain_state = BlockchainState::Archival(blockchain_archival_state);
let mempool = Mempool::new(cli_args.max_mempool_size, latest_block.hash());
let mempool = Mempool::new(
cli_args.max_mempool_size,
cli_args.max_mempool_num_tx,
latest_block.hash(),
);
let mut global_state_lock = GlobalStateLock::new(
wallet_state,
blockchain_state,
Expand Down
88 changes: 82 additions & 6 deletions src/models/state/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ type LookupItem<'a> = (TransactionKernelId, &'a Transaction);

#[derive(Debug, Clone, PartialEq, Eq, GetSize)]
pub struct Mempool {
/// Maximum size this data structure may take up in memory.
max_total_size: usize,

/// If set, represents the maximum number of transactions allowed in the
/// mempool. If None, mempool is only restricted by size.
max_length: Option<usize>,

/// Contains transactions, with a mapping from transaction ID to transaction.
/// Maintain for constant lookup
tx_dictionary: HashMap<TransactionKernelId, Transaction>,
Expand All @@ -80,12 +85,17 @@ pub struct Mempool {

impl Mempool {
/// instantiate a new, empty `Mempool`
pub fn new(max_total_size: ByteSize, tip_digest: Digest) -> Self {
pub fn new(
max_total_size: ByteSize,
max_num_transactions: Option<usize>,
tip_digest: Digest,
) -> Self {
let table = Default::default();
let queue = Default::default();
let max_total_size = max_total_size.0.try_into().unwrap();
Self {
max_total_size,
max_length: max_num_transactions,
tx_dictionary: table,
queue,
tip_digest,
Expand Down Expand Up @@ -180,6 +190,7 @@ impl Mempool {
"mempool's table and queue length must agree prior to shrink"
);
self.shrink_to_max_size();
self.shrink_to_max_length();
assert_eq!(
self.tx_dictionary.len(),
self.queue.len(),
Expand Down Expand Up @@ -404,7 +415,7 @@ impl Mempool {
}

/// Shrink the memory pool to the value of its `max_size` field.
/// Likely computes in O(n)
/// Likely computes in O(n).
fn shrink_to_max_size(&mut self) {
// Repeately remove the least valuable transaction
while self.get_size() > self.max_total_size && self.pop_min().is_some() {
Expand All @@ -414,6 +425,18 @@ impl Mempool {
self.shrink_to_fit()
}

/// Shrink the memory pool to the value of its `max_length` field,
/// if that field is set.
fn shrink_to_max_length(&mut self) {
if let Some(max_length) = self.max_length {
while self.len() > max_length && self.pop_min().is_some() {
continue;
}
}

self.shrink_to_fit()
}

/// Shrinks internal data structures as much as possible.
/// Computes in O(n) (Likely)
fn shrink_to_fit(&mut self) {
Expand Down Expand Up @@ -488,7 +511,7 @@ mod tests {
pub async fn insert_then_get_then_remove_then_get() {
let network = Network::Main;
let genesis_block = Block::genesis_block(network);
let mut mempool = Mempool::new(ByteSize::gb(1), genesis_block.hash());
let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash());

let txs = make_plenty_mock_transaction_with_primitive_witness(2);
let transaction_digests = txs.iter().map(|tx| tx.kernel.txid()).collect_vec();
Expand Down Expand Up @@ -527,7 +550,7 @@ mod tests {
/// Create a mempool with n transactions.
async fn setup_mock_mempool(transactions_count: usize, network: Network) -> Mempool {
let genesis_block = Block::genesis_block(network);
let mut mempool = Mempool::new(ByteSize::gb(1), genesis_block.hash());
let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash());
let txs = make_plenty_mock_transaction_with_primitive_witness(transactions_count);
for tx in txs {
mempool.insert(&tx);
Expand Down Expand Up @@ -576,7 +599,7 @@ mod tests {
async fn prune_stale_transactions() {
let network = Network::Main;
let genesis_block = Block::genesis_block(network);
let mut mempool = Mempool::new(ByteSize::gb(1), genesis_block.hash());
let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash());
assert!(
mempool.is_empty(),
"Mempool must be empty after initialization"
Expand Down Expand Up @@ -677,7 +700,7 @@ mod tests {
bob.wallet_state.add_expected_utxos(expected_utxos).await;

// Add this transaction to a mempool
let mut mempool = Mempool::new(ByteSize::gb(1), block_1.hash());
let mut mempool = Mempool::new(ByteSize::gb(1), None, block_1.hash());
mempool.insert(&tx_by_bob);

// Create another transaction that's valid to be included in block 2, but isn't actually
Expand Down Expand Up @@ -987,6 +1010,59 @@ mod tests {
}
}

#[traced_test]
#[tokio::test]
async fn max_len_none() {
let network = Network::Main;
let genesis_block = Block::genesis_block(network);
let txs = make_plenty_mock_transaction_with_primitive_witness(11);
let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash());

for tx in txs.iter() {
mempool.insert(tx);
}

assert_eq!(
11,
mempool.len(),
"All transactions are inserted into mempool"
);
}

#[traced_test]
#[tokio::test]
async fn max_len_is_respected() {
let network = Network::Main;
let genesis_block = Block::genesis_block(network);
let txs = make_plenty_mock_transaction_with_primitive_witness(20);

let mut expected_txs = txs.clone();
expected_txs.sort_by_key(|x| x.fee_density());
expected_txs.reverse();

for i in 0..10 {
let mut mempool = Mempool::new(ByteSize::gb(1), Some(i), genesis_block.hash());
for tx in txs.iter() {
mempool.insert(tx);
}

assert_eq!(
i,
mempool.len(),
"Only {i} transactions are permitted in the mempool"
);

let expected_txs = expected_txs.iter().take(i).cloned().collect_vec();

let mut mempool_iter = mempool.get_sorted_iter();
for expected_tx in expected_txs.iter() {
let (txid, fee_density) = mempool_iter.next().unwrap();
assert_eq!(expected_tx, mempool.get(txid).unwrap());
assert_eq!(expected_tx.fee_density(), fee_density);
}
}
}

#[traced_test]
#[tokio::test]
async fn get_mempool_size() {
Expand Down
7 changes: 5 additions & 2 deletions src/tests/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::time::SystemTime;
use anyhow::Result;
use bytes::Bytes;
use bytes::BytesMut;
use bytesize::ByteSize;
use futures::sink;
use futures::stream;
use futures::task::Context;
Expand Down Expand Up @@ -210,11 +209,15 @@ pub(crate) async fn mock_genesis_global_state(
light_state,
archival_state,
});
let mempool = Mempool::new(ByteSize::gb(1), genesis_block.hash());
let cli_args = cli_args::Args {
network,
..Default::default()
};
let mempool = Mempool::new(
cli_args.max_mempool_size,
cli_args.max_mempool_num_tx,
genesis_block.hash(),
);

let wallet_state = mock_genesis_wallet_state(wallet, network).await;

Expand Down

0 comments on commit 12c2a87

Please sign in to comment.