From 82997ca8402abac27d33734b954e986a8bbb8ede Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Thu, 19 Sep 2024 15:22:33 +0200 Subject: [PATCH 01/10] Introduce a tx cache --- Cargo.lock | 60 ++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/new_index/schema.rs | 36 ++++++++++++++++++++++--- 3 files changed, 93 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c399a9585..31c150c61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,18 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -37,6 +49,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -290,6 +308,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitcoin_slices" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8041a1be831c809ada090db2e3bd1469c65b72321bb2f31d7f56261eefc8321" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "bitcoincore-rpc" version = "0.18.0" @@ -738,6 +765,7 @@ dependencies = [ "bincode", "bitcoin 0.31.2", "bitcoin-test-data", + "bitcoin_slices", "bitcoind", "clap 2.34.0", "criterion", @@ -1022,6 +1050,16 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1194,7 +1232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -3077,6 +3115,26 @@ dependencies = [ "rustix", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "zip" version = "0.6.6" diff --git a/Cargo.toml b/Cargo.toml index 6bae3b716..cf8705ebf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ tokio = { version = "1", features = ["sync", "macros"] } # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } +bitcoin_slices = { version = "0.8.0", features = ["slice_cache"] } [dev-dependencies] diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 1ea2a97ef..d7086c843 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -2,10 +2,12 @@ use bitcoin::hashes::sha256d::Hash as Sha256dHash; #[cfg(not(feature = "liquid"))] use bitcoin::merkle_tree::MerkleBlock; use bitcoin::VarInt; +use bitcoin_slices::SliceCache; use crypto::digest::Digest; use crypto::sha2::Sha256; use hex::FromHex; use itertools::Itertools; +use prometheus::IntCounter; use rayon::prelude::*; #[cfg(not(feature = "liquid"))] @@ -17,9 +19,12 @@ use elements::{ AssetId, }; -use 
std::collections::{BTreeSet, HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, RwLock}; +use std::{ + collections::{BTreeSet, HashMap, HashSet}, + sync::Mutex, +}; use crate::chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, @@ -198,6 +203,11 @@ pub struct ChainQuery { light_mode: bool, duration: HistogramVec, network: Network, + + /// By default 1GB of cached transactions, `Txid -> Transaction` + txs_cache: Mutex<SliceCache<Txid>>, + cache_hit: IntCounter, + cache_miss: IntCounter, } // TODO: &[Block] should be an iterator / a queue. @@ -360,6 +370,9 @@ impl ChainQuery { HistogramOpts::new("query_duration", "Index query duration (in seconds)"), &["name"], ), + txs_cache: Mutex::new(SliceCache::new(1_000_000_000)), + cache_hit: metrics.counter(MetricOpts::new("tx_cache_hit", "Tx cache Hit")), + cache_miss: metrics.counter(MetricOpts::new("tx_cache_miss", "Tx cache Miss")), } } @@ -838,7 +851,16 @@ impl ChainQuery { pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option<Bytes> { let _timer = self.start_timer("lookup_raw_txn"); - if self.light_mode { + if let Ok(cache) = self.txs_cache.lock() { + if let Some(bytes) = cache.get(txid) { + self.cache_hit.inc(); + return Some(bytes.to_vec()); + } else { + self.cache_miss.inc(); + } + } + + let result = if self.light_mode { let queried_blockhash = blockhash.map_or_else(|| self.tx_confirming_block(txid).map(|b| b.hash), |_| None); let blockhash = blockhash.or_else(|| queried_blockhash.as_ref())?; @@ -848,10 +870,18 @@ impl ChainQuery { .gettransaction_raw(txid, blockhash, false) .ok()?; let txhex = txval.as_str().expect("valid tx from bitcoind"); - Some(Bytes::from_hex(txhex).expect("valid tx from bitcoind")) + let vec = Bytes::from_hex(txhex).expect("valid tx from bitcoind"); + + Some(vec) } else { self.store.txstore_db.get(&TxRow::key(&txid[..])) + }; + if let Some(result) = result.as_ref() { + if let Ok(mut cache) = self.txs_cache.lock() { + let _ = 
cache.insert(txid.clone(), result); + } } + result } pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option { From 7dee400a998445065d051a3ea781a2fb465decd0 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Thu, 19 Sep 2024 15:30:03 +0200 Subject: [PATCH 02/10] Add default run by running `cargo run` electrs is started; otherwise you need to specify the intended binary to run: `cargo run --bin electrs` --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index cf8705ebf..8a473968f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ keywords = ["bitcoin", "electrum", "server", "index", "database"] documentation = "https://docs.rs/electrs/" readme = "README.md" edition = "2018" +default-run = "electrs" [features] liquid = ["elements"] From 1a680b0d84be270c80962d71d8947911d04e7d6a Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Thu, 19 Sep 2024 15:44:42 +0200 Subject: [PATCH 03/10] Make the tx cache size configurable --- src/config.rs | 9 +++++++++ src/new_index/schema.rs | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 8696ecf8f..86d8f2bc3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,9 @@ pub struct Config { pub electrum_banner: String, pub electrum_rpc_logging: Option, + /// Tx cache size in megabytes + pub tx_cache_size: usize, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -191,6 +194,11 @@ impl Config { .long("electrum-rpc-logging") .help(&rpc_logging_help) .takes_value(true), + ).arg( + Arg::with_name("tx_cache_size") + .long("tx-cache-size") + .help("The amount of MB for a in-memory cache for transactions.") + .default_value("1000") ); #[cfg(unix)] @@ -403,6 +411,7 @@ impl Config { index_unspendables: m.is_present("index_unspendables"), cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), + tx_cache_size: value_t_or_exit!(m, 
"tx_cache_size", usize), #[cfg(feature = "liquid")] parent_network, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d7086c843..e923a209a 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -370,7 +370,7 @@ impl ChainQuery { HistogramOpts::new("query_duration", "Index query duration (in seconds)"), &["name"], ), - txs_cache: Mutex::new(SliceCache::new(1_000_000_000)), + txs_cache: Mutex::new(SliceCache::new(config.tx_cache_size << 20)), cache_hit: metrics.counter(MetricOpts::new("tx_cache_hit", "Tx cache Hit")), cache_miss: metrics.counter(MetricOpts::new("tx_cache_miss", "Tx cache Miss")), } From 32d23167533bb79f6b8a4d8d4b6b13fd6b8b21d8 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 09:05:28 +0200 Subject: [PATCH 04/10] refactor out method to add to tx cache --- src/new_index/schema.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index e923a209a..5751ca6d9 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -877,13 +877,19 @@ impl ChainQuery { self.store.txstore_db.get(&TxRow::key(&txid[..])) }; if let Some(result) = result.as_ref() { - if let Ok(mut cache) = self.txs_cache.lock() { - let _ = cache.insert(txid.clone(), result); - } + self.add_txs_to_cache(&[(*txid, result)]); } result } + pub fn add_txs_to_cache<T: AsRef<[u8]>>(&self, txs: &[(Txid, T)]) { + if let Ok(mut cache) = self.txs_cache.lock() { + for (txid, tx) in txs { + let _ = cache.insert(*txid, &tx.as_ref()); + } + } + } + pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option { let _timer = self.start_timer("lookup_txo"); lookup_txo(&self.store.txstore_db, outpoint) From 18fad46d76e2d5ace0042817323130fa3b32b0fc Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 09:14:52 +0200 Subject: [PATCH 05/10] add method to check cache misses --- src/new_index/schema.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git 
a/src/new_index/schema.rs b/src/new_index/schema.rs index 5751ca6d9..4612f64d4 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -882,6 +882,18 @@ impl ChainQuery { result } + pub fn txs_cache_miss(&self, txids: &[Txid]) -> Vec<Txid> { + let mut result = vec![]; + if let Ok(cache) = self.txs_cache.lock() { + for txid in txids { + if !cache.contains(txid) { + result.push(*txid); + } + } + } + result + } + pub fn add_txs_to_cache<T: AsRef<[u8]>>(&self, txs: &[(Txid, T)]) { if let Ok(mut cache) = self.txs_cache.lock() { for (txid, tx) in txs { From bfc323a4d41c48fedef610dcdf46f88462a12c7e Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 09:39:29 +0200 Subject: [PATCH 06/10] populate tx cache with mempool txs --- src/new_index/mempool.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 94ba7a41d..50d55d18f 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -309,6 +309,18 @@ impl Mempool { txids.push(txid); self.txstore.insert(txid, tx); } + + // Populate tx cache + let txid_misses = self.chain.txs_cache_miss(&txids); + let mut tx_misses = vec![]; + for txid in txid_misses { + if let Some(tx) = self.txstore.get(&txid) { + let bytes = serialize(tx); + tx_misses.push((txid, bytes)); + } + } + self.chain.add_txs_to_cache(&tx_misses); + // Phase 2: index history and spend edges (can fail if some txos cannot be found) let txos = match self.lookup_txos(self.get_prevouts(&txids)) { Ok(txos) => txos, From ece6e733b377f4ab906a5dcb19ab9665b5c3f644 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 10:08:52 +0200 Subject: [PATCH 07/10] enable compaction during initial sync flag --- src/config.rs | 11 +++++++++++ src/new_index/db.rs | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 86d8f2bc3..60720f384 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,6 +44,12 @@ pub struct Config { /// Tx 
cache size in megabytes pub tx_cache_size: usize, + /// Enable compaction during initial sync + /// + /// By default compaction is off until initial sync is finished for performance reasons, + /// however, this requires much more disk space. + pub initial_sync_compaction: bool, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -199,6 +205,10 @@ impl Config { .long("tx-cache-size") .help("The amount of MB for a in-memory cache for transactions.") .default_value("1000") + ).arg( + Arg::with_name("initial_sync_compaction") + .long("initial-sync-compaction") + .help("Perform compaction during initial sync (slower but less disk space required)") ); #[cfg(unix)] @@ -412,6 +422,7 @@ impl Config { cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), tx_cache_size: value_t_or_exit!(m, "tx_cache_size", usize), + initial_sync_compaction: m.is_present("initial_sync_compaction"), #[cfg(feature = "liquid")] parent_network, diff --git a/src/new_index/db.rs b/src/new_index/db.rs index f68da233c..77e1a66d4 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -90,7 +90,7 @@ impl DB { db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); db_opts.set_target_file_size_base(1_073_741_824); db_opts.set_write_buffer_size(256 << 20); - db_opts.set_disable_auto_compactions(true); // for initial bulk load + db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load // db_opts.set_advise_random_on_open(???); db_opts.set_compaction_readahead_size(1 << 20); From 3ffbda004184798780fc7a8c36b6f85601ff2692 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 10:52:14 +0200 Subject: [PATCH 08/10] fix test --- tests/common.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/common.rs b/tests/common.rs index 23c7ce1e2..32026da7d 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -113,6 +113,8 @@ impl 
TestRunner { asset_db_path: None, // XXX #[cfg(feature = "liquid")] parent_network: bitcoin::Network::Regtest, + tx_cache_size: 100, + initial_sync_compaction: false, //#[cfg(feature = "electrum-discovery")] //electrum_public_hosts: Option, //#[cfg(feature = "electrum-discovery")] From 33a8d22a5db0a068acf591b22623ac9bd412bf58 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 11:47:48 +0200 Subject: [PATCH 09/10] improve warn message --- src/daemon.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daemon.rs b/src/daemon.rs index 457bf4230..133896c66 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -424,7 +424,7 @@ impl Daemon { loop { match self.handle_request_batch(method, params_list) { Err(Error(ErrorKind::Connection(msg), _)) => { - warn!("reconnecting to bitcoind: {}", msg); + warn!("reconnecting to bitcoind: {msg}\nmethod was:{method}\nwith_params:{params_list:?}"); self.signal.wait(Duration::from_secs(3), false)?; let mut conn = self.conn.lock().unwrap(); *conn = conn.reconnect()?; From 9753745f65a5b8bbafbe1fb77e96a0f307c87ebb Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Fri, 20 Sep 2024 12:14:17 +0200 Subject: [PATCH 10/10] improve logging during initial sync it is less interesting to see how many rows are written in the db and more interesting knowing the last height indexed --- src/new_index/db.rs | 2 +- src/new_index/fetch.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 77e1a66d4..b6617d425 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -154,7 +154,7 @@ impl DB { } pub fn write(&self, mut rows: Vec, flush: DBFlush) { - debug!( + log::trace!( "writing {} rows to {:?}, flush={:?}", rows.len(), self.db, diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 11843f5d7..d7637ee5c 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -99,6 +99,7 @@ fn bitcoind_fetcher( sender .send(block_entries) 
.expect("failed to send fetched blocks"); + log::debug!("last fetch {:?}", entries.last()); } }), ))