From 18a64271e3a98103041e77af95d6517674532c91 Mon Sep 17 00:00:00 2001 From: Ross Savage Date: Sun, 10 Nov 2024 21:09:50 +0100 Subject: [PATCH] Monitor server lockup tx in thread to verify/claim --- lib/core/src/chain_swap.rs | 156 ++++++++++++++++++++++++++----------- lib/core/src/sdk.rs | 2 +- 2 files changed, 113 insertions(+), 45 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 5ed31c36..3a1ab372 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -70,17 +70,24 @@ impl ChainSwapHandler { pub(crate) async fn start(self: Arc, mut shutdown: watch::Receiver<()>) { let cloned = self.clone(); tokio::spawn(async move { - let mut rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10)); - rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - + let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10)); + let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60)); + bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { tokio::select! { - _ = rescan_interval.tick() => { - if let Err(e) = cloned.rescan_incoming_chain_swaps(false).await { - error!("Error checking incoming chain swaps: {e:?}"); + _ = bitcoin_rescan_interval.tick() => { + if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await { + error!("Error checking incoming user txs: {e:?}"); + } + if let Err(e) = cloned.rescan_outgoing_claim_txs().await { + error!("Error checking outgoing server txs: {e:?}"); } - if let Err(e) = cloned.rescan_outgoing_chain_swaps().await { - error!("Error checking outgoing chain swaps: {e:?}"); + }, + _ = liquid_rescan_interval.tick() => { + if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await { + error!("Error checking incoming server txs: {e:?}"); } }, _ = shutdown.changed() => { @@ -110,7 +117,7 @@ impl ChainSwapHandler { } } - pub(crate) async fn rescan_incoming_chain_swaps( + pub(crate) async fn rescan_incoming_user_lockup_txs( &self, ignore_monitoring_block_height: bool, ) -> Result<()> { @@ -122,16 +129,23 @@ impl ChainSwapHandler { .filter(|s| s.direction == Direction::Incoming) .collect(); info!( - "Rescanning {} incoming Chain Swap(s) at height {}", + "Rescanning {} incoming Chain Swap(s) user lockup txs at height {}", chain_swaps.len(), current_height ); for swap in chain_swaps { if let Err(e) = self - .rescan_incoming_chain_swap(&swap, current_height, ignore_monitoring_block_height) + .rescan_incoming_chain_swap_user_lockup_tx( + &swap, + current_height, + ignore_monitoring_block_height, + ) .await { - error!("Error rescanning incoming Chain Swap {}: {e:?}", swap.id); + error!( + "Error rescanning user lockup of incoming Chain Swap {}: {e:?}", + swap.id + ); } } Ok(()) @@ -142,7 +156,7 @@ impl ChainSwapHandler { /// - `current_height`: the tip /// - `ignore_monitoring_block_height`: if true, it rescans an expired swap even after the /// cutoff monitoring block height - async fn rescan_incoming_chain_swap( + async fn rescan_incoming_chain_swap_user_lockup_tx( &self, swap: &ChainSwap, current_height: u32, @@ -201,7 +215,59 @@ impl ChainSwapHandler { Ok(()) } - pub(crate) async fn rescan_outgoing_chain_swaps(&self) -> Result<()> { + pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> { + let current_height = self.liquid_chain_service.lock().await.tip().await?; + let chain_swaps: Vec = self + .persister + .list_chain_swaps()? + .into_iter() + .filter(|s| { + s.direction == Direction::Incoming && s.state == Pending && s.claim_tx_id.is_none() + }) + .collect(); + info!( + "Rescanning {} incoming Chain Swap(s) server lockup txs at height {}", + chain_swaps.len(), + current_height + ); + for swap in chain_swaps { + if let Err(e) = self + .rescan_incoming_chain_swap_server_lockup_tx(&swap) + .await + { + error!( + "Error rescanning server lockup of incoming Chain Swap {}: {e:?}", + swap.id + ); + } + } + Ok(()) + } + + async fn rescan_incoming_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> { + let Some(tx_id) = swap.server_lockup_tx_id.clone() else { + // Skip the rescan if there is no server_lockup_tx_id yet + return Ok(()); + }; + let swap_id = &swap.id; + let swap_script = swap.get_claim_swap_script()?; + let script_history = self.fetch_liquid_script_history(&swap_script).await?; + let tx_history = script_history + .iter() + .find(|h| h.txid.to_hex().eq(&tx_id)) + .ok_or(anyhow!( + "Server lockup tx for incoming Chain Swap {swap_id} was not found, txid={tx_id}" + ))?; + if tx_history.height > 0 { + info!("Incoming Chain Swap {swap_id} server lockup tx is confirmed"); + self.claim(swap_id) + .await + .map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?; + } + Ok(()) + } + + pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> { let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; let chain_swaps: Vec = self .persister @@ -212,19 +278,19 @@ impl ChainSwapHandler { }) .collect(); info!( - "Rescanning {} outgoing Chain Swap(s) at height {}", + "Rescanning {} outgoing Chain Swap(s) claim txs at height {}", chain_swaps.len(), current_height ); for swap in chain_swaps { - if let Err(e) = self.rescan_outgoing_chain_swap(&swap).await { + if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await { error!("Error rescanning outgoing Chain Swap {}: {e:?}", swap.id); } } Ok(()) } - async fn rescan_outgoing_chain_swap(&self, swap: &ChainSwap) -> Result<()> { + async fn rescan_outgoing_chain_swap_claim_tx(&self, swap: &ChainSwap) -> Result<()> { if let Some(claim_address) = &swap.claim_address { let address = Address::from_str(claim_address)?; let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?; @@ -327,28 +393,31 @@ impl ChainSwapHandler { return Err(anyhow!("Could not verify user lockup transaction: {e}",)); } - if let Err(e) = self.verify_server_lockup_tx(swap, &transaction, true).await - { - warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", - swap.id, - transaction.id, - e); - return Err(anyhow!( - "Could not verify server lockup transaction {}: {e}", - transaction.id - )); - } + let verify_res = + self.verify_server_lockup_tx(swap, &transaction, true).await; - info!( - "Server lockup transaction was verified for incoming Chain Swap {}", - swap.id - ); + // Set the server_lockup_tx_id if it is verified or not. + // If it is not yet confirmed, then it will be claimed after confirmation + // in rescan_incoming_chain_swap_server_lockup_tx() self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) .await?; - self.claim(id).await.map_err(|e| { - error!("Could not cooperate Chain Swap {id} claim: {e}"); - anyhow!("Could not post claim details. Err: {e:?}") - })?; + + match verify_res { + Ok(_) => { + info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id); + self.claim(id).await.map_err(|e| { + error!("Could not cooperate Chain Swap {id} claim: {e}"); + anyhow!("Could not post claim details. Err: {e:?}") + })?; + } + Err(e) => { + warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e); + return Err(anyhow!( + "Could not verify server lockup transaction {}: {e}", + transaction.id + )); + } + } } Some(claim_tx_id) => { warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}") @@ -1238,9 +1307,10 @@ impl ChainSwapHandler { } async fn verify_user_lockup_tx(&self, chain_swap: &ChainSwap) -> Result { + let swap_script = chain_swap.get_lockup_swap_script()?; let script_history = match chain_swap.direction { - Direction::Incoming => self.fetch_incoming_user_script_history(chain_swap).await, - Direction::Outgoing => self.fetch_outgoing_user_script_history(chain_swap).await, + Direction::Incoming => self.fetch_bitcoin_script_history(&swap_script).await, + Direction::Outgoing => self.fetch_liquid_script_history(&swap_script).await, }?; match chain_swap.user_lockup_tx_id.clone() { @@ -1264,11 +1334,10 @@ impl ChainSwapHandler { } } - async fn fetch_incoming_user_script_history( + async fn fetch_bitcoin_script_history( &self, - chain_swap: &ChainSwap, + swap_script: &SwapScriptV2, ) -> Result> { - let swap_script = chain_swap.get_lockup_swap_script()?; let address = swap_script .as_bitcoin_script()? .to_address(self.config.network.as_bitcoin_chain()) @@ -1282,11 +1351,10 @@ impl ChainSwapHandler { .await } - async fn fetch_outgoing_user_script_history( + async fn fetch_liquid_script_history( &self, - chain_swap: &ChainSwap, + swap_script: &SwapScriptV2, ) -> Result> { - let swap_script = chain_swap.get_lockup_swap_script()?; let address = swap_script .as_liquid_script()? .to_address(self.config.network.into()) diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index b53edbb8..2bea05d6 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1895,7 +1895,7 @@ impl LiquidSdk { /// is not necessary as it happens automatically in the background. pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> { self.chain_swap_handler - .rescan_incoming_chain_swaps(true) + .rescan_incoming_user_lockup_txs(true) .await?; Ok(()) }