Skip to content

Commit

Permalink
refactor: check mempool and past blocks for payments as a fallback
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <[email protected]>
  • Loading branch information
gregdhill committed May 11, 2022
1 parent 42f57fe commit 1bd6f00
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 37 deletions.
160 changes: 123 additions & 37 deletions vault/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
error::Error, metrics::update_bitcoin_metrics, storage::TransactionStore, system::VaultData, VaultIdManager,
};
use bitcoin::{
BitcoinCoreApi, LockedTransaction, PartialAddress, TransactionExt, TransactionMetadata,
BitcoinCoreApi, LockedTransaction, PartialAddress, Transaction, TransactionExt, TransactionMetadata, Txid,
BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
};
use futures::{
Expand Down Expand Up @@ -172,6 +172,57 @@ impl Request {
}
}

async fn wait_and_execute<
B: BitcoinCoreApi + Clone + Send + Sync + 'static,
P: ReplacePallet
+ RefundPallet
+ BtcRelayPallet
+ RedeemPallet
+ SecurityPallet
+ VaultRegistryPallet
+ UtilFuncs
+ Clone
+ Send
+ Sync,
>(
&self,
parachain_rpc: P,
btc_rpc: B,
txid: Txid,
num_confirmations: u32,
) {
// Payment has been made, but it might not have been confirmed enough times yet
let tx_metadata = btc_rpc.wait_for_transaction_metadata(txid, num_confirmations).await;

match tx_metadata {
Ok(tx_metadata) => {
// we have enough btc confirmations, now make sure they have been relayed before we continue
if let Err(e) = parachain_rpc
.wait_for_block_in_relay(
H256Le::from_bytes_le(&tx_metadata.block_hash.to_vec()),
Some(num_confirmations),
)
.await
{
tracing::error!(
"Error while waiting for block inclusion for request #{}: {}",
self.hash,
e
);
// continue; try to execute anyway
}

match self.execute(parachain_rpc.clone(), tx_metadata).await {
Ok(_) => {
tracing::info!("Executed request #{:?}", self.hash);
}
Err(e) => tracing::error!("Failed to execute request #{}: {}", self.hash, e),
}
}
Err(e) => tracing::error!("Failed to confirm bitcoin transaction for request {}: {}", self.hash, e),
}
}

/// Makes the bitcoin transfer and executes the request
pub async fn pay_and_execute<
B: BitcoinCoreApi + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -355,6 +406,7 @@ pub async fn execute_open_requests<B, TS>(
shutdown_tx: ShutdownSender,
parachain_rpc: InterBtcParachain,
vault_id_manager: VaultIdManager<B>,
read_only_btc_rpc: B,
tx_store: Arc<TS>,
num_confirmations: u32,
payment_margin: Duration,
Expand Down Expand Up @@ -413,6 +465,7 @@ where
.map(|x| (x.hash, x))
.collect::<HashMap<_, _>>();

// 1. check tx store for request txs
for (hash, request) in open_requests.clone().into_iter() {
// get the request this transaction corresponds to, if any
if let Ok(tx) = tx_store.get_tx(&hash) {
Expand Down Expand Up @@ -448,48 +501,68 @@ where
// try sending but ignore the result as it may have already been processed
let _ = btc_rpc.send_transaction(locked_tx).await;

// Payment has been made, but it might not have been confirmed enough times yet
let tx_metadata = btc_rpc
.clone()
.wait_for_transaction_metadata(txid, num_confirmations)
request
.wait_and_execute(parachain_rpc, btc_rpc, txid, num_confirmations)
.await;
});
}
}

// find the height of bitcoin chain corresponding to the earliest btc_height
let btc_start_height = match open_requests
.iter()
.map(|(_, request)| request.btc_height.unwrap_or(u32::MAX))
.min()
{
Some(x) => x,
None => return Ok(()), // the iterator is empty so we have nothing to do
};

match tx_metadata {
Ok(tx_metadata) => {
// we have enough btc confirmations, now make sure they have been relayed before we continue
if let Err(e) = parachain_rpc
.wait_for_block_in_relay(
H256Le::from_bytes_le(&tx_metadata.block_hash.to_vec()),
Some(num_confirmations),
)
.await
{
tracing::error!(
"Error while waiting for block inclusion for request #{}: {}",
request.hash,
e
);
// continue; try to execute anyway
}

match request.execute(parachain_rpc.clone(), tx_metadata).await {
Ok(_) => {
tracing::info!("Executed request #{:?}", request.hash);
}
Err(e) => tracing::error!("Failed to execute request #{}: {}", request.hash, e),
}
// 2. fallback to mempool / blocks to find payments (for backward compatibility)
// iterate through transactions in reverse order, starting from those in the mempool
let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?;
while let Some(result) = transaction_stream.next().await {
let tx = match result {
Ok(x) => x,
Err(e) => {
tracing::warn!("Failed to process transaction: {}", e);
continue;
}
};

// get the request this transaction corresponds to, if any
if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) {
open_requests.remove(&request.hash);

tracing::info!(
"{:?} request #{:?} has valid bitcoin payment - processing...",
request.request_type,
request.hash
);
// start a new task to (potentially) await confirmation and to execute on the parachain
// make copies of the variables we move into the task
let parachain_rpc = parachain_rpc.clone();
let btc_rpc = vault_id_manager.clone();
spawn_cancelable(shutdown_tx.subscribe(), async move {
let btc_rpc = match btc_rpc.get_bitcoin_rpc(&request.vault_id).await {
Some(x) => x,
None => {
tracing::error!(
"Failed to fetch bitcoin rpc for vault {}",
request.vault_id.pretty_print()
);
return; // nothing we can do - bail
}
Err(e) => tracing::error!(
"Failed to confirm bitcoin transaction for request {}: {}",
request.hash,
e
),
}
};

request
.wait_and_execute(parachain_rpc, btc_rpc, tx.txid(), num_confirmations)
.await;
});
}
}

// All requests remaining in the hashmap did not have a bitcoin payment yet, so pay
// All requests remaining in the hashmap do not have a bitcoin payment yet, so pay
// and execute all of these
for (_, request) in open_requests {
// there are potentially a large number of open requests - pay and execute each
Expand Down Expand Up @@ -539,6 +612,19 @@ where
Ok(())
}

/// Get the Request from the hashmap that the given Transaction satisfies, based
/// on the OP_RETURN and the amount of btc that is transfered to the address
fn get_request_for_btc_tx(tx: &Transaction, hash_map: &HashMap<H256, Request>) -> Option<Request> {
let hash = tx.get_op_return()?;
let request = hash_map.get(&hash)?;
let paid_amount = tx.get_payment_amount_to(request.btc_address)?;
if paid_amount as u128 >= request.amount {
Some(request.clone())
} else {
None
}
}

#[cfg(all(test, feature = "standalone-metadata"))]
mod tests {
use crate::metrics::PerCurrencyMetrics;
Expand All @@ -547,7 +633,7 @@ mod tests {
use async_trait::async_trait;
use bitcoin::{
json, Amount, Block, BlockHash, BlockHeader, Error as BitcoinError, GetBlockResult, Network, PartialAddress,
PrivateKey, Transaction, TransactionMetadata, Txid, PUBLIC_KEY_SIZE,
PrivateKey, Transaction, TransactionMetadata, PUBLIC_KEY_SIZE,
};
use runtime::{
AccountId, BlockNumber, BtcPublicKey, CurrencyId, Error as RuntimeError, ErrorCode, InterBtcRichBlockHeader,
Expand Down
1 change: 1 addition & 0 deletions vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl VaultService {
self.shutdown.clone(),
self.btc_parachain.clone(),
self.vault_id_manager.clone(),
self.btc_rpc_master_wallet.clone(),
rocksdb.clone(),
num_confirmations,
self.config.payment_margin_minutes,
Expand Down

0 comments on commit 1bd6f00

Please sign in to comment.