Skip to content

Commit

Permalink
Monitor server lockup tx in thread to verify/claim
Browse files Browse the repository at this point in the history
  • Loading branch information
dangeross committed Nov 10, 2024
1 parent df6ab51 commit 18a6427
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 45 deletions.
156 changes: 112 additions & 44 deletions lib/core/src/chain_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,24 @@ impl ChainSwapHandler {
pub(crate) async fn start(self: Arc<Self>, mut shutdown: watch::Receiver<()>) {
let cloned = self.clone();
tokio::spawn(async move {
let mut rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10));
rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10));
let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60));
bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
tokio::select! {
_ = rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_chain_swaps(false).await {
error!("Error checking incoming chain swaps: {e:?}");
_ = bitcoin_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await {
error!("Error checking incoming user txs: {e:?}");
}
if let Err(e) = cloned.rescan_outgoing_claim_txs().await {
error!("Error checking outgoing server txs: {e:?}");
}
if let Err(e) = cloned.rescan_outgoing_chain_swaps().await {
error!("Error checking outgoing chain swaps: {e:?}");
},
_ = liquid_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await {
error!("Error checking incoming server txs: {e:?}");
}
},
_ = shutdown.changed() => {
Expand Down Expand Up @@ -110,7 +117,7 @@ impl ChainSwapHandler {
}
}

pub(crate) async fn rescan_incoming_chain_swaps(
pub(crate) async fn rescan_incoming_user_lockup_txs(
&self,
ignore_monitoring_block_height: bool,
) -> Result<()> {
Expand All @@ -122,16 +129,23 @@ impl ChainSwapHandler {
.filter(|s| s.direction == Direction::Incoming)
.collect();
info!(
"Rescanning {} incoming Chain Swap(s) at height {}",
"Rescanning {} incoming Chain Swap(s) user lockup txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap(&swap, current_height, ignore_monitoring_block_height)
.rescan_incoming_chain_swap_user_lockup_tx(
&swap,
current_height,
ignore_monitoring_block_height,
)
.await
{
error!("Error rescanning incoming Chain Swap {}: {e:?}", swap.id);
error!(
"Error rescanning user lockup of incoming Chain Swap {}: {e:?}",
swap.id
);
}
}
Ok(())
Expand All @@ -142,7 +156,7 @@ impl ChainSwapHandler {
/// - `current_height`: the tip
/// - `ignore_monitoring_block_height`: if true, it rescans an expired swap even after the
/// cutoff monitoring block height
async fn rescan_incoming_chain_swap(
async fn rescan_incoming_chain_swap_user_lockup_tx(
&self,
swap: &ChainSwap,
current_height: u32,
Expand Down Expand Up @@ -201,7 +215,59 @@ impl ChainSwapHandler {
Ok(())
}

pub(crate) async fn rescan_outgoing_chain_swaps(&self) -> Result<()> {
pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> {
let current_height = self.liquid_chain_service.lock().await.tip().await?;
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
.into_iter()
.filter(|s| {
s.direction == Direction::Incoming && s.state == Pending && s.claim_tx_id.is_none()
})
.collect();
info!(
"Rescanning {} incoming Chain Swap(s) server lockup txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap_server_lockup_tx(&swap)
.await
{
error!(
"Error rescanning server lockup of incoming Chain Swap {}: {e:?}",
swap.id
);
}
}
Ok(())
}

async fn rescan_incoming_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> {
let Some(tx_id) = swap.server_lockup_tx_id.clone() else {
// Skip the rescan if there is no server_lockup_tx_id yet
return Ok(());
};
let swap_id = &swap.id;
let swap_script = swap.get_claim_swap_script()?;
let script_history = self.fetch_liquid_script_history(&swap_script).await?;
let tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&tx_id))
.ok_or(anyhow!(
"Server lockup tx for incoming Chain Swap {swap_id} was not found, txid={tx_id}"
))?;
if tx_history.height > 0 {
info!("Incoming Chain Swap {swap_id} server lockup tx is confirmed");
self.claim(swap_id)
.await
.map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?;
}
Ok(())
}

pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> {
let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
let chain_swaps: Vec<ChainSwap> = self
.persister
Expand All @@ -212,19 +278,19 @@ impl ChainSwapHandler {
})
.collect();
info!(
"Rescanning {} outgoing Chain Swap(s) at height {}",
"Rescanning {} outgoing Chain Swap(s) claim txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self.rescan_outgoing_chain_swap(&swap).await {
if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await {
error!("Error rescanning outgoing Chain Swap {}: {e:?}", swap.id);
}
}
Ok(())
}

async fn rescan_outgoing_chain_swap(&self, swap: &ChainSwap) -> Result<()> {
async fn rescan_outgoing_chain_swap_claim_tx(&self, swap: &ChainSwap) -> Result<()> {
if let Some(claim_address) = &swap.claim_address {
let address = Address::from_str(claim_address)?;
let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?;
Expand Down Expand Up @@ -327,28 +393,31 @@ impl ChainSwapHandler {
return Err(anyhow!("Could not verify user lockup transaction: {e}",));
}

if let Err(e) = self.verify_server_lockup_tx(swap, &transaction, true).await
{
warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}",
swap.id,
transaction.id,
e);
return Err(anyhow!(
"Could not verify server lockup transaction {}: {e}",
transaction.id
));
}
let verify_res =
self.verify_server_lockup_tx(swap, &transaction, true).await;

info!(
"Server lockup transaction was verified for incoming Chain Swap {}",
swap.id
);
// Set the server_lockup_tx_id if it is verified or not.
// If it is not yet confirmed, then it will be claimed after confirmation
// in rescan_incoming_chain_swap_server_lockup_tx()
self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None)
.await?;
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;

match verify_res {
Ok(_) => {
info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id);
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
}
Err(e) => {
warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e);
return Err(anyhow!(
"Could not verify server lockup transaction {}: {e}",
transaction.id
));
}
}
}
Some(claim_tx_id) => {
warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
Expand Down Expand Up @@ -1238,9 +1307,10 @@ impl ChainSwapHandler {
}

async fn verify_user_lockup_tx(&self, chain_swap: &ChainSwap) -> Result<String> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let script_history = match chain_swap.direction {
Direction::Incoming => self.fetch_incoming_user_script_history(chain_swap).await,
Direction::Outgoing => self.fetch_outgoing_user_script_history(chain_swap).await,
Direction::Incoming => self.fetch_bitcoin_script_history(&swap_script).await,
Direction::Outgoing => self.fetch_liquid_script_history(&swap_script).await,
}?;

match chain_swap.user_lockup_tx_id.clone() {
Expand All @@ -1264,11 +1334,10 @@ impl ChainSwapHandler {
}
}

async fn fetch_incoming_user_script_history(
async fn fetch_bitcoin_script_history(
&self,
chain_swap: &ChainSwap,
swap_script: &SwapScriptV2,
) -> Result<Vec<History>> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let address = swap_script
.as_bitcoin_script()?
.to_address(self.config.network.as_bitcoin_chain())
Expand All @@ -1282,11 +1351,10 @@ impl ChainSwapHandler {
.await
}

async fn fetch_outgoing_user_script_history(
async fn fetch_liquid_script_history(
&self,
chain_swap: &ChainSwap,
swap_script: &SwapScriptV2,
) -> Result<Vec<History>> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let address = swap_script
.as_liquid_script()?
.to_address(self.config.network.into())
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ impl LiquidSdk {
/// is not necessary as it happens automatically in the background.
pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> {
self.chain_swap_handler
.rescan_incoming_chain_swaps(true)
.rescan_incoming_user_lockup_txs(true)
.await?;
Ok(())
}
Expand Down

0 comments on commit 18a6427

Please sign in to comment.