Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx cache #110

Draft
wants to merge 10 commits into
base: new-index
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["bitcoin", "electrum", "server", "index", "database"]
documentation = "https://docs.rs/electrs/"
readme = "README.md"
edition = "2018"
default-run = "electrs"

[features]
liquid = ["elements"]
Expand Down Expand Up @@ -56,6 +57,7 @@ tokio = { version = "1", features = ["sync", "macros"] }

# optional dependencies for electrum-discovery
electrum-client = { version = "0.8", optional = true }
bitcoin_slices = { version = "0.8.0", features = ["slice_cache"] }


[dev-dependencies]
Expand Down
20 changes: 20 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ pub struct Config {
pub electrum_banner: String,
pub electrum_rpc_logging: Option<RpcLogging>,

/// Tx cache size in megabytes
pub tx_cache_size: usize,

/// Enable compaction during initial sync
///
/// By default compaction is off until initial sync is finished for performance reasons,
/// however, this requires much more disk space.
pub initial_sync_compaction: bool,

#[cfg(feature = "liquid")]
pub parent_network: BNetwork,
#[cfg(feature = "liquid")]
Expand Down Expand Up @@ -191,6 +200,15 @@ impl Config {
.long("electrum-rpc-logging")
.help(&rpc_logging_help)
.takes_value(true),
).arg(
Arg::with_name("tx_cache_size")
.long("tx-cache-size")
.help("The amount of MB for a in-memory cache for transactions.")
.default_value("1000")
).arg(
Arg::with_name("initial_sync_compaction")
.long("initial-sync-compaction")
.help("Perform compaction during initial sync (slower but less disk space required)")
);

#[cfg(unix)]
Expand Down Expand Up @@ -403,6 +421,8 @@ impl Config {
index_unspendables: m.is_present("index_unspendables"),
cors: m.value_of("cors").map(|s| s.to_string()),
precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
tx_cache_size: value_t_or_exit!(m, "tx_cache_size", usize),
initial_sync_compaction: m.is_present("initial_sync_compaction"),

#[cfg(feature = "liquid")]
parent_network,
Expand Down
2 changes: 1 addition & 1 deletion src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ impl Daemon {
loop {
match self.handle_request_batch(method, params_list) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("reconnecting to bitcoind: {}", msg);
warn!("reconnecting to bitcoind: {msg}\nmethod was:{method}\nwith_params:{params_list:?}");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to include this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, just a draft...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And by the way, I was trying to understand why it was failing in my test run, but the log, even changed, wasn't helping; in the end the error was "rpc queue limit reached"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you running with #89? If so, note that it requires adjusting bitcoind's rpcworkqueue and rpcthreads options upwards.

self.signal.wait(Duration::from_secs(3), false)?;
let mut conn = self.conn.lock().unwrap();
*conn = conn.reconnect()?;
Expand Down
4 changes: 2 additions & 2 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl DB {
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
db_opts.set_target_file_size_base(1_073_741_824);
db_opts.set_write_buffer_size(256 << 20);
db_opts.set_disable_auto_compactions(true); // for initial bulk load
db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load

// db_opts.set_advise_random_on_open(???);
db_opts.set_compaction_readahead_size(1 << 20);
Expand Down Expand Up @@ -154,7 +154,7 @@ impl DB {
}

pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
debug!(
log::trace!(
"writing {} rows to {:?}, flush={:?}",
rows.len(),
self.db,
Expand Down
1 change: 1 addition & 0 deletions src/new_index/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn bitcoind_fetcher(
sender
.send(block_entries)
.expect("failed to send fetched blocks");
log::debug!("last fetch {:?}", entries.last());
}
}),
))
Expand Down
12 changes: 12 additions & 0 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ impl Mempool {
txids.push(txid);
self.txstore.insert(txid, tx);
}

// Populate tx cache
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an advantage to populating the cache with mempool transactions, which already are stored entirely in memory?

Also, it seems that the cache is only used when looking up on-chain txs (via ChainQuery), so the mempool transactions populated into the cache won't actually be used until they get confirmed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, when they are evicted from the mempool they can still be looked up from the tx cache

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but why store them while they're still in the mempool and not wait until they get confirmed (and cached naturally when they're used)?

Also, it's possible that by the time they confirm, they will no longer be in the FIFO cache

let txid_misses = self.chain.txs_cache_miss(&txids);
let mut tx_misses = vec![];
for txid in txid_misses {
if let Some(tx) = self.txstore.get(&txid) {
let bytes = serialize(tx);
tx_misses.push((txid, bytes));
}
}
self.chain.add_txs_to_cache(&tx_misses);

// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(self.get_prevouts(&txids)) {
Ok(txos) => txos,
Expand Down
54 changes: 51 additions & 3 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use bitcoin::hashes::sha256d::Hash as Sha256dHash;
#[cfg(not(feature = "liquid"))]
use bitcoin::merkle_tree::MerkleBlock;
use bitcoin::VarInt;
use bitcoin_slices::SliceCache;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use hex::FromHex;
use itertools::Itertools;
use prometheus::IntCounter;
use rayon::prelude::*;

#[cfg(not(feature = "liquid"))]
Expand All @@ -17,9 +19,12 @@ use elements::{
AssetId,
};

use std::collections::{BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::{
collections::{BTreeSet, HashMap, HashSet},
sync::Mutex,
};

use crate::chain::{
BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value,
Expand Down Expand Up @@ -198,6 +203,11 @@ pub struct ChainQuery {
light_mode: bool,
duration: HistogramVec,
network: Network,

/// By default 1GB of cached transactions, `Txid -> Transaction`
txs_cache: Mutex<SliceCache<Txid>>,
cache_hit: IntCounter,
cache_miss: IntCounter,
}

// TODO: &[Block] should be an iterator / a queue.
Expand Down Expand Up @@ -360,6 +370,9 @@ impl ChainQuery {
HistogramOpts::new("query_duration", "Index query duration (in seconds)"),
&["name"],
),
txs_cache: Mutex::new(SliceCache::new(config.tx_cache_size << 20)),
cache_hit: metrics.counter(MetricOpts::new("tx_cache_hit", "Tx cache Hit")),
cache_miss: metrics.counter(MetricOpts::new("tx_cache_miss", "Tx cache Miss")),
}
}

Expand Down Expand Up @@ -838,7 +851,16 @@ impl ChainQuery {
pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option<Bytes> {
let _timer = self.start_timer("lookup_raw_txn");

if self.light_mode {
if let Ok(cache) = self.txs_cache.lock() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected that locking may sometime fail and that the failure should be ignored?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think is expected to fail, but since here everything would work normally in case of failure (just perf degradation) I preferred this way

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A poisoned mutex is likely to indicate some underlying coding issue, wouldn't it be better to let it error visibly?

I would at least log a warn-level message instead of silently ignoring it, but generally my preference is the "fail fast" approach for unanticipated errors. Failing fast (and restarting) would also re-enable the tx cache, instead of having it continue with degraded performance (until the process eventually restarts for another reason).

if let Some(bytes) = cache.get(txid) {
self.cache_hit.inc();
return Some(bytes.to_vec());
} else {
self.cache_miss.inc();
}
}

let result = if self.light_mode {
let queried_blockhash =
blockhash.map_or_else(|| self.tx_confirming_block(txid).map(|b| b.hash), |_| None);
let blockhash = blockhash.or_else(|| queried_blockhash.as_ref())?;
Expand All @@ -848,9 +870,35 @@ impl ChainQuery {
.gettransaction_raw(txid, blockhash, false)
.ok()?;
let txhex = txval.as_str().expect("valid tx from bitcoind");
Some(Bytes::from_hex(txhex).expect("valid tx from bitcoind"))
let vec = Bytes::from_hex(txhex).expect("valid tx from bitcoind");

Some(vec)
} else {
self.store.txstore_db.get(&TxRow::key(&txid[..]))
};
if let Some(result) = result.as_ref() {
self.add_txs_to_cache(&[(*txid, result)]);
}
result
}

/// Returns the subset of `txids` that are NOT present in the in-memory
/// transaction cache.
///
/// If the cache lock is poisoned the miss list comes back empty, i.e.
/// everything is treated as already cached (same best-effort behavior
/// as the rest of the cache code).
pub fn txs_cache_miss(&self, txids: &[Txid]) -> Vec<Txid> {
    match self.txs_cache.lock() {
        Ok(cache) => txids
            .iter()
            .filter(|txid| !cache.contains(txid))
            .copied()
            .collect(),
        Err(_) => Vec::new(),
    }
}

/// Inserts serialized transactions into the in-memory tx cache.
///
/// Each element of `txs` pairs a `Txid` with its raw serialized bytes
/// (anything viewable as `&[u8]`). Caching is best-effort: insertion
/// failures (e.g. an entry larger than the cache capacity) and a
/// poisoned lock are silently ignored — a dropped entry only costs a
/// slower lookup later.
pub fn add_txs_to_cache<T: AsRef<[u8]>>(&self, txs: &[(Txid, T)]) {
    if let Ok(mut cache) = self.txs_cache.lock() {
        for (txid, tx) in txs {
            // `as_ref()` already yields `&[u8]`; the original extra `&`
            // created a needless `&&[u8]` that only compiled via deref
            // coercion. Ignore per-entry insert errors (best-effort).
            let _ = cache.insert(*txid, tx.as_ref());
        }
    }
}

Expand Down
2 changes: 2 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ impl TestRunner {
asset_db_path: None, // XXX
#[cfg(feature = "liquid")]
parent_network: bitcoin::Network::Regtest,
tx_cache_size: 100,
initial_sync_compaction: false,
//#[cfg(feature = "electrum-discovery")]
//electrum_public_hosts: Option<crate::electrum::ServerHosts>,
//#[cfg(feature = "electrum-discovery")]
Expand Down
Loading