Skip to content

Commit

Permalink
Don't initiate on-the-fly-funding if remote feature not active
Browse files Browse the repository at this point in the history
We check the remote features before initiating an on-the-fly funding
attempt: it doesn't make sense to initiate it if our peer has not
activated the feature.
  • Loading branch information
t-bast committed Sep 16, 2024
1 parent 021b3c8 commit 69014d2
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private class MessageRelay(nodeParams: NodeParams,

private def waitForPreviousPeerForPolicyCheck(msg: OnionMessage, nextNodeId: PublicKey): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedPeerInfo(PeerInfo(_, _, _, _, channels)) if channels.nonEmpty =>
case WrappedPeerInfo(info: PeerInfo) if info.channels.nonEmpty =>
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), nextNodeId)
waitForNextPeerForPolicyCheck(msg, nextNodeId)
case _ =>
Expand All @@ -167,8 +167,8 @@ private class MessageRelay(nodeParams: NodeParams,

private def waitForNextPeerForPolicyCheck(msg: OnionMessage, nextNodeId: PublicKey): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedPeerInfo(PeerInfo(peer, _, _, _, channels)) if channels.nonEmpty =>
peer ! Peer.RelayOnionMessage(messageId, msg, replyTo_opt)
case WrappedPeerInfo(info: PeerInfo) if info.channels.nonEmpty =>
info.peer ! Peer.RelayOnionMessage(messageId, msg, replyTo_opt)
Behaviors.stopped
case _ =>
Metrics.OnionMessagesNotRelayed.withTag(Tags.Reason, Tags.Reasons.NoChannelWithNextPeer).increment()
Expand Down
11 changes: 6 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,11 @@ class Peer(val nodeParams: NodeParams,

case Event(r: GetPeerInfo, d) =>
val replyTo = r.replyTo.getOrElse(sender().toTyped)
replyTo ! PeerInfo(self, remoteNodeId, stateName, d match {
case c: ConnectedData => Some(c.address)
case _ => None
}, d.channels.values.toSet)
val peerInfo = d match {
case c: ConnectedData => PeerInfo(self, remoteNodeId, stateName, Some(c.remoteFeatures), Some(c.address), c.channels.values.toSet)
case _ => PeerInfo(self, remoteNodeId, stateName, None, None, d.channels.values.toSet)
}
replyTo ! peerInfo
stay()

case Event(r: GetPeerChannels, d) =>
Expand Down Expand Up @@ -867,7 +868,7 @@ object Peer {

case class GetPeerInfo(replyTo: Option[typed.ActorRef[PeerInfoResponse]])
sealed trait PeerInfoResponse { def nodeId: PublicKey }
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Set[ActorRef]) extends PeerInfoResponse
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, features: Option[Features[InitFeature]], address: Option[NodeAddress], channels: Set[ActorRef]) extends PeerInfoResponse
case class PeerNotFound(nodeId: PublicKey) extends PeerInfoResponse with DisconnectResponse { override def toString: String = s"peer $nodeId not found" }

/** Return the peer's current channels: note that the data may change concurrently, never assume it is fully up-to-date. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.{BlockHeight, Logs, channel}
import fr.acinq.eclair.{BlockHeight, Features, InitFeature, Logs, channel}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -107,15 +107,15 @@ object PeerReadyNotifier {
private case object PeerNotConnected extends Command
private case object PeerConnected extends Command
private case object PeerDisconnected extends Command
private case class WrappedPeerInfo(peer: ActorRef[Peer.GetPeerChannels], channelCount: Int) extends Command
private case class WrappedPeerInfo(peer: ActorRef[Peer.GetPeerChannels], remoteFeatures: Features[InitFeature], channelCount: Int) extends Command
private case class NewBlockNotTimedOut(currentBlockHeight: BlockHeight) extends Command
private case object CheckChannelsReady extends Command
private case class WrappedPeerChannels(wrapped: Peer.PeerChannels) extends Command
private case object Timeout extends Command
private case object ToBeIgnored extends Command

sealed trait Result { def remoteNodeId: PublicKey }
case class PeerReady(remoteNodeId: PublicKey, peer: akka.actor.ActorRef, channelInfos: Seq[Peer.ChannelInfo]) extends Result { val channelsCount: Int = channelInfos.size }
case class PeerReady(remoteNodeId: PublicKey, peer: akka.actor.ActorRef, remoteFeatures: Features[InitFeature], channelInfos: Seq[Peer.ChannelInfo]) extends Result { val channelsCount: Int = channelInfos.size }
case class PeerUnavailable(remoteNodeId: PublicKey) extends Result

private case object ChannelsReadyTimerKey
Expand Down Expand Up @@ -243,7 +243,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
// In that case we still want to wait for a connection, because we may want to open a channel to them.
case _: Peer.PeerNotFound => PeerNotConnected
case info: Peer.PeerInfo if info.state != Peer.CONNECTED => PeerNotConnected
case info: Peer.PeerInfo => WrappedPeerInfo(info.peer.toTyped, info.channels.size)
case info: Peer.PeerInfo => WrappedPeerInfo(info.peer.toTyped, info.features.getOrElse(Features.empty), info.channels.size)
}
// We check whether the peer is already connected.
switchboard ! Switchboard.GetPeerInfo(peerInfoAdapter, remoteNodeId)
Expand All @@ -256,14 +256,14 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
Behaviors.same
case PeerDisconnected =>
Behaviors.same
case WrappedPeerInfo(peer, channelCount) =>
case WrappedPeerInfo(peer, remoteFeatures, channelCount) =>
if (channelCount == 0) {
log.info("peer is ready with no channels")
replyTo ! PeerReady(remoteNodeId, peer.toClassic, Seq.empty)
replyTo ! PeerReady(remoteNodeId, peer.toClassic, remoteFeatures, Seq.empty)
Behaviors.stopped
} else {
log.debug("peer is connected with {} channels", channelCount)
waitForChannelsReady(peer, switchboard)
waitForChannelsReady(peer, switchboard, remoteFeatures)
}
case NewBlockNotTimedOut(currentBlockHeight) =>
log.debug("waiting for peer to connect at block {}", currentBlockHeight)
Expand All @@ -277,7 +277,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
}
}

private def waitForChannelsReady(peer: ActorRef[Peer.GetPeerChannels], switchboard: ActorRef[Switchboard.GetPeerInfo]): Behavior[Command] = {
private def waitForChannelsReady(peer: ActorRef[Peer.GetPeerChannels], switchboard: ActorRef[Switchboard.GetPeerInfo], remoteFeatures: Features[InitFeature]): Behavior[Command] = {
timers.startTimerWithFixedDelay(ChannelsReadyTimerKey, CheckChannelsReady, initialDelay = 50 millis, delay = 1 second)
Behaviors.receiveMessagePartial {
case CheckChannelsReady =>
Expand All @@ -286,7 +286,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
Behaviors.same
case WrappedPeerChannels(peerChannels) =>
if (peerChannels.channels.map(_.state).forall(isChannelReady)) {
replyTo ! PeerReady(remoteNodeId, peer.toClassic, peerChannels.channels)
replyTo ! PeerReady(remoteNodeId, peer.toClassic, remoteFeatures, peerChannels.channels)
Behaviors.stopped
} else {
log.debug("peer has {} channels that are not ready", peerChannels.channels.count(s => !isChannelReady(s.state)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, Logs, NodeParams, ShortChannelId, TimestampMilli, TimestampSecond, channel, nodeFee}
import fr.acinq.eclair.{Features, InitFeature, Logs, NodeParams, TimestampMilli, TimestampSecond, channel, nodeFee}

import java.util.UUID
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -153,7 +153,7 @@ class ChannelRelay private(nodeParams: NodeParams,
case Some(walletNodeId) if nodeParams.peerWakeUpConfig.enabled => wakeUp(walletNodeId)
case _ =>
context.self ! DoRelay
relay(Seq.empty)
relay(None, Seq.empty)
}
}

Expand All @@ -166,21 +166,21 @@ class ChannelRelay private(nodeParams: NodeParams,
Metrics.recordPaymentRelayFailed(Tags.FailureType.WakeUp, Tags.RelayType.Channel)
context.log.info("rejecting htlc: failed to wake-up remote peer")
safeSendAndStop(r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
case WrappedPeerReadyResult(_: PeerReadyNotifier.PeerReady) =>
case WrappedPeerReadyResult(r: PeerReadyNotifier.PeerReady) =>
context.self ! DoRelay
relay(Seq.empty)
relay(Some(r.remoteFeatures), Seq.empty)
}
}

def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
def relay(remoteFeatures_opt: Option[Features[InitFeature]], previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case DoRelay =>
if (previousFailures.isEmpty) {
val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId)
context.log.info("relaying htlc #{} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.id, r.add.channelId, requestedShortChannelId_opt, nextNodeId_opt.getOrElse(""))
}
context.log.debug("attempting relay previousAttempts={}", previousFailures.size)
handleRelay(previousFailures) match {
handleRelay(remoteFeatures_opt, previousFailures) match {
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info("rejecting htlc reason={}", cmdFail.reason)
Expand All @@ -192,12 +192,12 @@ class ChannelRelay private(nodeParams: NodeParams,
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId)
register ! Register.Forward(forwardFailureAdapter, selectedChannelId, cmdAdd)
waitForAddResponse(selectedChannelId, previousFailures)
waitForAddResponse(selectedChannelId, remoteFeatures_opt, previousFailures)
}
}
}

private def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
private def waitForAddResponse(selectedChannelId: ByteVector32, remoteFeatures_opt: Option[Features[InitFeature]], previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
Behaviors.receiveMessagePartial {
case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, channelId, _))) =>
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}")
Expand All @@ -208,7 +208,7 @@ class ChannelRelay private(nodeParams: NodeParams,
case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) =>
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
context.self ! DoRelay
relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))
relay(remoteFeatures_opt, previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))

case WrappedAddResponse(_: RES_SUCCESS[_]) =>
context.log.debug("sent htlc to the downstream channel")
Expand Down Expand Up @@ -280,7 +280,7 @@ class ChannelRelay private(nodeParams: NodeParams,
* - a CMD_FAIL_HTLC to be sent back upstream
* - a CMD_ADD_HTLC to propagate downstream
*/
private def handleRelay(previousFailures: Seq[PreviouslyTried]): RelayResult = {
private def handleRelay(remoteFeatures_opt: Option[Features[InitFeature]], previousFailures: Seq[PreviouslyTried]): RelayResult = {
val alreadyTried = previousFailures.map(_.channelId)
selectPreferredChannel(alreadyTried) match {
case Some(outgoingChannel) => relayOrFail(outgoingChannel)
Expand All @@ -298,7 +298,7 @@ class ChannelRelay private(nodeParams: NodeParams,
CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true)
}
walletNodeId_opt match {
case Some(walletNodeId) if shouldAttemptOnTheFlyFunding(previousFailures) => RelayNeedsFunding(walletNodeId, cmdFail)
case Some(walletNodeId) if shouldAttemptOnTheFlyFunding(remoteFeatures_opt, previousFailures) => RelayNeedsFunding(walletNodeId, cmdFail)
case _ => RelayFailure(cmdFail)
}
}
Expand Down Expand Up @@ -400,8 +400,8 @@ class ChannelRelay private(nodeParams: NodeParams,
}

/** If we fail to relay a payment, we may want to attempt on-the-fly funding. */
private def shouldAttemptOnTheFlyFunding(previousFailures: Seq[PreviouslyTried]): Boolean = {
val featureOk = nodeParams.features.hasFeature(Features.OnTheFlyFunding)
private def shouldAttemptOnTheFlyFunding(remoteFeatures_opt: Option[Features[InitFeature]], previousFailures: Seq[PreviouslyTried]): Boolean = {
val featureOk = Features.canUseFeature(nodeParams.features.initFeatures(), remoteFeatures_opt.getOrElse(Features.empty), Features.OnTheFlyFunding)
// If we have a channel with the next node, we only want to perform on-the-fly funding for liquidity issues.
val liquidityIssue = previousFailures.forall {
case PreviouslyTried(_, RES_ADD_FAILED(_, _: InsufficientFunds, _)) => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, Route, RoutePa
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Alias, CltvExpiry, CltvExpiryDelta, EncodedNodeId, Features, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams, TimestampMilli, UInt64, nodeFee, randomBytes32}
import fr.acinq.eclair.{Alias, CltvExpiry, CltvExpiryDelta, EncodedNodeId, Features, InitFeature, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams, TimestampMilli, UInt64, nodeFee, randomBytes32}

import java.util.UUID
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -164,8 +164,8 @@ object NodeRelay {
}

/** If we fail to relay a payment, we may want to attempt on-the-fly funding if it makes sense. */
private def shouldAttemptOnTheFlyFunding(nodeParams: NodeParams, failures: Seq[PaymentFailure]): Boolean = {
val featureOk = nodeParams.features.hasFeature(Features.OnTheFlyFunding)
private def shouldAttemptOnTheFlyFunding(nodeParams: NodeParams, recipientFeatures_opt: Option[Features[InitFeature]], failures: Seq[PaymentFailure]): Boolean = {
val featureOk = Features.canUseFeature(nodeParams.features.initFeatures(), recipientFeatures_opt.getOrElse(Features.empty), Features.OnTheFlyFunding)
val balanceTooLow = failures.collectFirst { case f@LocalFailure(_, _, BalanceTooLow) => f }.nonEmpty
val routeNotFound = failures.collectFirst { case f@LocalFailure(_, _, RouteNotFound) => f }.nonEmpty
featureOk && (balanceTooLow || routeNotFound)
Expand Down Expand Up @@ -298,7 +298,7 @@ class NodeRelay private(nodeParams: NodeParams,
private def ensureRecipientReady(upstream: Upstream.Hot.Trampoline, recipient: Recipient, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
nextWalletNodeId(nodeParams, recipient) match {
case Some(walletNodeId) if nodeParams.peerWakeUpConfig.enabled => waitForPeerReady(upstream, walletNodeId, recipient, nextPayload, nextPacket_opt)
case _ => relay(upstream, recipient, nextPayload, nextPacket_opt)
case _ => relay(upstream, recipient, None, nextPayload, nextPacket_opt)
}
}

Expand All @@ -316,14 +316,14 @@ class NodeRelay private(nodeParams: NodeParams,
context.log.warn("rejecting payment: failed to wake-up remote peer")
rejectPayment(upstream, Some(UnknownNextPeer()))
stopping()
case WrappedPeerReadyResult(_: PeerReadyNotifier.PeerReady) =>
relay(upstream, recipient, nextPayload, nextPacket_opt)
case WrappedPeerReadyResult(r: PeerReadyNotifier.PeerReady) =>
relay(upstream, recipient, Some(r.remoteFeatures), nextPayload, nextPacket_opt)
}
}
}

/** Relay the payment to the next identified node: this is similar to sending an outgoing payment. */
private def relay(upstream: Upstream.Hot.Trampoline, recipient: Recipient, payloadOut: IntermediatePayload.NodeRelay, packetOut_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
private def relay(upstream: Upstream.Hot.Trampoline, recipient: Recipient, recipientFeatures_opt: Option[Features[InitFeature]], payloadOut: IntermediatePayload.NodeRelay, packetOut_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
context.log.debug("relaying trampoline payment (amountIn={} expiryIn={} amountOut={} expiryOut={})", upstream.amountIn, upstream.expiryIn, payloadOut.amountToForward, payloadOut.outgoingCltv)
val confidence = (upstream.received.map(_.add.endorsement).min + 0.5) / 8
val paymentCfg = SendPaymentConfig(relayId, relayId, None, paymentHash, recipient.nodeId, upstream, None, None, storeInDb = false, publishEvent = false, recordPathFindingMetrics = true, confidence)
Expand All @@ -342,7 +342,7 @@ class NodeRelay private(nodeParams: NodeParams,
}
val payFSM = outgoingPaymentFactory.spawnOutgoingPayFSM(context, paymentCfg, useMultiPart)
payFSM ! payment
sending(upstream, recipient, payloadOut, TimestampMilli.now(), fulfilledUpstream = false)
sending(upstream, recipient, recipientFeatures_opt, payloadOut, TimestampMilli.now(), fulfilledUpstream = false)
}

/**
Expand All @@ -354,6 +354,7 @@ class NodeRelay private(nodeParams: NodeParams,
*/
private def sending(upstream: Upstream.Hot.Trampoline,
recipient: Recipient,
recipientFeatures_opt: Option[Features[InitFeature]],
nextPayload: IntermediatePayload.NodeRelay,
startedAt: TimestampMilli,
fulfilledUpstream: Boolean): Behavior[Command] =
Expand All @@ -365,7 +366,7 @@ class NodeRelay private(nodeParams: NodeParams,
// We want to fulfill upstream as soon as we receive the preimage (even if not all HTLCs have fulfilled downstream).
context.log.debug("got preimage from downstream")
fulfillPayment(upstream, paymentPreimage)
sending(upstream, recipient, nextPayload, startedAt, fulfilledUpstream = true)
sending(upstream, recipient, recipientFeatures_opt, nextPayload, startedAt, fulfilledUpstream = true)
} else {
// we don't want to fulfill multiple times
Behaviors.same
Expand All @@ -381,7 +382,7 @@ class NodeRelay private(nodeParams: NodeParams,
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
nextWalletNodeId(nodeParams, recipient) match {
case Some(walletNodeId) if shouldAttemptOnTheFlyFunding(nodeParams, failures) =>
case Some(walletNodeId) if shouldAttemptOnTheFlyFunding(nodeParams, recipientFeatures_opt, failures) =>
context.log.info("trampoline payment failed, attempting on-the-fly funding")
attemptOnTheFlyFunding(upstream, walletNodeId, recipient, nextPayload, failures, startedAt)
case _ =>
Expand Down
Loading

0 comments on commit 69014d2

Please sign in to comment.