Skip to content

Commit

Permalink
Add liquidity purchases to the AuditDb
Browse files Browse the repository at this point in the history
Whenever liquidity is purchased, we store it in the `AuditDb`. This lets
node operators gather useful statistics on their peers, and which ones
are actively using the liquidity that is purchased.

We store minimal information about the liquidity ads itself to be more
easily compatible with potential changes in the spec.
  • Loading branch information
t-bast committed Sep 4, 2024
1 parent f16eeaf commit 629ce61
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package fr.acinq.eclair.channel

import akka.actor.ActorRef
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.Helpers.Closing.ClosingType
import fr.acinq.eclair.io.Peer.OpenChannelResponse
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds}
import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId}

/**
* Created by PM on 17/08/2016.
Expand Down Expand Up @@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext

case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent

case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) {
val previousCapacity: Satoshi = capacity - localContribution - remoteContribution
val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total
val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total
}

case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent

case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent

// NB: the fee should be set to 0 when we're not paying it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
sessionId,
nodeParams, fundingParams,
channelParams = d.commitments.params,
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount,
liquidityPurchase_opt = willFund_opt.map(_.purchase),
wallet
Expand Down Expand Up @@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
sessionId,
nodeParams, fundingParams,
channelParams = d.commitments.params,
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount,
liquidityPurchase_opt = liquidityPurchase_opt,
wallet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.channel.fund

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior}
import akka.event.LoggingAdapter
Expand Down Expand Up @@ -163,6 +165,8 @@ object InteractiveTxBuilder {
def previousFundingAmount: Satoshi
def localCommitIndex: Long
def remoteCommitIndex: Long
def localNextHtlcId: Long
def remoteNextHtlcId: Long
def remotePerCommitmentPoint: PublicKey
def commitTxFeerate: FeeratePerKw
def fundingTxIndex: Long
Expand All @@ -175,15 +179,19 @@ object InteractiveTxBuilder {
override val previousFundingAmount: Satoshi = 0 sat
override val localCommitIndex: Long = 0
override val remoteCommitIndex: Long = 0
override val localNextHtlcId: Long = 0
override val remoteNextHtlcId: Long = 0
override val fundingTxIndex: Long = 0
override val localHtlcs: Set[DirectedHtlc] = Set.empty
}
case class SpliceTx(parentCommitment: Commitment) extends Purpose {
case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose {
override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal
override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal
override val previousFundingAmount: Satoshi = parentCommitment.capacity
override val localCommitIndex: Long = parentCommitment.localCommit.index
override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index
override val localNextHtlcId: Long = changes.localNextHtlcId
override val remoteNextHtlcId: Long = changes.remoteNextHtlcId
override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint
override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate
override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1
Expand All @@ -199,6 +207,8 @@ object InteractiveTxBuilder {
override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi
override val localCommitIndex: Long = replacedCommitment.localCommit.index
override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index
override val localNextHtlcId: Long = 0
override val remoteNextHtlcId: Long = 0
override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint
override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate
override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex
Expand Down Expand Up @@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
Behaviors.receiveMessagePartial {
case SignTransactionResult(signedTx) =>
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
// At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their
// commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig,
// we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after
// fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well.
// The right solution is to check confirmations on the funding transaction before considering that a liquidity
// purchase is completed, which is what we do in our AuditDb.
liquidityPurchase_opt.foreach { p =>
val purchase = LiquidityPurchase(
fundingTxId = signedTx.txId,
fundingTxIndex = purpose.fundingTxIndex,
isBuyer = fundingParams.isInitiator,
amount = p.amount,
fees = p.fees,
capacity = fundingParams.fundingAmount,
localContribution = fundingParams.localContribution,
remoteContribution = fundingParams.remoteContribution,
localBalance = localCommit.spec.toLocal,
remoteBalance = localCommit.spec.toRemote,
outgoingHtlcCount = purpose.localNextHtlcId,
incomingHtlcCount = purpose.remoteNextHtlcId,
)
context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase))
}
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
Behaviors.stopped
case WalletFailure(t) =>
Expand Down
4 changes: 4 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ trait AuditDb {

def add(paymentRelayed: PaymentRelayed): Unit

def add(liquidityPurchase: ChannelLiquidityPurchased): Unit

def add(txPublished: TransactionPublished): Unit

def add(txConfirmed: TransactionConfirmed): Unit
Expand All @@ -52,6 +54,8 @@ trait AuditDb {

def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed]

def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase]

def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee]

def stats(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[Stats]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
context.system.eventStream.subscribe(self, classOf[PaymentReceived])
context.system.eventStream.subscribe(self, classOf[PaymentRelayed])
context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased])
context.system.eventStream.subscribe(self, classOf[TransactionPublished])
context.system.eventStream.subscribe(self, classOf[TransactionConfirmed])
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
Expand Down Expand Up @@ -92,6 +93,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
}
auditDb.add(e)

case e: ChannelLiquidityPurchased => auditDb.add(e)

case e: TransactionPublished =>
log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}")
auditDb.add(e)
Expand Down
10 changes: 10 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.add(paymentRelayed)
}

override def add(liquidityPurchase: ChannelLiquidityPurchased): Unit = {
runAsync(secondary.add(liquidityPurchase))
primary.add(liquidityPurchase)
}

override def add(txPublished: TransactionPublished): Unit = {
runAsync(secondary.add(txPublished))
primary.add(txPublished)
Expand Down Expand Up @@ -196,6 +201,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.listRelayed(from, to, paginated_opt)
}

override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = {
runAsync(secondary.listLiquidityPurchases(remoteNodeId))
primary.listLiquidityPurchases(remoteNodeId)
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[AuditDb.NetworkFee] = {
runAsync(secondary.listNetworkFees(from, to))
primary.listNetworkFees(from, to)
Expand Down
Loading

0 comments on commit 629ce61

Please sign in to comment.