diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index b7ffc6f87..87ddd9fc4 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -11,10 +11,7 @@ import fr.acinq.lightning.channel.* import fr.acinq.lightning.channel.states.* import fr.acinq.lightning.crypto.noise.* import fr.acinq.lightning.db.* -import fr.acinq.lightning.payment.IncomingPaymentHandler -import fr.acinq.lightning.payment.OutgoingPaymentFailure -import fr.acinq.lightning.payment.OutgoingPaymentHandler -import fr.acinq.lightning.payment.PaymentRequest +import fr.acinq.lightning.payment.* import fr.acinq.lightning.serialization.Encryption.from import fr.acinq.lightning.serialization.Serialization.DeserializationResult import fr.acinq.lightning.transactions.Transactions @@ -411,6 +408,9 @@ class Peer( receiveLoop() // This suspends until the coroutines is cancelled or the socket is closed } + /** We try swapping funds in whenever one of those fields is updated. */ + data class TrySwapInFlow(val currentBlockHeight: Int, val walletState: WalletState, val feerate: FeeratePerKw, val liquidityPolicy: LiquidityPolicy) + /** * This function needs to be called after [Peer] is initialized, to start watching the swap-in wallet * and trigger swap-ins. @@ -418,23 +418,26 @@ class Peer( */ suspend fun startWatchSwapInWallet() { logger.info { "starting swap-in watch job" } - if (swapInJob != null) return - // wait to have a swap-in feerate available - logger.info { "waiting for feerates" } - swapInFeeratesFlow.filterNotNull().first() + if (swapInJob != null) { + logger.info { "swap-in watch job already started" } + return + } logger.info { "waiting for peer to be ready" } waitForPeerReady() swapInJob = launch { - swapInWallet.walletStateFlow.combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> currentTip.first to walletState } - .filter { (_, walletState) -> walletState.consistent } - .collect { (currentBlockHeight, walletState) -> + swapInWallet.walletStateFlow + .filter { it.consistent } + .combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) } + .combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) } + .combine(nodeParams.liquidityPolicy) { (walletState, currentTip, feerate), policy -> TrySwapInFlow(currentTip, walletState, feerate, policy) } + .collect { w -> // Local mutual close txs from pre-splice channels can be used as zero-conf inputs for swap-in to facilitate migration val mutualCloseTxs = channels.values .filterIsInstance() .filterNot { it.commitments.params.channelFeatures.hasFeature(Feature.DualFunding) } .flatMap { state -> state.mutualClosePublished.map { closingTx -> closingTx.tx.txid } } val trustedTxs = trustedSwapInTxs + mutualCloseTxs - swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, walletState, walletParams.swapInParams, trustedTxs)) + swapInCommands.send(SwapInCommand.TrySwapIn(w.currentBlockHeight, w.walletState, walletParams.swapInParams, trustedTxs)) } } } @@ -768,6 +771,7 @@ class Peer( // MUST ONLY BE SET BY processEvent() private var peerConnection: PeerConnection? = null + @OptIn(ExperimentalCoroutinesApi::class) private suspend fun processEvent(cmd: PeerCommand, logger: MDCLogger) { when (cmd) { is Connected -> { @@ -1008,7 +1012,6 @@ class Peer( val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) logger.info { "requesting splice-in using balance=${cmd.walletInputs.balance} feerate=$feerate fee=$fee" } - nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected -> logger.info { "rejecting splice: reason=${rejected.reason}" } nodeParams._nodeEvents.emit(rejected) @@ -1022,6 +1025,12 @@ class Peer( spliceOut = null, feerate = feerate ) + // If the splice fails, we immediately unlock the utxos to reuse them in the next attempt. + spliceCommand.replyTo.invokeOnCompletion { ex -> + if (ex == null && spliceCommand.replyTo.getCompleted() is ChannelCommand.Commitment.Splice.Response.Failure) { + swapInCommands.trySend(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet())) + } + } input.send(WrappedChannelCommand(channel.channelId, spliceCommand)) } else -> {