Skip to content

Commit

Permalink
Track new blocks and check swaps
Browse files Browse the repository at this point in the history
  • Loading branch information
dangeross committed Nov 13, 2024
1 parent 7cbe176 commit facf3fd
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 189 deletions.
249 changes: 111 additions & 138 deletions lib/core/src/chain_swap.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{str::FromStr, sync::Arc};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::{
boltz::{self},
swaps::boltz::{ChainSwapStates, CreateChainResponse, SwapUpdateTxDetails},
Expand All @@ -14,9 +14,9 @@ use lwk_wollet::{
hashes::hex::DisplayHex,
History,
};
use tokio::sync::{broadcast, watch, Mutex};
use tokio::time::MissedTickBehavior;
use tokio::sync::{broadcast, Mutex};

use crate::model::BlockListener;
use crate::{
chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService},
ensure_sdk,
Expand Down Expand Up @@ -46,6 +46,30 @@ pub(crate) struct ChainSwapHandler {
subscription_notifier: broadcast::Sender<String>,
}

#[async_trait]
impl BlockListener for ChainSwapHandler {
async fn on_bitcoin_block(&self, height: u32) {
if let Err(e) = self.check_incoming_refunds(height).await {
warn!("Error checking incoming refunds: {e:?}");
}
if let Err(e) = self.rescan_incoming_user_lockup_txs(height, false).await {
error!("Error rescanning incoming user txs: {e:?}");
}
if let Err(e) = self.rescan_outgoing_claim_txs(height).await {
error!("Error rescanning outgoing server txs: {e:?}");
}
}

async fn on_liquid_block(&self, height: u32) {
if let Err(e) = self.check_outgoing_refunds(height).await {
warn!("Error checking outgoing refunds: {e:?}");
}
if let Err(e) = self.rescan_incoming_server_lockup_txs(height).await {
error!("Error rescanning incoming server txs: {e:?}");
}
}
}

impl ChainSwapHandler {
pub(crate) fn new(
config: Config,
Expand All @@ -67,38 +91,6 @@ impl ChainSwapHandler {
})
}

pub(crate) async fn start(self: Arc<Self>, mut shutdown: watch::Receiver<()>) {
let cloned = self.clone();
tokio::spawn(async move {
let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10));
let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60));
bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
tokio::select! {
_ = bitcoin_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await {
error!("Error checking incoming user txs: {e:?}");
}
if let Err(e) = cloned.rescan_outgoing_claim_txs().await {
error!("Error checking outgoing server txs: {e:?}");
}
},
_ = liquid_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await {
error!("Error checking incoming server txs: {e:?}");
}
},
_ = shutdown.changed() => {
info!("Received shutdown signal, exiting chain swap loop");
return;
}
}
}
});
}

pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
self.subscription_notifier.subscribe()
}
Expand All @@ -119,9 +111,9 @@ impl ChainSwapHandler {

pub(crate) async fn rescan_incoming_user_lockup_txs(
&self,
height: u32,
ignore_monitoring_block_height: bool,
) -> Result<()> {
let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
Expand All @@ -131,13 +123,13 @@ impl ChainSwapHandler {
info!(
"Rescanning {} incoming Chain Swap(s) user lockup txs at height {}",
chain_swaps.len(),
current_height
height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap_user_lockup_tx(
&swap,
current_height,
height,
ignore_monitoring_block_height,
)
.await
Expand Down Expand Up @@ -215,8 +207,7 @@ impl ChainSwapHandler {
Ok(())
}

pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> {
let current_height = self.liquid_chain_service.lock().await.tip().await?;
pub(crate) async fn rescan_incoming_server_lockup_txs(&self, height: u32) -> Result<()> {
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
Expand All @@ -228,7 +219,7 @@ impl ChainSwapHandler {
info!(
"Rescanning {} incoming Chain Swap(s) server lockup txs at height {}",
chain_swaps.len(),
current_height
height
);
for swap in chain_swaps {
if let Err(e) = self
Expand Down Expand Up @@ -267,8 +258,7 @@ impl ChainSwapHandler {
Ok(())
}

pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> {
let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
pub(crate) async fn rescan_outgoing_claim_txs(&self, height: u32) -> Result<()> {
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
Expand All @@ -280,7 +270,7 @@ impl ChainSwapHandler {
info!(
"Rescanning {} outgoing Chain Swap(s) claim txs at height {}",
chain_swaps.len(),
current_height
height
);
for swap in chain_swaps {
if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await {
Expand Down Expand Up @@ -1019,111 +1009,94 @@ impl ChainSwapHandler {
Ok(refund_tx_id)
}

async fn check_swap_expiry(&self, swap: &ChainSwap) -> Result<bool> {
let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
if duration_since_creation_time.as_secs() < 60 * 10 {
return Ok(false);
}

match swap.direction {
Direction::Incoming => {
let swap_script = swap.get_lockup_swap_script()?.as_bitcoin_script()?;
let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
let locktime_from_height =
LockTime::from_height(current_height).map_err(|e| PaymentError::Generic {
err: format!("Error getting locktime from height {current_height:?}: {e}",),
})?;

info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
Ok(swap_script.locktime.is_implied_by(locktime_from_height))
}
Direction::Outgoing => {
let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?;
let current_height = self.liquid_chain_service.lock().await.tip().await?;
let locktime_from_height = ElementsLockTime::from_height(current_height)?;

info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
Ok(utils::is_locktime_expired(
locktime_from_height,
swap_script.locktime,
))
pub(crate) async fn check_incoming_refunds(&self, height: u32) -> Result<(), PaymentError> {
// Get all pending incoming chain swaps with a user lockup tx and no refund tx
let pending_swaps: Vec<ChainSwap> = self
.persister
.list_pending_chain_swaps()?
.into_iter()
.filter(|s| {
s.direction == Direction::Incoming
&& s.user_lockup_tx_id.is_some()
&& s.refund_tx_id.is_none()
})
.collect();
for swap in pending_swaps {
let swap_script = swap.get_lockup_swap_script()?.as_bitcoin_script()?;
let locktime_from_height =
LockTime::from_height(height).map_err(|e| PaymentError::Generic {
err: format!("Error getting locktime from height {height:?}: {e}"),
})?;
info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
if swap_script.locktime.is_implied_by(locktime_from_height) {
let update_swap_info_res = self
.update_swap_info(&swap.id, Refundable, None, None, None, None)
.await;
if let Err(err) = update_swap_info_res {
warn!(
"Could not update incoming Chain swap {} information: {err:?}",
swap.id
);
}
}
}
Ok(())
}

pub(crate) async fn track_refunds_and_refundables(&self) -> Result<(), PaymentError> {
let pending_swaps = self.persister.list_pending_chain_swaps()?;
pub(crate) async fn check_outgoing_refunds(&self, height: u32) -> Result<(), PaymentError> {
// Get all pending outgoing chain swaps with no refund tx
let pending_swaps: Vec<ChainSwap> = self
.persister
.list_pending_chain_swaps()?
.into_iter()
.filter(|s| s.direction == Direction::Outgoing && s.refund_tx_id.is_none())
.collect();
for swap in pending_swaps {
if swap.refund_tx_id.is_some() {
continue;
}

let has_swap_expired = self.check_swap_expiry(&swap).await.unwrap_or(false);

if !has_swap_expired && swap.state == Pending {
continue;
}

match swap.direction {
// Track refunds
Direction::Outgoing => {
let refund_tx_id_result: Result<String, PaymentError> = match swap.state {
Pending => self.refund_outgoing_swap(&swap, false).await,
RefundPending => match has_swap_expired {
true => {
self.refund_outgoing_swap(&swap, true)
.or_else(|e| {
warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
self.refund_outgoing_swap(&swap, false)
})
.await
}
false => self.refund_outgoing_swap(&swap, true).await,
},
_ => {
continue;
let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?;
let locktime_from_height = ElementsLockTime::from_height(height)
.map_err(|e| PaymentError::Generic { err: e.to_string() })?;
info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
let has_swap_expired =
utils::is_locktime_expired(locktime_from_height, swap_script.locktime);
if has_swap_expired || swap.state == RefundPending {
let refund_tx_id_res = match swap.state {
Pending => self.refund_outgoing_swap(&swap, false).await,
RefundPending => match has_swap_expired {
true => {
self.refund_outgoing_swap(&swap, true)
.or_else(|e| {
warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
self.refund_outgoing_swap(&swap, false)
})
.await
}
};

if let Ok(refund_tx_id) = refund_tx_id_result {
let update_swap_info_result = self
.update_swap_info(
&swap.id,
RefundPending,
None,
None,
None,
Some(&refund_tx_id),
)
.await;
if let Err(err) = update_swap_info_result {
warn!(
"Could not update outgoing Chain swap {} information, error: {err:?}",
swap.id
);
};
false => self.refund_outgoing_swap(&swap, true).await,
},
_ => {
continue;
}
}
};

// Track refundables by verifying that the expiry has elapsed, and set the state of the incoming swap to `Refundable`
Direction::Incoming => {
if swap.user_lockup_tx_id.is_some() && has_swap_expired {
let update_swap_info_result = self
.update_swap_info(&swap.id, Refundable, None, None, None, None)
.await;

if let Err(err) = update_swap_info_result {
warn!(
"Could not update Chain swap {} information, error: {err:?}",
swap.id
);
}
}
if let Ok(refund_tx_id) = refund_tx_id_res {
let update_swap_info_res = self
.update_swap_info(
&swap.id,
RefundPending,
None,
None,
None,
Some(&refund_tx_id),
)
.await;
if let Err(err) = update_swap_info_res {
warn!(
"Could not update outgoing Chain swap {} information: {err:?}",
swap.id
);
};
}
}
}

Ok(())
}

Expand Down
9 changes: 8 additions & 1 deletion lib/core/src/model.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::PathBuf;

use anyhow::{anyhow, Result};

use async_trait::async_trait;
use boltz_client::{
bitcoin::ScriptBuf,
network::Chain,
Expand Down Expand Up @@ -542,6 +542,13 @@ pub enum GetPaymentRequest {
Lightning { payment_hash: String },
}

/// Trait that can be used to react to new blocks from Bitcoin and Liquid chains
#[async_trait]
pub(crate) trait BlockListener: Send + Sync {
async fn on_bitcoin_block(&self, height: u32);
async fn on_liquid_block(&self, height: u32);
}

// A swap enum variant
#[derive(Clone, Debug)]
pub(crate) enum Swap {
Expand Down
Loading

0 comments on commit facf3fd

Please sign in to comment.