From 72227edf985c2c28a542e7a7706412e43409f1c0 Mon Sep 17 00:00:00 2001 From: Gregory Hill Date: Tue, 8 Mar 2022 16:26:41 +0000 Subject: [PATCH] feat: integrate rocksdb to persist btc payments Signed-off-by: Gregory Hill --- Cargo.lock | 1 + vault/Cargo.toml | 1 + vault/src/execution.rs | 66 +++++++++++++++++++----------------------- vault/src/redeem.rs | 8 +++-- vault/src/refund.rs | 7 ++++- vault/src/replace.rs | 9 ++++-- vault/src/system.rs | 13 ++++++++- 7 files changed, 63 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a7786312..e9e91787a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9599,6 +9599,7 @@ dependencies = [ "jsonrpc-core-client", "mockall", "parity-scale-codec", + "rocksdb", "runtime", "serde", "service", diff --git a/vault/Cargo.toml b/vault/Cargo.toml index df2fa36a0..7c67d887f 100644 --- a/vault/Cargo.toml +++ b/vault/Cargo.toml @@ -22,6 +22,7 @@ futures = "0.3.5" async-trait = "0.1.40" sha2 = "0.8.2" git-version = "0.3.4" +rocksdb = { version = "0.17", features = ["snappy"], default-features = false } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.2.12", features = ["registry", "env-filter", "fmt"] } diff --git a/vault/src/execution.rs b/vault/src/execution.rs index 2b139c0e9..927cc1430 100644 --- a/vault/src/execution.rs +++ b/vault/src/execution.rs @@ -1,6 +1,6 @@ use crate::{error::Error, VaultIdManager}; use bitcoin::{ - BitcoinCoreApi, Transaction, TransactionExt, TransactionMetadata, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL, + BitcoinCoreApi, Hash, TransactionExt, TransactionMetadata, Txid, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL, }; use futures::{ stream::{self, StreamExt}, @@ -11,7 +11,7 @@ use runtime::{ InterBtcReplaceRequest, IssuePallet, RedeemPallet, RedeemRequestStatus, RefundPallet, ReplacePallet, ReplaceRequestStatus, RequestRefundEvent, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet, H256, }; -use std::{collections::HashMap, convert::TryInto, time::Duration}; +use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use tokio::time::sleep; const ON_FORK_RETRY_DELAY: Duration = Duration::from_secs(10); @@ -180,6 +180,7 @@ impl Request { &self, parachain_rpc: P, btc_rpc: B, + rocksdb: Arc, num_confirmations: u32, ) -> Result<(), Error> { // ensure the deadline has not expired yet @@ -192,7 +193,13 @@ impl Request { } let tx_metadata = self - .transfer_btc(¶chain_rpc, btc_rpc, num_confirmations, self.vault_id.clone()) + .transfer_btc( + ¶chain_rpc, + btc_rpc, + rocksdb, + num_confirmations, + self.vault_id.clone(), + ) .await?; self.execute(parachain_rpc, tx_metadata).await } @@ -213,6 +220,7 @@ impl Request { &self, parachain_rpc: &P, btc_rpc: B, + rocksdb: Arc, num_confirmations: u32, vault_id: VaultId, ) -> Result { @@ -244,6 +252,7 @@ impl Request { }; let txid = btc_rpc.send_transaction(tx).await?; + rocksdb.put(self.hash, txid).expect("could not write to db"); loop { let tx_metadata = btc_rpc.wait_for_transaction_metadata(txid, num_confirmations).await?; @@ -310,7 +319,7 @@ impl Request { pub async fn execute_open_requests( parachain_rpc: InterBtcParachain, btc_rpc: VaultIdManager, - read_only_btc_rpc: B, + rocksdb: Arc, num_confirmations: u32, payment_margin: Duration, process_refunds: bool, @@ -364,25 +373,19 @@ pub async fn execute_open_requests>(); - // find the height of bitcoin chain corresponding to the earliest btc_height - let btc_start_height = match open_requests - .iter() - .map(|(_, request)| request.btc_height.unwrap_or(u32::MAX)) - .min() - { - Some(x) => x, - None => return Ok(()), // the iterator is empty so we have nothing to do - }; - - // iterate through transactions in reverse order, starting from those in the mempool - let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?; - while let Some(result) = transaction_stream.next().await { - let tx = result?; - + for (hash, request) in open_requests.clone().into_iter() { // get the request this transaction corresponds to, if any - if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) { + if let Ok(Some(raw_txid)) = rocksdb.get(&hash) { // remove request from the hashmap - open_requests.retain(|&key, _| key != request.hash); + open_requests.remove(&request.hash); + + let txid = match Txid::from_slice(&raw_txid) { + Ok(txid) => txid, + Err(err) => { + tracing::error!("Could not decode txid: {}", err); + continue; + } + }; tracing::info!( "{:?} request #{:?} has valid bitcoin payment - processing...", @@ -409,7 +412,7 @@ pub async fn execute_open_requests x, @@ -474,7 +478,10 @@ pub async fn execute_open_requests tracing::info!( "{:?} request #{:?} successfully executed", request.request_type, @@ -493,19 +500,6 @@ pub async fn execute_open_requests) -> Option { - let hash = tx.get_op_return()?; - let request = hash_map.get(&hash)?; - let paid_amount = tx.get_payment_amount_to(request.btc_address)?; - if paid_amount as u128 >= request.amount { - Some(request.clone()) - } else { - None - } -} - #[cfg(all(test, feature = "standalone-metadata"))] mod tests { use super::*; diff --git a/vault/src/redeem.rs b/vault/src/redeem.rs index 61197f945..c0053fb43 100644 --- a/vault/src/redeem.rs +++ b/vault/src/redeem.rs @@ -2,7 +2,7 @@ use crate::{execution::*, system::VaultIdManager}; use bitcoin::BitcoinCoreApi; use runtime::{InterBtcParachain, RedeemPallet, RequestRedeemEvent}; use service::Error as ServiceError; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; /// Listen for RequestRedeemEvent directed at this vault; upon reception, transfer /// bitcoin and call execute_redeem @@ -16,6 +16,7 @@ use std::time::Duration; pub async fn listen_for_redeem_requests( parachain_rpc: InterBtcParachain, btc_rpc: VaultIdManager, + rocksdb: Arc, num_confirmations: u32, payment_margin: Duration, ) -> Result<(), ServiceError> { @@ -33,6 +34,7 @@ pub async fn listen_for_redeem_requests( parachain_rpc: InterBtcParachain, btc_rpc: VaultIdManager, + rocksdb: Arc, num_confirmations: u32, process_refunds: bool, ) -> Result<(), ServiceError> { @@ -37,12 +39,15 @@ pub async fn listen_for_refund_requests tracing::info!( diff --git a/vault/src/replace.rs b/vault/src/replace.rs index 38a440220..fb7919113 100644 --- a/vault/src/replace.rs +++ b/vault/src/replace.rs @@ -6,7 +6,7 @@ use runtime::{ RequestReplaceEvent, UtilFuncs, VaultId, VaultRegistryPallet, }; use service::Error as ServiceError; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; /// Listen for AcceptReplaceEvent directed at this vault and continue the replacement /// procedure by transferring bitcoin and calling execute_replace @@ -19,11 +19,13 @@ use std::time::Duration; pub async fn listen_for_accept_replace( parachain_rpc: InterBtcParachain, btc_rpc: VaultIdManager, + rocksdb: Arc, num_confirmations: u32, payment_margin: Duration, ) -> Result<(), ServiceError> { let parachain_rpc = ¶chain_rpc; let btc_rpc = &btc_rpc; + let rocksdb = &rocksdb; parachain_rpc .on_event::( |event| async move { @@ -37,6 +39,7 @@ pub async fn listen_for_accept_replace, + + #[clap(long, default_value = ".vault_db")] + pub rocksdb_path: String, } async fn active_block_listener( @@ -324,10 +327,15 @@ impl VaultService { let startup_height = self.await_parachain_block().await?; + // persist request payments since checking mempool is unreliable + // we could use the `listtransactions` bitcoin rpc to fetch txs + // but this requires paging and may eventually be very large + let rocksdb = Arc::new(rocksdb::DB::open_default(self.config.rocksdb_path.clone()).expect("could not open db")); + let open_request_executor = execute_open_requests( self.btc_parachain.clone(), self.vault_id_manager.clone(), - walletless_btc_rpc.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, !self.config.no_auto_refund, @@ -416,6 +424,7 @@ impl VaultService { listen_for_accept_replace( self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, ), @@ -466,6 +475,7 @@ impl VaultService { listen_for_redeem_requests( self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes, ), @@ -477,6 +487,7 @@ impl VaultService { listen_for_refund_requests( self.btc_parachain.clone(), self.vault_id_manager.clone(), + rocksdb.clone(), num_confirmations, !self.config.no_auto_refund, ),