From a393f3487c25e8182004b9352d6638bad6828c66 Mon Sep 17 00:00:00 2001 From: Gregory Hill Date: Wed, 27 Apr 2022 14:10:46 +0100 Subject: [PATCH] feat: integrate rocksdb to persist btc payments Signed-off-by: Gregory Hill --- Cargo.lock | 133 +++++++++++++------------ vault/Cargo.toml | 2 +- vault/src/error.rs | 5 + vault/src/execution.rs | 115 +++++++++++---------- vault/src/lib.rs | 1 + vault/src/redeem.rs | 20 +++- vault/src/refund.rs | 17 +++- vault/src/replace.rs | 19 +++- vault/src/storage.rs | 41 ++++++++ vault/src/system.rs | 13 ++- vault/tests/vault_integration_tests.rs | 35 +++++-- 11 files changed, 257 insertions(+), 144 deletions(-) create mode 100644 vault/src/storage.rs diff --git a/Cargo.lock b/Cargo.lock index a45cef3e5..62a3804ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" +checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" [[package]] name = "approx" @@ -368,24 +368,24 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f" +checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" dependencies = [ "addr2line", "cc", "cfg-if 1.0.0", "libc", - "miniz_oxide 0.4.4", - "object", + "miniz_oxide", + "object 0.28.3", "rustc-demangle", ] [[package]] name = "base-x" -version = "0.2.8" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" +checksum = "dc19a4937b4fbd3fe3379793130e42060d10627a360f2127802b10b87e7baf74" [[package]] name = "base58" @@ -961,16 +961,16 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.8" +version = "3.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71c47df61d9e16dc010b55dba1952a57d8c215dbb533fd13cdd13369aac73b1c" +checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db" dependencies = [ "atty", "bitflags", "clap_derive", + "clap_lex", "indexmap", "lazy_static", - "os_str_bytes", "strsim", "termcolor", "textwrap", @@ -989,6 +989,15 @@ dependencies = [ "syn", ] +[[package]] +name = "clap_lex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -1877,7 +1886,7 @@ dependencies = [ "crc32fast", "libc", "libz-sys", - "miniz_oxide 0.5.1", + "miniz_oxide", ] [[package]] @@ -2415,9 +2424,9 @@ dependencies = [ [[package]] name = "gloo-timers" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d12a7f4e95cfe710f1d624fb1210b7d961a5fb05c4fd942f4feab06e61f590e" +checksum = "5fb7d06c1c8cc2a29bee7ec961009a0b2caa0793ee4900c2ffb348734ba1c8f9" dependencies = [ "futures-channel", "futures-core", @@ -2732,7 +2741,7 @@ dependencies = [ "hyper 0.14.18", "log 0.4.16", "rustls 0.20.4", - "rustls-native-certs 0.6.1", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.23.3", "webpki-roots 0.22.3", @@ -3076,9 +3085,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e70ee094dc02fd9c13fdad4940090f22dbd6ac7c9e7094a46cf0232a50bc7c" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" [[package]] name = "issue" @@ -3321,7 +3330,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-types", "pin-project 1.0.10", - "rustls-native-certs 0.6.1", + "rustls-native-certs 0.6.2", "soketto", "thiserror", "tokio", @@ -3507,9 +3516,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.123" +version = "0.2.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" +checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" [[package]] name = "libloading" @@ -4329,16 +4338,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" -dependencies = [ - "adler", - "autocfg", -] - [[package]] name = "miniz_oxide" version = "0.5.1" @@ -5062,6 +5061,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "object" +version = "0.28.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -5242,9 +5250,6 @@ name = "os_str_bytes" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" -dependencies = [ - "memchr", -] [[package]] name = "owning_ref" @@ -6356,9 +6361,9 @@ checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" [[package]] name = "rayon" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221" dependencies = [ "autocfg", "crossbeam-deque", @@ -6368,14 +6373,13 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4" dependencies = [ "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "lazy_static", "num_cpus", ] @@ -6439,18 +6443,18 @@ dependencies = [ [[package]] name = "ref-cast" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "300f2a835d808734ee295d45007adacb9ebb29dd3ae2424acfa17930cae541da" +checksum = "685d58625b6c2b83e4cc88a27c4bf65adb7b6b16dbdc413e515c9405b47432ab" dependencies = [ "ref-cast-impl", ] [[package]] name = "ref-cast-impl" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c38e3aecd2b21cb3959637b883bb3714bc7e43f0268b9a29d3743ee3e55cdd2" +checksum = "a043824e29c94169374ac5183ac0ed43f5724dc4556b19568007486bd840fa1f" dependencies = [ "proc-macro2", "quote", @@ -6842,9 +6846,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -6854,9 +6858,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "0.2.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" dependencies = [ "base64 0.13.0", ] @@ -7687,9 +7691,9 @@ dependencies = [ [[package]] name = "scale-info" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21fe4ca2b2295d5519b364579162b8e18f4902f6c9e8a975e7d05e7ef63622f5" +checksum = "8980cafbe98a7ee7a9cc16b32ebce542c77883f512d83fbf2ddc8f6a85ea74c9" dependencies = [ "bitvec", "cfg-if 1.0.0", @@ -7701,9 +7705,9 @@ dependencies = [ [[package]] name = "scale-info-derive" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e767e52f2d5b2e71e86b1c82976bf4c3acdef88d72af7fa3498cef31cd9708cb" +checksum = "4260c630e8a8a33429d1688eff2f163f24c65a4e1b1578ef6b565061336e4b6f" dependencies = [ "proc-macro-crate 1.1.3", "proc-macro2", @@ -9264,9 +9268,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" dependencies = [ "tinyvec_macros", ] @@ -9405,9 +9409,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" dependencies = [ "serde", ] @@ -9420,9 +9424,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if 1.0.0", "log 0.4.16", @@ -9444,9 +9448,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", "valuable", @@ -9464,9 +9468,9 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" dependencies = [ "lazy_static", "log 0.4.16", @@ -9811,6 +9815,7 @@ dependencies = [ "lazy_static", "mockall", "parity-scale-codec", + "rocksdb", "runtime", "serde", "service", @@ -10102,7 +10107,7 @@ dependencies = [ "lazy_static", "libc", "log 0.4.16", - "object", + "object 0.27.1", "paste", "psm", "rayon", @@ -10154,7 +10159,7 @@ dependencies = [ "gimli", "log 0.4.16", "more-asserts", - "object", + "object 0.27.1", "target-lexicon", "thiserror", "wasmparser", @@ -10173,7 +10178,7 @@ dependencies = [ "indexmap", "log 0.4.16", "more-asserts", - "object", + "object 0.27.1", "serde", "target-lexicon", "thiserror", @@ -10192,7 +10197,7 @@ dependencies = [ "bincode", "cfg-if 1.0.0", "gimli", - "object", + "object 0.27.1", "region", "rustix", "serde", diff --git a/vault/Cargo.toml b/vault/Cargo.toml index 4960ed5e9..c5f90ef77 100644 --- a/vault/Cargo.toml +++ b/vault/Cargo.toml @@ -24,7 +24,7 @@ futures = "0.3.5" async-trait = "0.1.40" sha2 = "0.8.2" git-version = "0.3.4" - +rocksdb = { version = "0.18", features = ["snappy"], default-features = false } lazy_static = "1.4" tracing = { version = "0.1", features = ["log"] } diff --git a/vault/src/error.rs b/vault/src/error.rs index f27a191c4..bc9432841 100644 --- a/vault/src/error.rs +++ b/vault/src/error.rs @@ -2,6 +2,7 @@ use bitcoin::Error as BitcoinError; use hex::FromHexError; use jsonrpc_core_client::RpcError; use parity_scale_codec::Error as CodecError; +use rocksdb::Error as RocksDbError; use runtime::{Error as RuntimeError, SubxtError}; use service::Error as ServiceError; use thiserror::Error; @@ -24,6 +25,8 @@ pub enum Error { TryIntoIntError(#[from] std::num::TryFromIntError), #[error("Deadline has expired")] DeadlineExpired, + #[error("Transaction not found")] + TransactionNotFound, #[error("ServiceError: {0}")] ServiceError(#[from] ServiceError), @@ -39,4 +42,6 @@ pub enum Error { SubxtError(#[from] SubxtError), #[error("CodecError: {0}")] CodecError(#[from] CodecError), + #[error("RocksDbError: {0}")] + RocksDbError(#[from] RocksDbError), } diff --git a/vault/src/execution.rs b/vault/src/execution.rs index f332b5e4c..0dd269408 100644 --- a/vault/src/execution.rs +++ b/vault/src/execution.rs @@ -1,7 +1,8 @@ -use crate::{error::Error, metrics::update_bitcoin_metrics, system::VaultData, VaultIdManager}; +use crate::{ + error::Error, metrics::update_bitcoin_metrics, storage::TransactionStore, system::VaultData, VaultIdManager, +}; use bitcoin::{ - BitcoinCoreApi, PartialAddress, Transaction, TransactionExt, TransactionMetadata, - BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL, + BitcoinCoreApi, PartialAddress, TransactionExt, TransactionMetadata, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL, }; use futures::{ stream::{self, StreamExt}, @@ -13,7 +14,7 @@ use runtime::{ ReplaceRequestStatus, RequestRefundEvent, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet, H256, }; use service::{spawn_cancelable, ShutdownSender}; -use std::{collections::HashMap, convert::TryInto, time::Duration}; +use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use tokio::time::sleep; const ON_FORK_RETRY_DELAY: Duration = Duration::from_secs(10); @@ -183,10 +184,12 @@ impl Request { + Clone + Send + Sync, + TS: TransactionStore, >( &self, parachain_rpc: P, vault: VaultData, + tx_store: Arc, num_confirmations: u32, ) -> Result<(), Error> { // ensure the deadline has not expired yet @@ -199,7 +202,13 @@ impl Request { } let tx_metadata = self - .transfer_btc(¶chain_rpc, &vault.btc_rpc, num_confirmations, self.vault_id.clone()) + .transfer_btc( + ¶chain_rpc, + &vault.btc_rpc, + tx_store, + num_confirmations, + self.vault_id.clone(), + ) .await?; update_bitcoin_metrics(&vault, tx_metadata.fee, self.fee_budget).await; self.execute(parachain_rpc, tx_metadata).await @@ -208,27 +217,31 @@ impl Request { /// Make a bitcoin transfer to fulfil the request #[tracing::instrument( name = "transfer_btc", - skip(self, parachain_rpc, btc_rpc), + skip(self, parachain_rpc, btc_rpc, tx_store), fields( request_type = ?self.request_type, request_id = ?self.hash, ) )] - async fn transfer_btc< - B: BitcoinCoreApi + Clone, - P: BtcRelayPallet + VaultRegistryPallet + UtilFuncs + Clone + Send + Sync, - >( + async fn transfer_btc( &self, parachain_rpc: &P, btc_rpc: &B, + tx_store: Arc, num_confirmations: u32, vault_id: VaultId, - ) -> Result { + ) -> Result + where + B: BitcoinCoreApi + Clone, + P: BtcRelayPallet + VaultRegistryPallet + UtilFuncs + Clone + Send + Sync, + TS: TransactionStore, + { let tx = btc_rpc .create_transaction(self.btc_address, self.amount as u64, Some(self.hash)) .await?; let recipient = tx.recipient.clone(); tracing::info!("Sending bitcoin to {}", recipient); + tx_store.put_tx(self.hash, tx.transaction.clone())?; let return_to_self_addresses = tx .transaction @@ -329,15 +342,19 @@ impl Request { /// Queries the parachain for open requests and executes them. It checks the /// bitcoin blockchain to see if a payment has already been made. -pub async fn execute_open_requests( +pub async fn execute_open_requests( shutdown_tx: ShutdownSender, parachain_rpc: InterBtcParachain, vault_id_manager: VaultIdManager, - read_only_btc_rpc: B, + tx_store: Arc, num_confirmations: u32, payment_margin: Duration, process_refunds: bool, -) -> Result<(), Error> { +) -> Result<(), Error> +where + B: BitcoinCoreApi + Clone + Send + Sync + 'static, + TS: TransactionStore + Send + Sync + 'static, +{ let parachain_rpc = ¶chain_rpc; let vault_id = parachain_rpc.get_account_id().clone(); @@ -387,31 +404,11 @@ pub async fn execute_open_requests>(); - // find the height of bitcoin chain corresponding to the earliest btc_height - let btc_start_height = match open_requests - .iter() - .map(|(_, request)| request.btc_height.unwrap_or(u32::MAX)) - .min() - { - Some(x) => x, - None => return Ok(()), // the iterator is empty so we have nothing to do - }; - - // iterate through transactions in reverse order, starting from those in the mempool - let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?; - while let Some(result) = transaction_stream.next().await { - let tx = match result { - Ok(x) => x, - Err(e) => { - tracing::warn!("Failed to process transaction: {}", e); - continue; - } - }; - + for (hash, request) in open_requests.clone().into_iter() { // get the request this transaction corresponds to, if any - if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) { + if let Ok(tx) = tx_store.get_tx(&hash) { // remove request from the hashmap - open_requests.retain(|&key, _| key != request.hash); + open_requests.remove(&request.hash); tracing::info!( "{:?} request #{:?} has valid bitcoin payment - processing...", @@ -485,6 +482,7 @@ pub async fn execute_open_requests x, @@ -503,7 +501,10 @@ pub async fn execute_open_requests tracing::info!( "{:?} request #{:?} successfully executed", request.request_type, @@ -522,19 +523,6 @@ pub async fn execute_open_requests) -> Option { - let hash = tx.get_op_return()?; - let request = hash_map.get(&hash)?; - let paid_amount = tx.get_payment_amount_to(request.btc_address)?; - if paid_amount as u128 >= request.amount { - Some(request.clone()) - } else { - None - } -} - #[cfg(all(test, feature = "standalone-metadata"))] mod tests { use crate::metrics::PerCurrencyMetrics; @@ -550,7 +538,10 @@ mod tests { InterBtcVault, StatusCode, Token, DOT, IBTC, }; use sp_core::H160; - use std::collections::BTreeSet; + use std::{ + collections::{BTreeSet, HashMap}, + sync::Mutex, + }; macro_rules! assert_ok { ( $x:expr $(,)? ) => { @@ -826,30 +817,34 @@ mod tests { #[tokio::test] async fn should_pay_and_execute_redeem_if_neither_parachain_nor_bitcoin_deadlines_expired() { let (request, parachain_rpc, btc_rpc) = should_pay_and_execute_with_deadlines(100, 50, 100, 50); + let tx_store = Arc::new(Mutex::new(HashMap::new())); - assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, 6).await); + assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, tx_store, 6).await); } #[tokio::test] async fn should_pay_and_execute_redeem_if_only_parachain_deadline_expired() { let (request, parachain_rpc, btc_rpc) = should_pay_and_execute_with_deadlines(100, 101, 100, 50); + let tx_store = Arc::new(Mutex::new(HashMap::new())); - assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, 6).await); + assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, tx_store, 6).await); } #[tokio::test] async fn should_pay_and_execute_redeem_if_only_bitcoin_deadline_expired() { let (request, parachain_rpc, btc_rpc) = should_pay_and_execute_with_deadlines(100, 50, 100, 101); + let tx_store = Arc::new(Mutex::new(HashMap::new())); - assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, 6).await); + assert_ok!(request.pay_and_execute(parachain_rpc, btc_rpc, tx_store, 6).await); } #[tokio::test] async fn should_not_pay_and_execute_redeem_if_both_deadlines_expired() { let (request, parachain_rpc, btc_rpc) = should_pay_and_execute_with_deadlines(100, 101, 100, 101); + let tx_store = Arc::new(Mutex::new(HashMap::new())); assert_err!( - request.pay_and_execute(parachain_rpc, btc_rpc, 6).await, + request.pay_and_execute(parachain_rpc, btc_rpc, tx_store, 6).await, Error::DeadlineExpired ); } @@ -886,8 +881,10 @@ mod tests { metrics: PerCurrencyMetrics::dummy(), }; + let tx_store = Arc::new(Mutex::new(HashMap::new())); + assert_err!( - request.pay_and_execute(parachain_rpc, vault_data, 6).await, + request.pay_and_execute(parachain_rpc, vault_data, tx_store, 6).await, Error::DeadlineExpired ); } @@ -957,6 +954,8 @@ mod tests { metrics: PerCurrencyMetrics::dummy(), }; - assert_ok!(request.pay_and_execute(parachain_rpc, vault_data, 6).await); + let tx_store = Arc::new(Mutex::new(HashMap::new())); + + assert_ok!(request.pay_and_execute(parachain_rpc, vault_data, tx_store, 6).await); } } diff --git a/vault/src/lib.rs b/vault/src/lib.rs index dec684b39..8be3cfb96 100644 --- a/vault/src/lib.rs +++ b/vault/src/lib.rs @@ -11,6 +11,7 @@ mod redeem; mod refund; mod relay; mod replace; +mod storage; mod system; mod types; mod vaults; diff --git a/vault/src/redeem.rs b/vault/src/redeem.rs index a7007f8ed..03d03e65a 100644 --- a/vault/src/redeem.rs +++ b/vault/src/redeem.rs @@ -1,8 +1,10 @@ -use crate::{execution::*, metrics::publish_expected_bitcoin_balance, system::VaultIdManager}; +use crate::{ + execution::*, metrics::publish_expected_bitcoin_balance, storage::TransactionStore, system::VaultIdManager, +}; use bitcoin::BitcoinCoreApi; use runtime::{InterBtcParachain, RedeemPallet, RequestRedeemEvent}; use service::{spawn_cancelable, Error as ServiceError, ShutdownSender}; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; /// Listen for RequestRedeemEvent directed at this vault; upon reception, transfer /// bitcoin and call execute_redeem @@ -13,13 +15,18 @@ use std::time::Duration; /// * `btc_rpc` - the bitcoin RPC handle /// * `network` - network the bitcoin network used (i.e. regtest/testnet/mainnet) /// * `num_confirmations` - the number of bitcoin confirmation to await -pub async fn listen_for_redeem_requests( +pub async fn listen_for_redeem_requests( shutdown_tx: ShutdownSender, parachain_rpc: InterBtcParachain, vault_id_manager: VaultIdManager, + tx_store: Arc, num_confirmations: u32, payment_margin: Duration, -) -> Result<(), ServiceError> { +) -> Result<(), ServiceError> +where + B: BitcoinCoreApi + Clone + Send + Sync + 'static, + TS: TransactionStore + Send + Sync + 'static, +{ parachain_rpc .on_event::( |event| async { @@ -36,6 +43,7 @@ pub async fn listen_for_redeem_requests( +pub async fn listen_for_refund_requests( shutdown_tx: ShutdownSender, parachain_rpc: InterBtcParachain, vault_id_manager: VaultIdManager, + tx_store: Arc, num_confirmations: u32, process_refunds: bool, -) -> Result<(), ServiceError> { +) -> Result<(), ServiceError> +where + B: BitcoinCoreApi + Clone + Send + Sync + 'static, + TS: TransactionStore + Send + Sync + 'static, +{ parachain_rpc .on_event::( |event| async { @@ -38,12 +44,15 @@ pub async fn listen_for_refund_requests tracing::info!( diff --git a/vault/src/replace.rs b/vault/src/replace.rs index e4aedb1c8..0580de8ec 100644 --- a/vault/src/replace.rs +++ b/vault/src/replace.rs @@ -1,6 +1,6 @@ use crate::{ cancellation::Event, error::Error, execution::Request, metrics::publish_expected_bitcoin_balance, - system::VaultIdManager, + storage::TransactionStore, system::VaultIdManager, }; use bitcoin::BitcoinCoreApi; use futures::{channel::mpsc::Sender, future::try_join3, SinkExt}; @@ -9,7 +9,7 @@ use runtime::{ RequestReplaceEvent, UtilFuncs, VaultId, VaultRegistryPallet, }; use service::{spawn_cancelable, Error as ServiceError, ShutdownSender}; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; /// Listen for AcceptReplaceEvent directed at this vault and continue the replacement /// procedure by transferring bitcoin and calling execute_replace @@ -19,16 +19,22 @@ use std::time::Duration; /// * `parachain_rpc` - the parachain RPC handle /// * `btc_rpc` - the bitcoin RPC handle /// * `num_confirmations` - the number of bitcoin confirmation to await -pub async fn listen_for_accept_replace( +pub async fn listen_for_accept_replace( shutdown_tx: ShutdownSender, parachain_rpc: InterBtcParachain, vault_id_manager: VaultIdManager, + tx_store: Arc, num_confirmations: u32, payment_margin: Duration, -) -> Result<(), ServiceError> { +) -> Result<(), ServiceError> +where + B: BitcoinCoreApi + Clone + Send + Sync + 'static, + TS: TransactionStore + Send + Sync + 'static, +{ let parachain_rpc = ¶chain_rpc; let vault_id_manager = &vault_id_manager; let shutdown_tx = &shutdown_tx; + let tx_store = &tx_store; parachain_rpc .on_event::( |event| async move { @@ -44,6 +50,7 @@ pub async fn listen_for_accept_replace Result; + + /// Insert the transaction. + fn put_tx(&self, id: H256, tx: Transaction) -> Result<(), Error>; +} + +impl TransactionStore for Mutex> { + fn get_tx(&self, id: &H256) -> Result { + self.lock() + .unwrap() + .get(id) + .map(ToOwned::to_owned) + .ok_or(Error::TransactionNotFound) + } + + fn put_tx(&self, id: H256, tx: Transaction) -> Result<(), Error> { + let mut tx_store = self.lock().unwrap(); + tx_store.insert(id, tx); + Ok(()) + } +} + +impl TransactionStore for RocksDb { + fn get_tx(&self, id: &H256) -> Result { + let raw_tx = self.get(id)?.ok_or(Error::TransactionNotFound)?; + Ok(deserialize(&raw_tx).map_err(Into::::into)?) + } + + fn put_tx(&self, id: H256, tx: Transaction) -> Result<(), Error> { + self.put(id, serialize(&tx))?; + Ok(()) + } +} diff --git a/vault/src/system.rs b/vault/src/system.rs index 304afd320..18890cc4f 100644 --- a/vault/src/system.rs +++ b/vault/src/system.rs @@ -107,6 +107,9 @@ pub struct VaultServiceConfig { /// Defaults to the relay chain currency if not set. #[clap(long, parse(try_from_str = parse_collateral_currency))] pub collateral_currency_id: Option, + + #[clap(long, default_value = ".vault_db")] + pub rocksdb_path: String, } async fn active_block_listener( @@ -480,11 +483,16 @@ impl VaultService { let startup_height = self.await_parachain_block().await?; + // persist request payments since checking mempool is unreliable + // we could use the `listtransactions` bitcoin rpc to fetch txs + // but this requires paging and may eventually be very large + let rocksdb = Arc::new(rocksdb::DB::open_default(self.config.rocksdb_path.clone()).expect("could not open db")); + let open_request_executor = execute_open_requests( self.shutdown.clone(), self.btc_parachain.clone(), self.vault_id_manager.clone(), - self.btc_rpc_master_wallet.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, !self.config.no_auto_refund, @@ -556,6 +564,7 @@ impl VaultService { self.shutdown.clone(), self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, )), @@ -599,6 +608,7 @@ impl VaultService { self.shutdown.clone(), self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, )), @@ -609,6 +619,7 @@ impl VaultService { self.shutdown.clone(), self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, !self.config.no_auto_refund, )), diff --git a/vault/tests/vault_integration_tests.rs b/vault/tests/vault_integration_tests.rs index d6dc1e1f2..8a1f751f0 100644 --- a/vault/tests/vault_integration_tests.rs +++ b/vault/tests/vault_integration_tests.rs @@ -15,7 +15,11 @@ use runtime::{ }; use sp_core::{H160, H256}; use sp_keyring::AccountKeyring; -use std::{sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::Duration, +}; use vault::{self, Event as CancellationEvent, IssueRequests, VaultIdManager}; const TIMEOUT: Duration = Duration::from_secs(90); @@ -321,6 +325,7 @@ async fn test_redeem_succeeds() { let btc_rpcs = vec![(vault_id.clone(), btc_rpc.clone())].into_iter().collect(); let btc_rpc_master_wallet = btc_rpc.clone(); let vault_id_manager = VaultIdManager::from_map(vault_provider.clone(), btc_rpc_master_wallet, btc_rpcs); + let tx_store = Arc::new(Mutex::new(HashMap::new())); let issue_amount = 100000; let vault_collateral = @@ -345,6 +350,7 @@ async fn test_redeem_succeeds() { shutdown_tx, vault_provider.clone(), vault_id_manager, + tx_store, 0, Duration::from_secs(0), ), @@ -386,6 +392,7 @@ async fn test_replace_succeeds() { let old_btc_rpc_master_wallet = btc_rpc.clone(); let vault_id_manager = VaultIdManager::from_map(old_vault_provider.clone(), old_btc_rpc_master_wallet, btc_rpcs); + let tx_store = Arc::new(Mutex::new(HashMap::new())); let issue_amount = 100000; let vault_collateral = get_required_vault_collateral_for_issue( @@ -429,6 +436,7 @@ async fn test_replace_succeeds() { shutdown_tx.clone(), old_vault_provider.clone(), vault_id_manager.clone(), + tx_store, 0, Duration::from_secs(0), ), @@ -787,10 +795,17 @@ async fn test_refund_succeeds() { let btc_rpcs = vec![(vault_id.clone(), btc_rpc.clone())].into_iter().collect(); let btc_rpc_master_wallet = btc_rpc.clone(); let vault_id_manager = VaultIdManager::from_map(vault_provider.clone(), btc_rpc_master_wallet, btc_rpcs); + let tx_store = Arc::new(Mutex::new(HashMap::new())); let (shutdown_tx, _) = tokio::sync::broadcast::channel(16); - let refund_service = - vault::service::listen_for_refund_requests(shutdown_tx, vault_provider.clone(), vault_id_manager, 0, true); + let refund_service = vault::service::listen_for_refund_requests( + shutdown_tx, + vault_provider.clone(), + vault_id_manager, + tx_store, + 0, + true, + ); assert_ok!(sudo_provider.set_parachain_confirmations(0).await); @@ -868,10 +883,17 @@ async fn test_issue_overpayment_succeeds() { let btc_rpcs = vec![(vault_id.clone(), btc_rpc.clone())].into_iter().collect(); let btc_rpc_master_wallet = btc_rpc.clone(); let vault_id_manager = VaultIdManager::from_map(vault_provider.clone(), btc_rpc_master_wallet, btc_rpcs); + let tx_store = Arc::new(Mutex::new(HashMap::new())); let (shutdown_tx, _) = tokio::sync::broadcast::channel(16); - let refund_service = - vault::service::listen_for_refund_requests(shutdown_tx, vault_provider.clone(), vault_id_manager, 0, true); + let refund_service = vault::service::listen_for_refund_requests( + shutdown_tx, + vault_provider.clone(), + vault_id_manager, + tx_store, + 0, + true, + ); let issue_amount = 100000; let over_payment_factor = 3; @@ -1082,6 +1104,7 @@ async fn test_execute_open_requests_succeeds() { let btc_rpcs = vec![(vault_id.clone(), btc_rpc.clone())].into_iter().collect(); let btc_rpc_master_wallet = btc_rpc.clone(); let vault_id_manager = VaultIdManager::from_map(vault_provider.clone(), btc_rpc_master_wallet, btc_rpcs); + let tx_store = Arc::new(Mutex::new(HashMap::new())); let issue_amount = 100000; let vault_collateral = @@ -1135,7 +1158,7 @@ async fn test_execute_open_requests_succeeds() { shutdown_tx.clone(), vault_provider, vault_id_manager, - btc_rpc.clone(), + tx_store, 0, Duration::from_secs(0), true,