diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index e9aacd62..4285fba5 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -1,7 +1,7 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::{ boltz::{self}, swaps::boltz::{ChainSwapStates, CreateChainResponse, SwapUpdateTxDetails}, @@ -14,9 +14,9 @@ use lwk_wollet::{ hashes::hex::DisplayHex, History, }; -use tokio::sync::{broadcast, watch, Mutex}; -use tokio::time::MissedTickBehavior; +use tokio::sync::{broadcast, Mutex}; +use crate::model::BlockListener; use crate::{ chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService}, ensure_sdk, @@ -46,6 +46,30 @@ pub(crate) struct ChainSwapHandler { subscription_notifier: broadcast::Sender, } +#[async_trait] +impl BlockListener for ChainSwapHandler { + async fn on_bitcoin_block(&self, height: u32) { + if let Err(e) = self.check_incoming_refunds(height).await { + warn!("Error checking incoming refunds: {e:?}"); + } + if let Err(e) = self.rescan_incoming_user_lockup_txs(height, false).await { + error!("Error rescanning incoming user txs: {e:?}"); + } + if let Err(e) = self.rescan_outgoing_claim_txs(height).await { + error!("Error rescanning outgoing server txs: {e:?}"); + } + } + + async fn on_liquid_block(&self, height: u32) { + if let Err(e) = self.check_outgoing_refunds(height).await { + warn!("Error checking outgoing refunds: {e:?}"); + } + if let Err(e) = self.rescan_incoming_server_lockup_txs(height).await { + error!("Error rescanning incoming server txs: {e:?}"); + } + } +} + impl ChainSwapHandler { pub(crate) fn new( config: Config, @@ -67,38 +91,6 @@ impl ChainSwapHandler { }) } - pub(crate) async fn start(self: Arc, mut shutdown: watch::Receiver<()>) { - let cloned = self.clone(); - tokio::spawn(async move { - let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10)); - let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60)); - bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - tokio::select! { - _ = bitcoin_rescan_interval.tick() => { - if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await { - error!("Error checking incoming user txs: {e:?}"); - } - if let Err(e) = cloned.rescan_outgoing_claim_txs().await { - error!("Error checking outgoing server txs: {e:?}"); - } - }, - _ = liquid_rescan_interval.tick() => { - if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await { - error!("Error checking incoming server txs: {e:?}"); - } - }, - _ = shutdown.changed() => { - info!("Received shutdown signal, exiting chain swap loop"); - return; - } - } - } - }); - } - pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver { self.subscription_notifier.subscribe() } @@ -119,9 +111,9 @@ impl ChainSwapHandler { pub(crate) async fn rescan_incoming_user_lockup_txs( &self, + height: u32, ignore_monitoring_block_height: bool, ) -> Result<()> { - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; let chain_swaps: Vec = self .persister .list_chain_swaps()? @@ -131,13 +123,13 @@ impl ChainSwapHandler { info!( "Rescanning {} incoming Chain Swap(s) user lockup txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { if let Err(e) = self .rescan_incoming_chain_swap_user_lockup_tx( &swap, - current_height, + height, ignore_monitoring_block_height, ) .await @@ -215,8 +207,7 @@ impl ChainSwapHandler { Ok(()) } - pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> { - let current_height = self.liquid_chain_service.lock().await.tip().await?; + pub(crate) async fn rescan_incoming_server_lockup_txs(&self, height: u32) -> Result<()> { let chain_swaps: Vec = self .persister .list_chain_swaps()? @@ -228,7 +219,7 @@ impl ChainSwapHandler { info!( "Rescanning {} incoming Chain Swap(s) server lockup txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { if let Err(e) = self @@ -267,8 +258,7 @@ impl ChainSwapHandler { Ok(()) } - pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> { - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; + pub(crate) async fn rescan_outgoing_claim_txs(&self, height: u32) -> Result<()> { let chain_swaps: Vec = self .persister .list_chain_swaps()? @@ -280,7 +270,7 @@ impl ChainSwapHandler { info!( "Rescanning {} outgoing Chain Swap(s) claim txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await { @@ -1019,111 +1009,94 @@ impl ChainSwapHandler { Ok(refund_tx_id) } - async fn check_swap_expiry(&self, swap: &ChainSwap) -> Result { - let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64); - let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?; - if duration_since_creation_time.as_secs() < 60 * 10 { - return Ok(false); - } - - match swap.direction { - Direction::Incoming => { - let swap_script = swap.get_lockup_swap_script()?.as_bitcoin_script()?; - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; - let locktime_from_height = - LockTime::from_height(current_height).map_err(|e| PaymentError::Generic { - err: format!("Error getting locktime from height {current_height:?}: {e}",), - })?; - - info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); - Ok(swap_script.locktime.is_implied_by(locktime_from_height)) - } - Direction::Outgoing => { - let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?; - let current_height = self.liquid_chain_service.lock().await.tip().await?; - let locktime_from_height = ElementsLockTime::from_height(current_height)?; - - info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); - Ok(utils::is_locktime_expired( - locktime_from_height, - swap_script.locktime, - )) + pub(crate) async fn check_incoming_refunds(&self, height: u32) -> Result<(), PaymentError> { + // Get all pending incoming chain swaps with a user lockup tx and no refund tx + let pending_swaps: Vec = self + .persister + .list_pending_chain_swaps()? + .into_iter() + .filter(|s| { + s.direction == Direction::Incoming + && s.user_lockup_tx_id.is_some() + && s.refund_tx_id.is_none() + }) + .collect(); + for swap in pending_swaps { + let swap_script = swap.get_lockup_swap_script()?.as_bitcoin_script()?; + let locktime_from_height = + LockTime::from_height(height).map_err(|e| PaymentError::Generic { + err: format!("Error getting locktime from height {height:?}: {e}"), + })?; + info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); + if swap_script.locktime.is_implied_by(locktime_from_height) { + let update_swap_info_res = self + .update_swap_info(&swap.id, Refundable, None, None, None, None) + .await; + if let Err(err) = update_swap_info_res { + warn!( + "Could not update incoming Chain swap {} information: {err:?}", + swap.id + ); + } } } + Ok(()) } - pub(crate) async fn track_refunds_and_refundables(&self) -> Result<(), PaymentError> { - let pending_swaps = self.persister.list_pending_chain_swaps()?; + pub(crate) async fn check_outgoing_refunds(&self, height: u32) -> Result<(), PaymentError> { + // Get all pending outgoing chain swaps with no refund tx + let pending_swaps: Vec = self + .persister + .list_pending_chain_swaps()? + .into_iter() + .filter(|s| s.direction == Direction::Outgoing && s.refund_tx_id.is_none()) + .collect(); for swap in pending_swaps { - if swap.refund_tx_id.is_some() { - continue; - } - - let has_swap_expired = self.check_swap_expiry(&swap).await.unwrap_or(false); - - if !has_swap_expired && swap.state == Pending { - continue; - } - - match swap.direction { - // Track refunds - Direction::Outgoing => { - let refund_tx_id_result: Result = match swap.state { - Pending => self.refund_outgoing_swap(&swap, false).await, - RefundPending => match has_swap_expired { - true => { - self.refund_outgoing_swap(&swap, true) - .or_else(|e| { - warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}"); - self.refund_outgoing_swap(&swap, false) - }) - .await - } - false => self.refund_outgoing_swap(&swap, true).await, - }, - _ => { - continue; + let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?; + let locktime_from_height = ElementsLockTime::from_height(height) + .map_err(|e| PaymentError::Generic { err: e.to_string() })?; + info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); + let has_swap_expired = + utils::is_locktime_expired(locktime_from_height, swap_script.locktime); + if has_swap_expired || swap.state == RefundPending { + let refund_tx_id_res = match swap.state { + Pending => self.refund_outgoing_swap(&swap, false).await, + RefundPending => match has_swap_expired { + true => { + self.refund_outgoing_swap(&swap, true) + .or_else(|e| { + warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}"); + self.refund_outgoing_swap(&swap, false) + }) + .await } - }; - - if let Ok(refund_tx_id) = refund_tx_id_result { - let update_swap_info_result = self - .update_swap_info( - &swap.id, - RefundPending, - None, - None, - None, - Some(&refund_tx_id), - ) - .await; - if let Err(err) = update_swap_info_result { - warn!( - "Could not update outgoing Chain swap {} information, error: {err:?}", - swap.id - ); - }; + false => self.refund_outgoing_swap(&swap, true).await, + }, + _ => { + continue; } - } + }; - // Track refundables by verifying that the expiry has elapsed, and set the state of the incoming swap to `Refundable` - Direction::Incoming => { - if swap.user_lockup_tx_id.is_some() && has_swap_expired { - let update_swap_info_result = self - .update_swap_info(&swap.id, Refundable, None, None, None, None) - .await; - - if let Err(err) = update_swap_info_result { - warn!( - "Could not update Chain swap {} information, error: {err:?}", - swap.id - ); - } - } + if let Ok(refund_tx_id) = refund_tx_id_res { + let update_swap_info_res = self + .update_swap_info( + &swap.id, + RefundPending, + None, + None, + None, + Some(&refund_tx_id), + ) + .await; + if let Err(err) = update_swap_info_res { + warn!( + "Could not update outgoing Chain swap {} information: {err:?}", + swap.id + ); + }; } } } - Ok(()) } diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index 5abe63f8..41a2000e 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use anyhow::{anyhow, Result}; - +use async_trait::async_trait; use boltz_client::{ bitcoin::ScriptBuf, network::Chain, @@ -542,6 +542,13 @@ pub enum GetPaymentRequest { Lightning { payment_hash: String }, } +/// Trait that can be used to react to new blocks from Bitcoin and Liquid chains +#[async_trait] +pub(crate) trait BlockListener: Send + Sync { + async fn on_bitcoin_block(&self, height: u32); + async fn on_liquid_block(&self, height: u32); +} + // A swap enum variant #[derive(Clone, Debug)] pub(crate) enum Swap { diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index c55ba6d7..8cfdc677 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -250,23 +250,6 @@ impl LiquidSdk { /// /// Internal method. Should only be used as part of [LiquidSdk::start]. async fn start_background_tasks(self: &Arc) -> SdkResult<()> { - // Periodically run sync() in the background - let sdk_clone = self.clone(); - let mut shutdown_rx_sync_loop = self.shutdown_receiver.clone(); - tokio::spawn(async move { - loop { - _ = sdk_clone.sync().await; - - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(30)) => {} - _ = shutdown_rx_sync_loop.changed() => { - info!("Received shutdown signal, exiting periodic sync loop"); - return; - } - } - } - }); - let reconnect_handler = Box::new(SwapperReconnectHandler::new( self.persister.clone(), self.status_stream.clone(), @@ -275,12 +258,8 @@ impl LiquidSdk { .clone() .start(reconnect_handler, self.shutdown_receiver.clone()) .await; - self.chain_swap_handler - .clone() - .start(self.shutdown_receiver.clone()) - .await; + self.track_new_blocks().await; self.track_swap_updates().await; - self.track_pending_swaps().await; Ok(()) } @@ -303,6 +282,58 @@ impl LiquidSdk { Ok(()) } + async fn track_new_blocks(self: &Arc) { + let cloned = self.clone(); + tokio::spawn(async move { + let mut current_liquid_block: u32 = 0; + let mut current_bitcoin_block: u32 = 0; + let mut shutdown_receiver = cloned.shutdown_receiver.clone(); + let mut interval = tokio::time::interval(Duration::from_secs(30)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = interval.tick() => { + // Get the Liquid tip and process a new block + let liquid_tip_res = cloned.liquid_chain_service.lock().await.tip().await; + match liquid_tip_res { + Ok(height) => { + debug!("Got Liquid tip: {height}"); + if height > current_liquid_block { + // Sync on new Liquid block + _ = cloned.sync().await; + // Update swap handlers + cloned.chain_swap_handler.on_liquid_block(height).await; + cloned.send_swap_handler.on_liquid_block(height).await; + current_liquid_block = height; + } + }, + Err(e) => error!("Failed to fetch Liquid tip {e}"), + }; + // Get the Bitcoin tip and process a new block + let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32); + match bitcoin_tip_res { + Ok(height) => { + debug!("Got Bitcoin tip: {height}"); + if height > current_bitcoin_block { + // Update swap handlers + cloned.chain_swap_handler.on_bitcoin_block(height).await; + cloned.send_swap_handler.on_bitcoin_block(height).await; + current_bitcoin_block = height; + } + }, + Err(e) => error!("Failed to fetch Bitcoin tip {e}"), + }; + } + + _ = shutdown_receiver.changed() => { + info!("Received shutdown signal, exiting track blocks loop"); + return; + } + } + } + }); + } + async fn track_swap_updates(self: &Arc) { let cloned = self.clone(); tokio::spawn(async move { @@ -361,31 +392,6 @@ impl LiquidSdk { }); } - async fn track_pending_swaps(self: &Arc) { - let cloned = self.clone(); - tokio::spawn(async move { - let mut shutdown_receiver = cloned.shutdown_receiver.clone(); - let mut interval = tokio::time::interval(Duration::from_secs(60)); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = interval.tick() => { - if let Err(err) = cloned.send_swap_handler.track_refunds().await { - warn!("Could not refund expired swaps, error: {err:?}"); - } - if let Err(err) = cloned.chain_swap_handler.track_refunds_and_refundables().await { - warn!("Could not refund expired swaps, error: {err:?}"); - } - }, - _ = shutdown_receiver.changed() => { - info!("Received shutdown signal, exiting pending swaps loop"); - return; - } - } - } - }); - } - async fn notify_event_listeners(&self, e: SdkEvent) -> Result<()> { self.event_manager.notify(e).await; Ok(()) @@ -1966,8 +1972,14 @@ impl LiquidSdk { /// (within last [CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS] blocks = ~30 days), calling this /// is not necessary as it happens automatically in the background. pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> { + let height = self + .bitcoin_chain_service + .lock() + .await + .tip() + .map(|tip| tip.height as u32)?; self.chain_swap_handler - .rescan_incoming_user_lockup_txs(true) + .rescan_incoming_user_lockup_txs(height, true) .await?; Ok(()) } diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index f152ca38..380355c1 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -2,6 +2,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::swaps::boltz; use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates}; use boltz_client::util::secrets::Preimage; @@ -14,7 +15,7 @@ use lwk_wollet::hashes::{sha256, Hash}; use tokio::sync::{broadcast, Mutex}; use crate::chain::liquid::LiquidChainService; -use crate::model::{Config, PaymentState::*, SendSwap}; +use crate::model::{BlockListener, Config, PaymentState::*, SendSwap}; use crate::prelude::{PaymentTxData, PaymentType, Swap}; use crate::swapper::Swapper; use crate::wallet::OnchainWallet; @@ -35,6 +36,17 @@ pub(crate) struct SendSwapHandler { subscription_notifier: broadcast::Sender, } +#[async_trait] +impl BlockListener for SendSwapHandler { + async fn on_bitcoin_block(&self, _height: u32) {} + + async fn on_liquid_block(&self, _height: u32) { + if let Err(err) = self.check_refunds().await { + warn!("Could not refund expired swaps, error: {err:?}"); + } + } +} + impl SendSwapHandler { pub(crate) fn new( config: Config, @@ -489,7 +501,7 @@ impl SendSwapHandler { // Attempts refunding all payments whose state is `RefundPending` and with no // refund_tx_id field present - pub(crate) async fn track_refunds(&self) -> Result<(), PaymentError> { + pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> { let pending_swaps = self.persister.list_pending_send_swaps()?; self.try_refund_all(&pending_swaps).await; Ok(())