Skip to content

Commit

Permalink
feat: integrate rocksdb to persist btc payments
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <[email protected]>
  • Loading branch information
gregdhill committed Mar 8, 2022
1 parent 1f41767 commit 72227ed
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vault/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ futures = "0.3.5"
async-trait = "0.1.40"
sha2 = "0.8.2"
git-version = "0.3.4"
rocksdb = { version = "0.17", features = ["snappy"], default-features = false }

tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.2.12", features = ["registry", "env-filter", "fmt"] }
Expand Down
66 changes: 30 additions & 36 deletions vault/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{error::Error, VaultIdManager};
use bitcoin::{
BitcoinCoreApi, Transaction, TransactionExt, TransactionMetadata, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
BitcoinCoreApi, Hash, TransactionExt, TransactionMetadata, Txid, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
};
use futures::{
stream::{self, StreamExt},
Expand All @@ -11,7 +11,7 @@ use runtime::{
InterBtcReplaceRequest, IssuePallet, RedeemPallet, RedeemRequestStatus, RefundPallet, ReplacePallet,
ReplaceRequestStatus, RequestRefundEvent, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet, H256,
};
use std::{collections::HashMap, convert::TryInto, time::Duration};
use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration};
use tokio::time::sleep;

const ON_FORK_RETRY_DELAY: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -180,6 +180,7 @@ impl Request {
&self,
parachain_rpc: P,
btc_rpc: B,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
) -> Result<(), Error> {
// ensure the deadline has not expired yet
Expand All @@ -192,7 +193,13 @@ impl Request {
}

let tx_metadata = self
.transfer_btc(&parachain_rpc, btc_rpc, num_confirmations, self.vault_id.clone())
.transfer_btc(
&parachain_rpc,
btc_rpc,
rocksdb,
num_confirmations,
self.vault_id.clone(),
)
.await?;
self.execute(parachain_rpc, tx_metadata).await
}
Expand All @@ -213,6 +220,7 @@ impl Request {
&self,
parachain_rpc: &P,
btc_rpc: B,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
vault_id: VaultId,
) -> Result<TransactionMetadata, Error> {
Expand Down Expand Up @@ -244,6 +252,7 @@ impl Request {
};

let txid = btc_rpc.send_transaction(tx).await?;
rocksdb.put(self.hash, txid).expect("could not write to db");

loop {
let tx_metadata = btc_rpc.wait_for_transaction_metadata(txid, num_confirmations).await?;
Expand Down Expand Up @@ -310,7 +319,7 @@ impl Request {
pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
parachain_rpc: InterBtcParachain,
btc_rpc: VaultIdManager<B>,
read_only_btc_rpc: B,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
payment_margin: Duration,
process_refunds: bool,
Expand Down Expand Up @@ -364,25 +373,19 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
.map(|x| (x.hash, x))
.collect::<HashMap<_, _>>();

// find the height of bitcoin chain corresponding to the earliest btc_height
let btc_start_height = match open_requests
.iter()
.map(|(_, request)| request.btc_height.unwrap_or(u32::MAX))
.min()
{
Some(x) => x,
None => return Ok(()), // the iterator is empty so we have nothing to do
};

// iterate through transactions in reverse order, starting from those in the mempool
let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?;
while let Some(result) = transaction_stream.next().await {
let tx = result?;

for (hash, request) in open_requests.clone().into_iter() {
// get the request this transaction corresponds to, if any
if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) {
if let Ok(Some(raw_txid)) = rocksdb.get(&hash) {
// remove request from the hashmap
open_requests.retain(|&key, _| key != request.hash);
open_requests.remove(&request.hash);

let txid = match Txid::from_slice(&raw_txid) {
Ok(txid) => txid,
Err(err) => {
tracing::error!("Could not decode txid: {}", err);
continue;
}
};

tracing::info!(
"{:?} request #{:?} has valid bitcoin payment - processing...",
Expand All @@ -409,7 +412,7 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
// Payment has been made, but it might not have been confirmed enough times yet
let tx_metadata = btc_rpc
.clone()
.wait_for_transaction_metadata(tx.txid(), num_confirmations)
.wait_for_transaction_metadata(txid, num_confirmations)
.await;

match tx_metadata {
Expand Down Expand Up @@ -456,6 +459,7 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
// make copies of the variables we move into the task
let parachain_rpc = parachain_rpc.clone();
let btc_rpc = btc_rpc.clone();
let rocksdb = rocksdb.clone();
tokio::spawn(async move {
let btc_rpc = match btc_rpc.get_bitcoin_rpc(&request.vault_id).await {
Some(x) => x,
Expand All @@ -474,7 +478,10 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
request.hash
);

match request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await {
match request
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
.await
{
Ok(_) => tracing::info!(
"{:?} request #{:?} successfully executed",
request.request_type,
Expand All @@ -493,19 +500,6 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
Ok(())
}

/// Get the Request from the hashmap that the given Transaction satisfies, based
/// on the OP_RETURN and the amount of btc that is transfered to the address
fn get_request_for_btc_tx(tx: &Transaction, hash_map: &HashMap<H256, Request>) -> Option<Request> {
let hash = tx.get_op_return()?;
let request = hash_map.get(&hash)?;
let paid_amount = tx.get_payment_amount_to(request.btc_address)?;
if paid_amount as u128 >= request.amount {
Some(request.clone())
} else {
None
}
}

#[cfg(all(test, feature = "standalone-metadata"))]
mod tests {
use super::*;
Expand Down
8 changes: 6 additions & 2 deletions vault/src/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{execution::*, system::VaultIdManager};
use bitcoin::BitcoinCoreApi;
use runtime::{InterBtcParachain, RedeemPallet, RequestRedeemEvent};
use service::Error as ServiceError;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

/// Listen for RequestRedeemEvent directed at this vault; upon reception, transfer
/// bitcoin and call execute_redeem
Expand All @@ -16,6 +16,7 @@ use std::time::Duration;
pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
parachain_rpc: InterBtcParachain,
btc_rpc: VaultIdManager<B>,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
payment_margin: Duration,
) -> Result<(), ServiceError> {
Expand All @@ -33,6 +34,7 @@ pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync
// by reference. Since spawn requires static lifetimes, we will need to capture the
// arguments by value rather than by reference, so clone these:
let parachain_rpc = parachain_rpc.clone();
let rocksdb = rocksdb.clone();
// Spawn a new task so that we handle these events concurrently
tokio::spawn(async move {
tracing::info!("Executing redeem #{:?}", event.redeem_id);
Expand All @@ -42,7 +44,9 @@ pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync
parachain_rpc.get_redeem_request(event.redeem_id).await?,
payment_margin,
)?;
request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await
request
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
.await
}
.await;

Expand Down
7 changes: 6 additions & 1 deletion vault/src/refund.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{execution::*, system::VaultIdManager};
use bitcoin::BitcoinCoreApi;
use runtime::{InterBtcParachain, RequestRefundEvent};
use service::Error as ServiceError;
use std::sync::Arc;

/// Listen for RequestRefundEvent directed at this vault; upon reception, transfer
/// bitcoin and call execute_refund
Expand All @@ -16,6 +17,7 @@ use service::Error as ServiceError;
pub async fn listen_for_refund_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
parachain_rpc: InterBtcParachain,
btc_rpc: VaultIdManager<B>,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
process_refunds: bool,
) -> Result<(), ServiceError> {
Expand All @@ -37,12 +39,15 @@ pub async fn listen_for_refund_requests<B: BitcoinCoreApi + Clone + Send + Sync
// by reference. Since spawn requires static lifetimes, we will need to capture the
// arguments by value rather than by reference, so clone these:
let parachain_rpc = parachain_rpc.clone();
let rocksdb = rocksdb.clone();
// Spawn a new task so that we handle these events concurrently
tokio::spawn(async move {
tracing::info!("Executing refund #{:?}", event.refund_id);
// prepare the action that will be executed after the bitcoin transfer
let request = Request::from_refund_request_event(&event);
let result = request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await;
let result = request
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
.await;

match result {
Ok(_) => tracing::info!(
Expand Down
9 changes: 7 additions & 2 deletions vault/src/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use runtime::{
RequestReplaceEvent, UtilFuncs, VaultId, VaultRegistryPallet,
};
use service::Error as ServiceError;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

/// Listen for AcceptReplaceEvent directed at this vault and continue the replacement
/// procedure by transferring bitcoin and calling execute_replace
Expand All @@ -19,11 +19,13 @@ use std::time::Duration;
pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
parachain_rpc: InterBtcParachain,
btc_rpc: VaultIdManager<B>,
rocksdb: Arc<rocksdb::DB>,
num_confirmations: u32,
payment_margin: Duration,
) -> Result<(), ServiceError> {
let parachain_rpc = &parachain_rpc;
let btc_rpc = &btc_rpc;
let rocksdb = &rocksdb;
parachain_rpc
.on_event::<AcceptReplaceEvent, _, _, _>(
|event| async move {
Expand All @@ -37,6 +39,7 @@ pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync +
// by reference. Since spawn requires static lifetimes, we will need to capture the
// arguments by value rather than by reference, so clone these:
let parachain_rpc = parachain_rpc.clone();
let rocksdb = rocksdb.clone();
// Spawn a new task so that we handle these events concurrently
tokio::spawn(async move {
tracing::info!("Executing accept replace #{:?}", event.replace_id);
Expand All @@ -47,7 +50,9 @@ pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync +
parachain_rpc.get_replace_request(event.replace_id).await?,
payment_margin,
)?;
request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await
request
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
.await
}
.await;

Expand Down
13 changes: 12 additions & 1 deletion vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub struct VaultServiceConfig {
/// Defaults to the relay chain currency if not set.
#[clap(long, parse(try_from_str = parse_collateral_currency))]
pub collateral_currency_id: Option<CurrencyId>,

#[clap(long, default_value = ".vault_db")]
pub rocksdb_path: String,
}

async fn active_block_listener(
Expand Down Expand Up @@ -324,10 +327,15 @@ impl VaultService {

let startup_height = self.await_parachain_block().await?;

// persist request payments since checking mempool is unreliable
// we could use the `listtransactions` bitcoin rpc to fetch txs
// but this requires paging and may eventually be very large
let rocksdb = Arc::new(rocksdb::DB::open_default(self.config.rocksdb_path.clone()).expect("could not open db"));

let open_request_executor = execute_open_requests(
self.btc_parachain.clone(),
self.vault_id_manager.clone(),
walletless_btc_rpc.clone(),
rocksdb.clone(),
num_confirmations,
self.config.payment_margin_minutes,
!self.config.no_auto_refund,
Expand Down Expand Up @@ -416,6 +424,7 @@ impl VaultService {
listen_for_accept_replace(
self.btc_parachain.clone(),
self.vault_id_manager.clone(),
rocksdb.clone(),
num_confirmations,
self.config.payment_margin_minutes,
),
Expand Down Expand Up @@ -466,6 +475,7 @@ impl VaultService {
listen_for_redeem_requests(
self.btc_parachain.clone(),
self.vault_id_manager.clone(),
rocksdb.clone(),
num_confirmations,
self.config.payment_margin_minutes,
),
Expand All @@ -477,6 +487,7 @@ impl VaultService {
listen_for_refund_requests(
self.btc_parachain.clone(),
self.vault_id_manager.clone(),
rocksdb.clone(),
num_confirmations,
!self.config.no_auto_refund,
),
Expand Down

0 comments on commit 72227ed

Please sign in to comment.