From 6b8162dade56b5d0735a64b499e0e5c71bd4a42c Mon Sep 17 00:00:00 2001 From: t-bast Date: Thu, 18 Jul 2024 18:04:30 +0200 Subject: [PATCH] Unlock swap-in utxos if initial channel creation fails Add a `replyTo` field when opening or accepting a channel. This is used for swaps, to free utxos in case a failure happens during funding. Without this mechanism, the user needs to restart the app to be able to reuse those utxos. Fixes #680 --- .../acinq/lightning/channel/ChannelCommand.kt | 66 +++++++------- .../acinq/lightning/channel/InteractiveTx.kt | 2 +- .../acinq/lightning/channel/states/Normal.kt | 36 ++++---- .../channel/states/WaitForAcceptChannel.kt | 16 +++- .../channel/states/WaitForFundingCreated.kt | 32 ++++++- .../lightning/channel/states/WaitForInit.kt | 1 + .../channel/states/WaitForOpenChannel.kt | 11 ++- .../kotlin/fr/acinq/lightning/io/Peer.kt | 85 ++++++++++++------- .../fr/acinq/lightning/channel/TestsHelper.kt | 29 ++++++- .../channel/states/QuiescenceTestsCommon.kt | 4 +- .../channel/states/SpliceTestsCommon.kt | 10 +-- .../states/WaitForAcceptChannelTestsCommon.kt | 16 ++-- .../WaitForFundingCreatedTestsCommon.kt | 14 ++- .../states/WaitForOpenChannelTestsCommon.kt | 10 ++- 14 files changed, 223 insertions(+), 109 deletions(-) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/ChannelCommand.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/ChannelCommand.kt index a4086d9b4..4e52aae8e 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/ChannelCommand.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/ChannelCommand.kt @@ -25,6 +25,7 @@ sealed class ChannelCommand { data object Disconnected : ChannelCommand() sealed class Init : ChannelCommand() { data class Initiator( + val replyTo: CompletableDeferred, val fundingAmount: Satoshi, val pushAmount: MilliSatoshi, val walletInputs: List, @@ -42,6 +43,7 @@ sealed class ChannelCommand { } data class NonInitiator( + val replyTo: CompletableDeferred, val temporaryChannelId: ByteVector32, val fundingAmount: Satoshi, val pushAmount: MilliSatoshi, @@ -88,7 +90,7 @@ sealed class ChannelCommand { data object CheckHtlcTimeout : Commitment() sealed class Splice : Commitment() { data class Request( - val replyTo: CompletableDeferred, + val replyTo: CompletableDeferred, val spliceIn: SpliceIn?, val spliceOut: SpliceOut?, val requestRemoteFunding: LiquidityAds.RequestFunding?, @@ -102,36 +104,6 @@ sealed class ChannelCommand { data class SpliceIn(val walletInputs: List, val pushAmount: MilliSatoshi = 0.msat) data class SpliceOut(val amount: Satoshi, val scriptPubKey: ByteVector) } - - sealed class Response { - /** - * This response doesn't fully guarantee that the splice will confirm, because our peer may potentially double-spend - * the splice transaction. Callers should wait for on-chain confirmations and handle double-spend events. - */ - data class Created( - val channelId: ByteVector32, - val fundingTxIndex: Long, - val fundingTxId: TxId, - val capacity: Satoshi, - val balance: MilliSatoshi, - val liquidityPurchase: LiquidityAds.Purchase?, - ) : Response() - - sealed class Failure : Response() { - data class InsufficientFunds(val balanceAfterFees: MilliSatoshi, val liquidityFees: MilliSatoshi, val currentFeeCredit: MilliSatoshi) : Failure() - data object InvalidSpliceOutPubKeyScript : Failure() - data object SpliceAlreadyInProgress : Failure() - data object ConcurrentRemoteSplice : Failure() - data object ChannelNotQuiescent : Failure() - data class InvalidLiquidityAds(val reason: ChannelException) : Failure() - data class FundingFailure(val reason: FundingContributionFailure) : Failure() - data object CannotStartSession : Failure() - data class InteractiveTxSessionFailed(val reason: InteractiveTxSessionAction.RemoteFailure) : Failure() - data class CannotCreateCommitTx(val reason: ChannelException) : Failure() - data class AbortedByPeer(val reason: String) : Failure() - data object Disconnected : Failure() - } - } } } @@ -144,4 +116,36 @@ sealed class ChannelCommand { data class GetHtlcInfosResponse(val revokedCommitTxId: TxId, val htlcInfos: List) : Closing() } // @formatter:on +} + +sealed class ChannelFundingResponse { + /** + * This response doesn't fully guarantee that the channel transaction will confirm, because our peer may potentially double-spend it. + * Callers should wait for on-chain confirmations and handle double-spend events. + */ + data class Success( + val channelId: ByteVector32, + val fundingTxIndex: Long, + val fundingTxId: TxId, + val capacity: Satoshi, + val balance: MilliSatoshi, + val liquidityPurchase: LiquidityAds.Purchase?, + ) : ChannelFundingResponse() + + sealed class Failure : ChannelFundingResponse() { + data class InsufficientFunds(val balanceAfterFees: MilliSatoshi, val liquidityFees: MilliSatoshi, val currentFeeCredit: MilliSatoshi) : Failure() + data object InvalidSpliceOutPubKeyScript : ChannelFundingResponse.Failure() + data object SpliceAlreadyInProgress : Failure() + data object ConcurrentRemoteSplice : Failure() + data object ChannelNotQuiescent : Failure() + data class InvalidChannelParameters(val reason: ChannelException) : Failure() + data class InvalidLiquidityAds(val reason: ChannelException) : Failure() + data class FundingFailure(val reason: FundingContributionFailure) : Failure() + data object CannotStartSession : Failure() + data class InteractiveTxSessionFailed(val reason: InteractiveTxSessionAction.RemoteFailure) : Failure() + data class CannotCreateCommitTx(val reason: ChannelException) : Failure() + data class AbortedByPeer(val reason: String) : Failure() + data class UnexpectedMessage(val msg: LightningMessage) : Failure() + data object Disconnected : Failure() + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/InteractiveTx.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/InteractiveTx.kt index 83a426763..4c6ba4c1a 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/InteractiveTx.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/InteractiveTx.kt @@ -1201,7 +1201,7 @@ sealed class SpliceStatus { data class Requested(val command: ChannelCommand.Commitment.Splice.Request, val spliceInit: SpliceInit) : QuiescentSpliceStatus() /** We both agreed to splice and are building the splice transaction. */ data class InProgress( - val replyTo: CompletableDeferred?, + val replyTo: CompletableDeferred?, val spliceSession: InteractiveTxSession, val localPushAmount: MilliSatoshi, val remotePushAmount: MilliSatoshi, diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt index 99f010425..951a26229 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt @@ -115,7 +115,7 @@ data class Normal( } else -> { logger.warning { "cannot initiate splice, another splice is already in progress" } - cmd.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.SpliceAlreadyInProgress) + cmd.replyTo.complete(ChannelFundingResponse.Failure.SpliceAlreadyInProgress) Pair(this@Normal, emptyList()) } } @@ -370,7 +370,7 @@ data class Normal( // We could keep track of our splice attempt and merge it with the remote splice instead of cancelling it. // But this is an edge case that should rarely occur, so it's probably not worth the additional complexity. logger.warning { "our peer initiated quiescence before us, cancelling our splice attempt" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.ConcurrentRemoteSplice) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.ConcurrentRemoteSplice) Pair(this@Normal.copy(spliceStatus = SpliceStatus.ReceivedStfu(cmd.message)), emptyList()) } is SpliceStatus.InitiatorQuiescent -> { @@ -404,12 +404,12 @@ data class Normal( val balanceAfterFees = parentCommitment.localCommit.spec.toLocal + fundingContribution.toMilliSatoshi() - liquidityFeesOwed if (balanceAfterFees < parentCommitment.localChannelReserve(commitments.params).max(commitTxFees)) { logger.warning { "cannot do splice: insufficient funds (balanceAfterFees=$balanceAfterFees, liquidityFees=$liquidityFees, feeCredit=${spliceStatus.command.currentFeeCredit})" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.InsufficientFunds(balanceAfterFees, liquidityFees, spliceStatus.command.currentFeeCredit)) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.InsufficientFunds(balanceAfterFees, liquidityFees, spliceStatus.command.currentFeeCredit)) val action = listOf(ChannelAction.Message.Send(TxAbort(channelId, InvalidSpliceRequest(channelId).message))) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), action) } else if (spliceStatus.command.spliceOut?.scriptPubKey?.let { Helpers.Closing.isValidFinalScriptPubkey(it, allowAnySegwit = true) } == false) { logger.warning { "cannot do splice: invalid splice-out script" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.InvalidSpliceOutPubKeyScript) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.InvalidSpliceOutPubKeyScript) val action = listOf(ChannelAction.Message.Send(TxAbort(channelId, InvalidSpliceRequest(channelId).message))) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), action) } else { @@ -427,7 +427,7 @@ data class Normal( } } else { logger.warning { "cannot initiate splice, channel not quiescent" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.ChannelNotQuiescent) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.ChannelNotQuiescent) val actions = buildList { add(ChannelAction.Message.Send(Warning(channelId, InvalidSpliceNotQuiescent(channelId).message))) add(ChannelAction.Disconnect) @@ -436,7 +436,7 @@ data class Normal( } } else { logger.warning { "concurrent stfu received and our peer is the channel initiator, cancelling our splice attempt" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.ConcurrentRemoteSplice) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.ConcurrentRemoteSplice) Pair(this@Normal.copy(spliceStatus = SpliceStatus.NonInitiatorQuiescent), emptyList()) } } @@ -525,7 +525,7 @@ data class Normal( )) { is Either.Left -> { logger.error { "rejecting liquidity proposal: ${liquidityPurchase.value.message}" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.InvalidLiquidityAds(liquidityPurchase.value)) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.InvalidLiquidityAds(liquidityPurchase.value)) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), listOf(ChannelAction.Message.Send(TxAbort(channelId, liquidityPurchase.value.message)))) } is Either.Right -> { @@ -557,7 +557,7 @@ data class Normal( )) { is Either.Left -> { logger.error { "could not create splice contributions: ${fundingContributions.value}" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.FundingFailure(fundingContributions.value)) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.FundingFailure(fundingContributions.value)) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), listOf(ChannelAction.Message.Send(TxAbort(channelId, ChannelFundingError(channelId).message)))) } is Either.Right -> { @@ -589,7 +589,7 @@ data class Normal( } else -> { logger.error { "could not start interactive-tx session: $interactiveTxAction" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.CannotStartSession) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.CannotStartSession) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), listOf(ChannelAction.Message.Send(TxAbort(channelId, ChannelFundingError(channelId).message)))) } } @@ -629,7 +629,7 @@ data class Normal( when (signingSession) { is Either.Left -> { logger.error(signingSession.value) { "cannot initiate interactive-tx splice signing session" } - spliceStatus.replyTo?.complete(ChannelCommand.Commitment.Splice.Response.Failure.CannotCreateCommitTx(signingSession.value)) + spliceStatus.replyTo?.complete(ChannelFundingResponse.Failure.CannotCreateCommitTx(signingSession.value)) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), listOf(ChannelAction.Message.Send(TxAbort(channelId, signingSession.value.message)))) } is Either.Right -> { @@ -639,7 +639,7 @@ data class Normal( // It is likely that we will restart before the transaction is confirmed, in which case we will lose the replyTo and the ability to notify the caller. // We should be able to resume the signing steps and complete the splice if we disconnect, so we optimistically notify the caller now. spliceStatus.replyTo?.complete( - ChannelCommand.Commitment.Splice.Response.Created( + ChannelFundingResponse.Success( channelId = channelId, fundingTxIndex = session.fundingTxIndex, fundingTxId = session.fundingTx.txId, @@ -660,7 +660,7 @@ data class Normal( } is InteractiveTxSessionAction.RemoteFailure -> { logger.warning { "interactive-tx failed: $interactiveTxAction" } - spliceStatus.replyTo?.complete(ChannelCommand.Commitment.Splice.Response.Failure.InteractiveTxSessionFailed(interactiveTxAction)) + spliceStatus.replyTo?.complete(ChannelFundingResponse.Failure.InteractiveTxSessionFailed(interactiveTxAction)) Pair(this@Normal.copy(spliceStatus = SpliceStatus.Aborted), listOf(ChannelAction.Message.Send(TxAbort(channelId, interactiveTxAction.toString())))) } } @@ -723,7 +723,7 @@ data class Normal( is TxAbort -> when (spliceStatus) { is SpliceStatus.Requested -> { logger.info { "our peer rejected our splice request: ascii='${cmd.message.toAscii()}' bin=${cmd.message.data}" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.AbortedByPeer(cmd.message.toAscii())) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) val actions = buildList { add(ChannelAction.Message.Send(TxAbort(channelId, SpliceAborted(channelId).message))) addAll(endQuiescence()) @@ -732,7 +732,7 @@ data class Normal( } is SpliceStatus.InProgress -> { logger.info { "our peer aborted the splice attempt: ascii='${cmd.message.toAscii()}' bin=${cmd.message.data}" } - spliceStatus.replyTo?.complete(ChannelCommand.Commitment.Splice.Response.Failure.AbortedByPeer(cmd.message.toAscii())) + spliceStatus.replyTo?.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) val actions = buildList { add(ChannelAction.Message.Send(TxAbort(channelId, SpliceAborted(channelId).message))) addAll(endQuiescence()) @@ -778,7 +778,7 @@ data class Normal( is CancelOnTheFlyFunding -> when (spliceStatus) { is SpliceStatus.Requested -> { logger.info { "our peer rejected our on-the-fly splice request: ascii='${cmd.message.toAscii()}'" } - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.AbortedByPeer(cmd.message.toAscii())) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) Pair(this@Normal.copy(spliceStatus = SpliceStatus.None), endQuiescence()) } else -> { @@ -832,18 +832,18 @@ data class Normal( is SpliceStatus.None -> SpliceStatus.None is SpliceStatus.Aborted -> SpliceStatus.None is SpliceStatus.Requested -> { - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.Disconnected) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.Disconnected) SpliceStatus.None } is SpliceStatus.InProgress -> { - spliceStatus.replyTo?.complete(ChannelCommand.Commitment.Splice.Response.Failure.Disconnected) + spliceStatus.replyTo?.complete(ChannelFundingResponse.Failure.Disconnected) SpliceStatus.None } is SpliceStatus.WaitingForSigs -> spliceStatus is SpliceStatus.NonInitiatorQuiescent -> SpliceStatus.None is QuiescenceNegotiation.NonInitiator -> SpliceStatus.None is QuiescenceNegotiation.Initiator -> { - spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.Disconnected) + spliceStatus.command.replyTo.complete(ChannelFundingResponse.Failure.Disconnected) SpliceStatus.None } } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannel.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannel.kt index 0148a6011..db1287933 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannel.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannel.kt @@ -64,6 +64,7 @@ data class WaitForAcceptChannel( )) { is Either.Left -> { logger.error { "rejecting liquidity proposal: ${liquidityPurchase.value.message}" } + init.replyTo.complete(ChannelFundingResponse.Failure.InvalidLiquidityAds(liquidityPurchase.value)) Pair(Aborted, listOf(ChannelAction.Message.Send(Error(cmd.message.temporaryChannelId, liquidityPurchase.value.message)))) } is Either.Right -> when (val fundingContributions = FundingContributions.create( @@ -77,6 +78,7 @@ data class WaitForAcceptChannel( )) { is Either.Left -> { logger.error { "could not fund channel: ${fundingContributions.value}" } + init.replyTo.complete(ChannelFundingResponse.Failure.FundingFailure(fundingContributions.value)) Pair(Aborted, listOf(ChannelAction.Message.Send(Error(channelId, ChannelFundingError(channelId).message)))) } is Either.Right -> { @@ -94,6 +96,7 @@ data class WaitForAcceptChannel( when (interactiveTxAction) { is InteractiveTxSessionAction.SendMessage -> { val nextState = WaitForFundingCreated( + init.replyTo, init.localParams, remoteParams, interactiveTxSession, @@ -117,6 +120,7 @@ data class WaitForAcceptChannel( } else -> { logger.error { "could not start interactive-tx session: $interactiveTxAction" } + init.replyTo.complete(ChannelFundingResponse.Failure.CannotStartSession) Pair(Aborted, listOf(ChannelAction.Message.Send(Error(channelId, ChannelFundingError(channelId).message)))) } } @@ -126,6 +130,7 @@ data class WaitForAcceptChannel( } is Either.Left -> { logger.error(res.value) { "invalid ${cmd.message::class} in state ${this::class}" } + init.replyTo.complete(ChannelFundingResponse.Failure.InvalidChannelParameters(res.value)) return Pair(Aborted, listOf(ChannelAction.Message.Send(Error(init.temporaryChannelId(keyManager), res.value.message)))) } } @@ -133,15 +138,22 @@ data class WaitForAcceptChannel( is CancelOnTheFlyFunding -> { // Our peer won't accept this on-the-fly funding attempt: they probably already failed the corresponding HTLCs. logger.warning { "on-the-fly funding was rejected by our peer: ${cmd.message.toAscii()}" } + init.replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) Pair(Aborted, listOf()) } - is Error -> handleRemoteError(cmd.message) + is Error -> { + init.replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) + handleRemoteError(cmd.message) + } else -> unhandled(cmd) } is ChannelCommand.Close.MutualClose -> Pair(this@WaitForAcceptChannel, listOf(ChannelAction.ProcessCmdRes.NotExecuted(cmd, CommandUnavailableInThisState(temporaryChannelId, stateName)))) is ChannelCommand.Close.ForceClose -> handleLocalError(cmd, ForcedLocalCommit(temporaryChannelId)) is ChannelCommand.Connected -> unhandled(cmd) - is ChannelCommand.Disconnected -> Pair(Aborted, listOf()) + is ChannelCommand.Disconnected -> { + init.replyTo.complete(ChannelFundingResponse.Failure.Disconnected) + Pair(Aborted, listOf()) + } is ChannelCommand.Init -> unhandled(cmd) is ChannelCommand.Commitment -> unhandled(cmd) is ChannelCommand.Htlc -> unhandled(cmd) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreated.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreated.kt index 41839bb0c..92b14d2ca 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreated.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreated.kt @@ -7,6 +7,7 @@ import fr.acinq.lightning.MilliSatoshi import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.channel.* import fr.acinq.lightning.wire.* +import kotlinx.coroutines.CompletableDeferred /* * We build the funding transaction for a new channel. @@ -30,6 +31,7 @@ import fr.acinq.lightning.wire.* * |--------------------------->| */ data class WaitForFundingCreated( + val replyTo: CompletableDeferred, val localParams: LocalParams, val remoteParams: RemoteParams, val interactiveTxSession: InteractiveTxSession, @@ -74,10 +76,24 @@ data class WaitForFundingCreated( when (signingSession) { is Either.Left -> { logger.error(signingSession.value) { "cannot initiate interactive-tx signing session" } + replyTo.complete(ChannelFundingResponse.Failure.CannotStartSession) handleLocalError(cmd, signingSession.value) } is Either.Right -> { val (session, commitSig) = signingSession.value + // We cannot guarantee that the channel creation is successful: the only way to guarantee that is to wait for on-chain confirmations. + // It is likely that we will restart before the transaction is confirmed, in which case we will lose the replyTo and the ability to notify the caller. + // We should be able to resume the signing steps and complete the funding process if we disconnect, so we optimistically notify the caller now. + replyTo.complete( + ChannelFundingResponse.Success( + channelId = channelId, + fundingTxIndex = 0, + fundingTxId = session.fundingTx.txId, + capacity = session.fundingParams.fundingAmount, + balance = session.localCommit.fold({ it.spec }, { it.spec }).toLocal, + liquidityPurchase = liquidityPurchase, + ) + ) val nextState = WaitForFundingSigned( channelParams, session, @@ -98,31 +114,40 @@ data class WaitForFundingCreated( } is InteractiveTxSessionAction.RemoteFailure -> { logger.warning { "interactive-tx failed: $interactiveTxAction" } + replyTo.complete(ChannelFundingResponse.Failure.InteractiveTxSessionFailed(interactiveTxAction)) handleLocalError(cmd, DualFundingAborted(channelId, interactiveTxAction.toString())) } } } is CommitSig -> { logger.warning { "received commit_sig too early, aborting" } + replyTo.complete(ChannelFundingResponse.Failure.UnexpectedMessage(cmd.message)) handleLocalError(cmd, UnexpectedCommitSig(channelId)) } is TxSignatures -> { logger.warning { "received tx_signatures too early, aborting" } + replyTo.complete(ChannelFundingResponse.Failure.UnexpectedMessage(cmd.message)) handleLocalError(cmd, UnexpectedFundingSignatures(channelId)) } is TxInitRbf -> { logger.info { "ignoring unexpected tx_init_rbf message" } + replyTo.complete(ChannelFundingResponse.Failure.UnexpectedMessage(cmd.message)) Pair(this@WaitForFundingCreated, listOf(ChannelAction.Message.Send(Warning(channelId, InvalidRbfAttempt(channelId).message)))) } is TxAckRbf -> { logger.info { "ignoring unexpected tx_ack_rbf message" } + replyTo.complete(ChannelFundingResponse.Failure.UnexpectedMessage(cmd.message)) Pair(this@WaitForFundingCreated, listOf(ChannelAction.Message.Send(Warning(channelId, InvalidRbfAttempt(channelId).message)))) } is TxAbort -> { logger.warning { "our peer aborted the dual funding flow: ascii='${cmd.message.toAscii()}' bin=${cmd.message.data.toHex()}" } + replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) Pair(Aborted, listOf(ChannelAction.Message.Send(TxAbort(channelId, DualFundingAborted(channelId, "requested by peer").message)))) } - is Error -> handleRemoteError(cmd.message) + is Error -> { + replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) + handleRemoteError(cmd.message) + } else -> unhandled(cmd) } is ChannelCommand.Close.MutualClose -> Pair(this@WaitForFundingCreated, listOf(ChannelAction.ProcessCmdRes.NotExecuted(cmd, CommandUnavailableInThisState(channelId, stateName)))) @@ -134,7 +159,10 @@ data class WaitForFundingCreated( is ChannelCommand.Funding -> unhandled(cmd) is ChannelCommand.Closing -> unhandled(cmd) is ChannelCommand.Connected -> unhandled(cmd) - is ChannelCommand.Disconnected -> Pair(Aborted, listOf()) + is ChannelCommand.Disconnected -> { + replyTo.complete(ChannelFundingResponse.Failure.Disconnected) + Pair(Aborted, listOf()) + } } } } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForInit.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForInit.kt index dcdd8b513..db8b5f520 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForInit.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForInit.kt @@ -18,6 +18,7 @@ data object WaitForInit : ChannelState() { return when (cmd) { is ChannelCommand.Init.NonInitiator -> { val nextState = WaitForOpenChannel( + cmd.replyTo, cmd.temporaryChannelId, cmd.fundingAmount, cmd.pushAmount, diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannel.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannel.kt index 205acfb63..d27e313c5 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannel.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannel.kt @@ -10,6 +10,7 @@ import fr.acinq.lightning.channel.* import fr.acinq.lightning.channel.Helpers.Funding.computeChannelId import fr.acinq.lightning.utils.msat import fr.acinq.lightning.wire.* +import kotlinx.coroutines.CompletableDeferred /* * We are waiting for our peer to initiate a channel open. @@ -21,6 +22,7 @@ import fr.acinq.lightning.wire.* * |--------------------------->| */ data class WaitForOpenChannel( + val replyTo: CompletableDeferred, val temporaryChannelId: ByteVector32, val fundingAmount: Satoshi, val pushAmount: MilliSatoshi, @@ -94,11 +96,13 @@ data class WaitForOpenChannel( when (val fundingContributions = FundingContributions.create(channelKeys, keyManager.swapInOnChainWallet, fundingParams, walletInputs, accept.pushAmount, open.pushAmount, null)) { is Either.Left -> { logger.error { "could not fund channel: ${fundingContributions.value}" } + replyTo.complete(ChannelFundingResponse.Failure.FundingFailure(fundingContributions.value)) Pair(Aborted, listOf(ChannelAction.Message.Send(Error(temporaryChannelId, ChannelFundingError(temporaryChannelId).message)))) } is Either.Right -> { val interactiveTxSession = InteractiveTxSession(staticParams.remoteNodeId, channelKeys, keyManager.swapInOnChainWallet, fundingParams, 0.msat, 0.msat, emptySet(), fundingContributions.value) val nextState = WaitForFundingCreated( + replyTo, // If our peer asks us to pay the commit tx fees, we accept (only used in tests, as we're otherwise always the channel opener). localParams.copy(paysCommitTxFees = open.channelFlags.nonInitiatorPaysCommitFees), remoteParams, @@ -125,12 +129,14 @@ data class WaitForOpenChannel( } is Either.Left -> { logger.error(res.value) { "invalid ${cmd.message::class} in state ${this::class}" } + replyTo.complete(ChannelFundingResponse.Failure.InvalidChannelParameters(res.value)) Pair(Aborted, listOf(ChannelAction.Message.Send(Error(temporaryChannelId, res.value.message)))) } } } is Error -> { logger.error { "peer sent error: ascii=${cmd.message.toAscii()} bin=${cmd.message.data.toHex()}" } + replyTo.complete(ChannelFundingResponse.Failure.AbortedByPeer(cmd.message.toAscii())) return Pair(Aborted, listOf()) } else -> unhandled(cmd) @@ -138,7 +144,10 @@ data class WaitForOpenChannel( is ChannelCommand.Close.MutualClose -> Pair(this@WaitForOpenChannel, listOf(ChannelAction.ProcessCmdRes.NotExecuted(cmd, CommandUnavailableInThisState(temporaryChannelId, stateName)))) is ChannelCommand.Close.ForceClose -> handleLocalError(cmd, ForcedLocalCommit(temporaryChannelId)) is ChannelCommand.Connected -> unhandled(cmd) - is ChannelCommand.Disconnected -> Pair(Aborted, listOf()) + is ChannelCommand.Disconnected -> { + replyTo.complete(ChannelFundingResponse.Failure.Disconnected) + Pair(Aborted, listOf()) + } is ChannelCommand.Init -> unhandled(cmd) is ChannelCommand.Commitment -> unhandled(cmd) is ChannelCommand.Htlc -> unhandled(cmd) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index 0144026ca..cd5dba383 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -629,11 +629,11 @@ class Peer( } /** - * Do a splice out using any suitable channel - * @return [ChannelCommand.Commitment.Splice.Response] if a splice was attempted, or {null} if no suitable - * channel was found + * Do a splice out using any suitable channel. + * + * @return [ChannelFundingResponse] if a splice was attempted, or {null} if no suitable channel was found */ - suspend fun spliceOut(amount: Satoshi, scriptPubKey: ByteVector, feerate: FeeratePerKw): ChannelCommand.Commitment.Splice.Response? { + suspend fun spliceOut(amount: Satoshi, scriptPubKey: ByteVector, feerate: FeeratePerKw): ChannelFundingResponse? { return channels.values .filterIsInstance() .firstOrNull { it.commitments.availableBalanceForSend() > amount } @@ -652,7 +652,7 @@ class Peer( } } - suspend fun spliceCpfp(channelId: ByteVector32, feerate: FeeratePerKw): ChannelCommand.Commitment.Splice.Response? { + suspend fun spliceCpfp(channelId: ByteVector32, feerate: FeeratePerKw): ChannelFundingResponse? { return channels.values .filterIsInstance() .find { it.channelId == channelId } @@ -672,7 +672,7 @@ class Peer( } } - suspend fun requestInboundLiquidity(amount: Satoshi, feerate: FeeratePerKw, fundingRate: LiquidityAds.FundingRate): ChannelCommand.Commitment.Splice.Response? { + suspend fun requestInboundLiquidity(amount: Satoshi, feerate: FeeratePerKw, fundingRate: LiquidityAds.FundingRate): ChannelFundingResponse? { return channels.values .filterIsInstance() .firstOrNull() @@ -1078,7 +1078,18 @@ class Peer( val localParams = LocalParams(nodeParams, isChannelOpener = false, payCommitTxFees = msg.channelFlags.nonInitiatorPaysCommitFees) val state = WaitForInit val channelConfig = ChannelConfig.standard - val (state1, actions1) = state.process(ChannelCommand.Init.NonInitiator(msg.temporaryChannelId, 0.sat, 0.msat, listOf(), localParams, channelConfig, theirInit!!, fundingRates = null)) + val initCommand = ChannelCommand.Init.NonInitiator( + replyTo = CompletableDeferred(), + temporaryChannelId = msg.temporaryChannelId, + fundingAmount = 0.sat, + pushAmount = 0.msat, + walletInputs = listOf(), + localParams = localParams, + channelConfig = channelConfig, + remoteInit = theirInit!!, + fundingRates = null + ) + val (state1, actions1) = state.process(initCommand) val (state2, actions2) = state1.process(ChannelCommand.MessageReceived(msg)) _channels = _channels + (msg.temporaryChannelId to state2) processActions(msg.temporaryChannelId, peerConnection, actions1 + actions2) @@ -1250,16 +1261,17 @@ class Peer( val state = WaitForInit val (state1, actions1) = state.process( ChannelCommand.Init.Initiator( - cmd.fundingAmount, - cmd.pushAmount, - cmd.walletInputs, - cmd.commitTxFeerate, - cmd.fundingTxFeerate, - localParams, - theirInit!!, - ChannelFlags(announceChannel = false, nonInitiatorPaysCommitFees = false), - ChannelConfig.standard, - cmd.channelType, + replyTo = CompletableDeferred(), + fundingAmount = cmd.fundingAmount, + pushAmount = cmd.pushAmount, + walletInputs = cmd.walletInputs, + commitTxFeerate = cmd.commitTxFeerate, + fundingTxFeerate = cmd.fundingTxFeerate, + localParams = localParams, + remoteInit = theirInit!!, + channelFlags = ChannelFlags(announceChannel = false, nonInitiatorPaysCommitFees = false), + channelConfig = ChannelConfig.standard, + channelType = cmd.channelType, requestRemoteFunding = null, channelOrigin = null, ) @@ -1294,7 +1306,7 @@ class Peer( ) // If the splice fails, we immediately unlock the utxos to reuse them in the next attempt. spliceCommand.replyTo.invokeOnCompletion { ex -> - if (ex == null && spliceCommand.replyTo.getCompleted() is ChannelCommand.Commitment.Splice.Response.Failure) { + if (ex == null && spliceCommand.replyTo.getCompleted() is ChannelFundingResponse.Failure) { swapInCommands.trySend(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet())) } } @@ -1353,22 +1365,28 @@ class Peer( // We ask our peer to pay the commit tx fees. val localParams = LocalParams(nodeParams, isChannelOpener = true, payCommitTxFees = false) val channelFlags = ChannelFlags(announceChannel = false, nonInitiatorPaysCommitFees = true) - val (state, actions) = WaitForInit.process( - ChannelCommand.Init.Initiator( - fundingAmount = localFundingAmount, - pushAmount = 0.msat, - walletInputs = cmd.walletInputs, - commitTxFeerate = currentFeerates.commitmentFeerate, - fundingTxFeerate = currentFeerates.fundingFeerate, - localParams = localParams, - remoteInit = theirInit!!, - channelFlags = channelFlags, - channelConfig = ChannelConfig.standard, - channelType = ChannelType.SupportedChannelType.AnchorOutputsZeroReserve, - requestRemoteFunding = requestRemoteFunding, - channelOrigin = Origin.OnChainWallet(cmd.walletInputs.map { it.outPoint }.toSet(), cmd.totalAmount.toMilliSatoshi(), fees), - ) + val initCommand = ChannelCommand.Init.Initiator( + replyTo = CompletableDeferred(), + fundingAmount = localFundingAmount, + pushAmount = 0.msat, + walletInputs = cmd.walletInputs, + commitTxFeerate = currentFeerates.commitmentFeerate, + fundingTxFeerate = currentFeerates.fundingFeerate, + localParams = localParams, + remoteInit = theirInit!!, + channelFlags = channelFlags, + channelConfig = ChannelConfig.standard, + channelType = ChannelType.SupportedChannelType.AnchorOutputsZeroReserve, + requestRemoteFunding = requestRemoteFunding, + channelOrigin = Origin.OnChainWallet(cmd.walletInputs.map { it.outPoint }.toSet(), cmd.totalAmount.toMilliSatoshi(), fees), ) + // If the channel creation fails, we immediately unlock the utxos to reuse them in the next attempt. + initCommand.replyTo.invokeOnCompletion { ex -> + if (ex == null && initCommand.replyTo.getCompleted() is ChannelFundingResponse.Failure) { + swapInCommands.trySend(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet())) + } + } + val (state, actions) = WaitForInit.process(initCommand) val msg = actions.filterIsInstance().map { it.message }.filterIsInstance().first() _channels = _channels + (msg.temporaryChannelId to state) processActions(msg.temporaryChannelId, peerConnection, actions) @@ -1465,6 +1483,7 @@ class Peer( logger.info { "requesting on-the-fly channel for paymentHash=${cmd.paymentHash} feerate=$fundingFeerate fee=${totalFees.total} paymentType=${paymentDetails.paymentType}" } val (state, actions) = WaitForInit.process( ChannelCommand.Init.Initiator( + replyTo = CompletableDeferred(), fundingAmount = 0.sat, // we don't have funds to contribute pushAmount = 0.msat, walletInputs = listOf(), diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt index 90e38efc1..c3b63ad94 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt @@ -21,6 +21,7 @@ import fr.acinq.lightning.tests.utils.testLoggerFactory import fr.acinq.lightning.transactions.Transactions import fr.acinq.lightning.utils.* import fr.acinq.lightning.wire.* +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.runBlocking import kotlinx.serialization.encodeToString @@ -189,6 +190,7 @@ object TestsHelper { val bobInit = Init(bobFeatures) val (alice1, actionsAlice1) = alice.process( ChannelCommand.Init.Initiator( + CompletableDeferred(), aliceFundingAmount, alicePushAmount, createWallet(aliceNodeParams.keyManager, aliceFundingAmount + 3500.sat).second, @@ -206,7 +208,19 @@ object TestsHelper { assertIs>(alice1) val temporaryChannelId = aliceChannelParams.channelKeys(alice.ctx.keyManager).temporaryChannelId val bobWallet = if (bobFundingAmount > 0.sat) createWallet(bobNodeParams.keyManager, bobFundingAmount + 1500.sat).second else listOf() - val (bob1, _) = bob.process(ChannelCommand.Init.NonInitiator(temporaryChannelId, bobFundingAmount, bobPushAmount, bobWallet, bobChannelParams, ChannelConfig.standard, aliceInit, TestConstants.fundingRates)) + val (bob1, _) = bob.process( + ChannelCommand.Init.NonInitiator( + CompletableDeferred(), + temporaryChannelId, + bobFundingAmount, + bobPushAmount, + bobWallet, + bobChannelParams, + ChannelConfig.standard, + aliceInit, + TestConstants.fundingRates + ) + ) assertIs>(bob1) val open = actionsAlice1.findOutgoingMessage() return Triple(alice1, bob1, open) @@ -224,7 +238,18 @@ object TestsHelper { requestRemoteFunding: Satoshi? = null, zeroConf: Boolean = false, ): Triple, LNChannel, Transaction> { - val (alice, channelReadyAlice, bob, channelReadyBob) = WaitForChannelReadyTestsCommon.init(channelType, aliceFeatures, bobFeatures, currentHeight, aliceFundingAmount, bobFundingAmount, alicePushAmount, bobPushAmount, requestRemoteFunding, zeroConf) + val (alice, channelReadyAlice, bob, channelReadyBob) = WaitForChannelReadyTestsCommon.init( + channelType, + aliceFeatures, + bobFeatures, + currentHeight, + aliceFundingAmount, + bobFundingAmount, + alicePushAmount, + bobPushAmount, + requestRemoteFunding, + zeroConf + ) val (alice1, actionsAlice1) = alice.process(ChannelCommand.MessageReceived(channelReadyBob)) assertIs>(alice1) actionsAlice1.has() diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/QuiescenceTestsCommon.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/QuiescenceTestsCommon.kt index a27a3cd25..8b4979192 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/QuiescenceTestsCommon.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/QuiescenceTestsCommon.kt @@ -406,7 +406,7 @@ class QuiescenceTestsCommon : LightningTestSuite() { val (_, actionsAlice3) = alice2.process(ChannelCommand.MessageReceived(spliceAck)) actionsAlice3.hasOutgoingMessage() withTimeout(100) { - assertIs(cmdBob.replyTo.await()) + assertIs(cmdBob.replyTo.await()) } } @@ -434,7 +434,7 @@ class QuiescenceTestsCommon : LightningTestSuite() { val (_, actionsBob5) = bob4.process(ChannelCommand.MessageReceived(spliceAck)) actionsBob5.hasOutgoingMessage() withTimeout(100) { - assertIs(cmdAlice.replyTo.await()) + assertIs(cmdAlice.replyTo.await()) } } diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt index 6d73e55ab..8996ee4e3 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/SpliceTestsCommon.kt @@ -164,7 +164,7 @@ class SpliceTestsCommon : LightningTestSuite() { actionsAlice3.findOutgoingMessage() runBlocking { val response = cmd.replyTo.await() - assertIs(response) + assertIs(response) } } @@ -260,7 +260,7 @@ class SpliceTestsCommon : LightningTestSuite() { actionsBob2.hasOutgoingMessage() runBlocking { val response = cmd.replyTo.await() - assertIs(response) + assertIs(response) assertEquals(10_000_000.msat, response.liquidityFees) } val (bob3, actionsBob3) = bob2.process(ChannelCommand.MessageReceived(TxAbort(bob.channelId, SpliceAborted(bob.channelId).message))) @@ -332,7 +332,7 @@ class SpliceTestsCommon : LightningTestSuite() { actionsBob2.hasOutgoingMessage() runBlocking { val response = cmd.replyTo.await() - assertIs(response) + assertIs(response) assertEquals(500_000.msat, response.liquidityFees) assertEquals(currentFeeCredit, response.currentFeeCredit) } @@ -1570,9 +1570,9 @@ class SpliceTestsCommon : LightningTestSuite() { return exchangeSpliceSigs(alice1, commitSigAlice, bob1, commitSigBob) } - private fun checkCommandResponse(replyTo: CompletableDeferred, parentCommitment: Commitment, spliceInit: SpliceInit): TxId = runBlocking { + private fun checkCommandResponse(replyTo: CompletableDeferred, parentCommitment: Commitment, spliceInit: SpliceInit): TxId = runBlocking { val response = replyTo.await() - assertIs(response) + assertIs(response) assertEquals(response.capacity, parentCommitment.fundingAmount + spliceInit.fundingContribution) assertEquals(response.balance, parentCommitment.localCommit.spec.toLocal + spliceInit.fundingContribution.toMilliSatoshi() - spliceInit.pushAmount) assertEquals(response.fundingTxIndex, parentCommitment.fundingTxIndex + 1) diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannelTestsCommon.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannelTestsCommon.kt index 5b5fe0a36..97cdda5fc 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannelTestsCommon.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForAcceptChannelTestsCommon.kt @@ -8,6 +8,7 @@ import fr.acinq.lightning.Lightning.randomBytes64 import fr.acinq.lightning.channel.* import fr.acinq.lightning.tests.TestConstants import fr.acinq.lightning.tests.utils.LightningTestSuite +import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.msat import fr.acinq.lightning.utils.sat import fr.acinq.lightning.utils.toMilliSatoshi @@ -67,12 +68,13 @@ class WaitForAcceptChannelTestsCommon : LightningTestSuite() { } @Test - fun `recv AcceptChannel -- missing channel type`() { + fun `recv AcceptChannel -- missing channel type`() = runSuspendTest { val (alice, _, accept) = init() val (alice1, actions1) = alice.process(ChannelCommand.MessageReceived(accept.copy(tlvStream = TlvStream.empty()))) assertIs>(alice1) val error = actions1.hasOutgoingMessage() assertEquals(error, Error(accept.temporaryChannelId, MissingChannelType(accept.temporaryChannelId).message)) + assertIs(alice.state.init.replyTo.await()) } @Test @@ -94,26 +96,28 @@ class WaitForAcceptChannelTestsCommon : LightningTestSuite() { } @Test - fun `recv AcceptChannel -- missing liquidity ads`() { + fun `recv AcceptChannel -- missing liquidity ads`() = runSuspendTest { val (alice, _, accept) = init(requestRemoteFunding = TestConstants.bobFundingAmount) val accept1 = accept.copy(tlvStream = accept.tlvStream.copy(records = accept.tlvStream.records.filterNot { it is ChannelTlv.ProvideFundingTlv }.toSet())) val (alice1, actions1) = alice.process(ChannelCommand.MessageReceived(accept1)) assertIs>(alice1) val error = actions1.hasOutgoingMessage() assertEquals(error, Error(accept.temporaryChannelId, MissingLiquidityAds(accept.temporaryChannelId).message)) + assertIs(alice.state.init.replyTo.await()) } @Test - fun `recv AcceptChannel -- invalid liquidity ads amount`() { + fun `recv AcceptChannel -- invalid liquidity ads amount`() = runSuspendTest { val (alice, _, accept) = init(requestRemoteFunding = TestConstants.bobFundingAmount) val (alice1, actions1) = alice.process(ChannelCommand.MessageReceived(accept.copy(fundingAmount = TestConstants.bobFundingAmount - 100.sat))) assertIs>(alice1) val error = actions1.hasOutgoingMessage() assertEquals(error, Error(accept.temporaryChannelId, InvalidLiquidityAdsAmount(accept.temporaryChannelId, TestConstants.bobFundingAmount - 100.sat, TestConstants.bobFundingAmount).message)) + assertIs(alice.state.init.replyTo.await()) } @Test - fun `recv AcceptChannel -- invalid liquidity ads signature`() { + fun `recv AcceptChannel -- invalid liquidity ads signature`() = runSuspendTest { val (alice, _, accept) = init(requestRemoteFunding = TestConstants.bobFundingAmount) val willFund = ChannelTlv.ProvideFundingTlv(accept.willFund!!.copy(signature = randomBytes64())) val accept1 = accept.copy(tlvStream = accept.tlvStream.copy(records = accept.tlvStream.records.filterNot { it is ChannelTlv.ProvideFundingTlv }.toSet() + willFund)) @@ -121,6 +125,7 @@ class WaitForAcceptChannelTestsCommon : LightningTestSuite() { assertIs>(alice1) val error = actions1.hasOutgoingMessage() assertEquals(error, Error(accept.temporaryChannelId, InvalidLiquidityAdsSig(accept.temporaryChannelId).message)) + assertIs(alice.state.init.replyTo.await()) } @Test @@ -179,11 +184,12 @@ class WaitForAcceptChannelTestsCommon : LightningTestSuite() { } @Test - fun `recv Error`() { + fun `recv Error`() = runSuspendTest { val (alice, _, _) = init() val (alice1, actions1) = alice.process(ChannelCommand.MessageReceived(Error(ByteVector32.Zeroes, "oops"))) assertIs>(alice1) assertTrue(actions1.isEmpty()) + assertIs(alice.state.init.replyTo.await()) } companion object { diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreatedTestsCommon.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreatedTestsCommon.kt index 1160524d9..bd89f3ae4 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreatedTestsCommon.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForFundingCreatedTestsCommon.kt @@ -11,6 +11,7 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.channel.* import fr.acinq.lightning.tests.TestConstants import fr.acinq.lightning.tests.utils.LightningTestSuite +import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.msat import fr.acinq.lightning.utils.sat import fr.acinq.lightning.utils.toByteVector @@ -38,7 +39,7 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { } @Test - fun `complete interactive-tx protocol`() { + fun `complete interactive-tx protocol`() = runSuspendTest { val (alice, bob, inputAlice) = init(ChannelType.SupportedChannelType.AnchorOutputs, bobFundingAmount = 0.sat) // Alice ---- tx_add_input ----> Bob val (bob1, actionsBob1) = bob.process(ChannelCommand.MessageReceived(inputAlice)) @@ -63,6 +64,8 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { assertIs(bob3.state) assertEquals(alice2.state.channelParams.channelFeatures, ChannelFeatures(setOf(Feature.StaticRemoteKey, Feature.AnchorOutputs, Feature.DualFunding))) assertEquals(bob3.state.channelParams.channelFeatures, ChannelFeatures(setOf(Feature.StaticRemoteKey, Feature.AnchorOutputs, Feature.DualFunding))) + assertIs(alice.state.replyTo.await()).also { assertEquals(0, it.fundingTxIndex) } + assertIs(bob.state.replyTo.await()).also { assertEquals(0, it.fundingTxIndex) } verifyCommits(alice2.state.signingSession, bob3.state.signingSession, TestConstants.aliceFundingAmount.toMilliSatoshi() - TestConstants.alicePushAmount, TestConstants.alicePushAmount) } @@ -148,7 +151,7 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { } @Test - fun `complete interactive-tx protocol -- initiator can't pay fees`() { + fun `complete interactive-tx protocol -- initiator can't pay fees`() = runSuspendTest { val (alice, bob, inputAlice) = init(ChannelType.SupportedChannelType.AnchorOutputs, aliceFundingAmount = 1_000_100.sat, bobFundingAmount = 0.sat, alicePushAmount = 1_000_000.sat.toMilliSatoshi()) // Alice ---- tx_add_input ----> Bob val (bob1, actionsBob1) = bob.process(ChannelCommand.MessageReceived(inputAlice)) @@ -163,6 +166,7 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { val (bob3, actionsBob3) = bob2.process(ChannelCommand.MessageReceived(actionsAlice2.findOutgoingMessage())) actionsBob3.hasOutgoingMessage() assertIs(bob3.state) + assertIs(bob.state.replyTo.await()) } @Test @@ -265,11 +269,12 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { } @Test - fun `recv Error`() { + fun `recv Error`() = runSuspendTest { val (_, bob, _) = init(ChannelType.SupportedChannelType.AnchorOutputs, bobFundingAmount = 0.sat) val (bob1, actions1) = bob.process(ChannelCommand.MessageReceived(Error(ByteVector32.Zeroes, "oops"))) assertIs(bob1.state) assertTrue(actions1.isEmpty()) + assertIs(bob.state.replyTo.await()) } @Test @@ -281,13 +286,14 @@ class WaitForFundingCreatedTestsCommon : LightningTestSuite() { } @Test - fun `recv Disconnected`() { + fun `recv Disconnected`() = runSuspendTest { val (_, bob, txAddInput) = init(ChannelType.SupportedChannelType.AnchorOutputs, bobFundingAmount = 0.sat) val (bob1, _) = bob.process(ChannelCommand.MessageReceived(txAddInput)) assertIs(bob1.state) val (bob2, actions2) = bob1.process(ChannelCommand.Disconnected) assertIs(bob2.state) assertTrue(actions2.isEmpty()) + assertIs(bob.state.replyTo.await()) } companion object { diff --git a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannelTestsCommon.kt b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannelTestsCommon.kt index 9d71a8347..f59c71124 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannelTestsCommon.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/channel/states/WaitForOpenChannelTestsCommon.kt @@ -6,6 +6,7 @@ import fr.acinq.lightning.* import fr.acinq.lightning.channel.* import fr.acinq.lightning.tests.TestConstants import fr.acinq.lightning.tests.utils.LightningTestSuite +import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.* import fr.acinq.lightning.wire.* import kotlin.test.Test @@ -58,13 +59,14 @@ class WaitForOpenChannelTestsCommon : LightningTestSuite() { } @Test - fun `recv OpenChannel -- missing channel type`() { + fun `recv OpenChannel -- missing channel type`() = runSuspendTest { val (_, bob, open) = TestsHelper.init() val open1 = open.copy(tlvStream = TlvStream.empty()) val (bob1, actions) = bob.process(ChannelCommand.MessageReceived(open1)) val error = actions.findOutgoingMessage() assertEquals(error, Error(open.temporaryChannelId, MissingChannelType(open.temporaryChannelId).message)) assertIs>(bob1) + assertIs(bob.state.replyTo.await()) } @Test @@ -139,18 +141,20 @@ class WaitForOpenChannelTestsCommon : LightningTestSuite() { } @Test - fun `recv Error`() { + fun `recv Error`() = runSuspendTest { val (_, bob, _) = TestsHelper.init() val (bob1, actions) = bob.process(ChannelCommand.MessageReceived(Error(ByteVector32.Zeroes, "oops"))) assertIs>(bob1) + assertIs(bob.state.replyTo.await()) assertTrue(actions.isEmpty()) } @Test - fun `recv Disconnected`() { + fun `recv Disconnected`() = runSuspendTest { val (_, bob, _) = TestsHelper.init() val (bob1, actions) = bob.process(ChannelCommand.Disconnected) assertIs>(bob1) + assertIs(bob.state.replyTo.await()) assertTrue(actions.isEmpty()) }