Skip to content

Commit

Permalink
Rework ElectrumClient (#512)
Browse files Browse the repository at this point in the history
* Move socketBuilder to connect()

This is only used when connecting, it doesn't need to be an argument of
the `ElectrumClient`.

* Refactor and document `linesFlow`

TCP sockets with electrum servers require some non-trivial code to
reconstruct individual messages from the bytes received on the socket.

We process bytes in chunks, then reconstruct utf8 strings, and finally
split those strings at newline characters into individual messages.

We document that code, rename fields to make it easier to understand,
and add unit tests. We remove it from the TCP socket abstraction, since
this is specific to electrum connections.

We also change the behavior in case the socket is closed while we have
buffered a partial message: it doesn't make sense to emit it, as listeners
won't be able to decode it.

* Remove duplicate `connectionState` flow

This is a pure 1:1 mapping from `connectionStatus`, there is no reason for
that duplication. Clients can trivially migrate or `map` using the
`toConnectionState` helper function.

* Rework connection flow

We clean up the coroutine hierarchy: the `ElectrumClient` has a single
internal coroutine that is launched once the connection has been
established with the electrum server. This coroutine launches three child
coroutines and uses supervision to gracefully stop if the connection is
closed or an error is received.

Connection establishment happens in the context of the job that calls
`connect()` and doesn't throw exceptions.

* Handle electrum server errors

On most RPC calls, we can gracefully handle server errors. Since we cannot
trust the electrum server anyway, and they may lie to us by omission, this
doesn't downgrade the security model.

The previous behavior was to throw an exception, which was never properly
handled and would just result in a crash of the wallet application.

* Add timeout and retry to RPC calls

We add an explicit timeout to RPC calls and a retry. If that retry also
fails, two strategies are available:

- handle the error and gracefully degrade (non-critical RPC calls)
- disconnect and retry with new electrum server (critical RPCs such as
  subscriptions)

The timeout can be updated by the application, for example when a slow
network is detected or when Tor is activated.
  • Loading branch information
t-bast authored Sep 11, 2023
1 parent 64c4415 commit fdba64a
Show file tree
Hide file tree
Showing 15 changed files with 577 additions and 360 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,12 @@ jobs:
- name: Check without integration
if: matrix.os == 'macOS-latest'
run: ./gradlew build -x jvmTest

# Uncomment the lines below to store test results for debugging failed tests (useful for iOS)
# - name: Store test results
# if: always()
# uses: actions/upload-artifact@v3
# with:
# name: test results
# path: build/reports
# retention-days: 1

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ import fr.acinq.lightning.transactions.Transactions
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.utils.sat


suspend fun IElectrumClient.getConfirmations(txId: ByteVector32): Int? {
val tx = kotlin.runCatching { getTx(txId) }.getOrNull()
return tx?.let { getConfirmations(tx) }
}
suspend fun IElectrumClient.getConfirmations(txId: ByteVector32): Int? = getTx(txId)?.let { tx -> getConfirmations(tx) }

/**
* @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found
* @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found
*/
suspend fun IElectrumClient.getConfirmations(tx: Transaction): Int? {
val scriptHash = ElectrumClient.computeScriptHash(tx.txOut.first().publicKeyScript)
val scriptHashHistory = getScriptHashHistory(scriptHash)
val item = scriptHashHistory.find { it.txid == tx.txid }
val blockHeight = startHeaderSubscription().blockHeight
return item?.let { if (item.blockHeight > 0) blockHeight - item.blockHeight + 1 else 0 }
return when (val status = connectionStatus.value) {
is ElectrumConnectionStatus.Connected -> {
val currentBlockHeight = status.height
val scriptHash = ElectrumClient.computeScriptHash(tx.txOut.first().publicKeyScript)
val scriptHashHistory = getScriptHashHistory(scriptHash)
val item = scriptHashHistory.find { it.txid == tx.txid }
item?.let { if (item.blockHeight > 0) currentBlockHeight - item.blockHeight + 1 else 0 }
}
else -> null
}
}

suspend fun IElectrumClient.computeSpliceCpfpFeerate(commitments: Commitments, targetFeerate: FeeratePerKw, spliceWeight: Int, logger: MDCLogger): Pair<FeeratePerKw, Satoshi> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import kotlinx.serialization.json.*

/**
* Common communication objects between [ElectrumClient] and external ressources (e.g. [ElectrumWatcher])
* See the documentation for the ElectrumX protocol there: https://electrumx.readthedocs.io
* See the documentation for the ElectrumX protocol there: https://electrumx-spesmilo.readthedocs.io
*/

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.utils.Connection
import fr.acinq.lightning.utils.sum
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -138,11 +137,8 @@ class ElectrumMiniWallet(
val unspents = client.getScriptHashUnspents(msg.scriptHash)
val newUtxos = unspents.minus((_walletStateFlow.value.addresses[bitcoinAddress] ?: emptyList()).toSet())
// request new parent txs
val parentTxs = newUtxos.map { utxo ->
val tx = client.getTx(utxo.txid)
logger.mdcinfo { "received parent transaction with txid=${tx.txid}" }
tx
}
val parentTxs = newUtxos.mapNotNull { utxo -> client.getTx(utxo.txid) }
parentTxs.forEach { tx -> logger.mdcinfo { "received parent transaction with txid=${tx.txid}" } }
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to unspents), parentTxs = this.parentTxs + parentTxs.associateBy { it.txid })
logger.mdcinfo { "${unspents.size} utxo(s) for address=$bitcoinAddress balance=${nextWalletState.totalBalance}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
Expand All @@ -162,7 +158,6 @@ class ElectrumMiniWallet(
logger.error { "cannot subscribe to $bitcoinAddress ($result)" }
null
}

is AddressToPublicKeyScriptResult.Success -> {
val pubkeyScript = ByteVector(Script.write(result.script))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
Expand All @@ -178,7 +173,7 @@ class ElectrumMiniWallet(
job = launch {
launch {
// listen to connection events
client.connectionState.filterIsInstance<Connection.ESTABLISHED>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
}
launch {
// listen to subscriptions events
Expand All @@ -191,13 +186,11 @@ class ElectrumMiniWallet(
logger.mdcinfo { "electrum connected" }
scriptHashes.values.forEach { scriptHash -> subscribe(scriptHash) }
}

is WalletCommand.Companion.ElectrumNotification -> {
if (it.msg is ScriptHashSubscriptionResponse) {
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(it.msg)
}
}

is WalletCommand.Companion.AddAddress -> {
logger.mdcinfo { "adding new address=${it.bitcoinAddress}" }
subscribe(it.bitcoinAddress)?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, lo
logger.info { "initializing electrum watcher" }

suspend fun processScripHashHistory(history: List<TransactionHistoryItem>) = runCatching {
val txs = history.filter { it.blockHeight >= -1 }.map { client.getTx(it.txid) }
val txs = history.filter { it.blockHeight >= -1 }.mapNotNull { client.getTx(it.txid) }

// WatchSpent
txs.forEach { tx ->
Expand All @@ -91,30 +91,30 @@ class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, lo
.filter { it.txId == item.txid }
.filter { state.height - item.blockHeight + 1 >= it.minDepth }
triggered.forEach { w ->
val merkle = client.getMerkle(w.txId, item.blockHeight)
val confirmations = state.height - merkle.block_height + 1
logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" }
_notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))

// check whether we have transactions to publish
when (val event = w.event) {
is BITCOIN_PARENT_TX_CONFIRMED -> {
val tx = event.childTx
logger.info { "parent tx of txid=${tx.txid} has been confirmed" }
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = max(merkle.block_height + csvTimeout, cltvTimeout)
state = if (absTimeout > state.height) {
logger.info { "delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=${state.height})" }
val block2tx = state.block2tx + (absTimeout to state.block2tx.getOrElse(absTimeout) { setOf() } + tx)
state.copy(block2tx = block2tx)
} else {
client.broadcastTransaction(tx)
state.copy(sent = state.sent + tx)
client.getMerkle(w.txId, item.blockHeight)?.let { merkle ->
val confirmations = state.height - merkle.block_height + 1
logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" }
_notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))

// check whether we have transactions to publish
when (val event = w.event) {
is BITCOIN_PARENT_TX_CONFIRMED -> {
val tx = event.childTx
logger.info { "parent tx of txid=${tx.txid} has been confirmed" }
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = max(merkle.block_height + csvTimeout, cltvTimeout)
state = if (absTimeout > state.height) {
logger.info { "delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=${state.height})" }
val block2tx = state.block2tx + (absTimeout to state.block2tx.getOrElse(absTimeout) { setOf() } + tx)
state.copy(block2tx = block2tx)
} else {
client.broadcastTransaction(tx)
state.copy(sent = state.sent + tx)
}
}
else -> {}
}

else -> {}
}
}
state = state.copy(watches = state.watches - triggered.toSet())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,49 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Transaction
import fr.acinq.lightning.utils.Connection
import kotlinx.coroutines.CompletableDeferred
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow

/** Note to implementers: methods exposed through this interface must *not* throw exceptions. */
interface IElectrumClient {
val notifications: Flow<ElectrumSubscriptionResponse>
val connectionStatus: StateFlow<ElectrumConnectionStatus>

/** Return the transaction matching the txId provided, if it can be found. */
suspend fun getTx(txid: ByteVector32): Transaction?

suspend fun send(request: ElectrumRequest, replyTo: CompletableDeferred<ElectrumResponse>)
/** Return the block header at the given height, if it exists. */
suspend fun getHeader(blockHeight: Int): BlockHeader?

suspend fun getTx(txid: ByteVector32): Transaction
/** Return the block headers starting at the given height, if they exist (empty list otherwise). */
suspend fun getHeaders(startHeight: Int, count: Int): List<BlockHeader>

suspend fun getMerkle(txid: ByteVector32, blockHeight: Int, contextOpt: Transaction? = null): GetMerkleResponse
/** Return a merkle proof for the given transaction, if it can be found. */
suspend fun getMerkle(txid: ByteVector32, blockHeight: Int, contextOpt: Transaction? = null): GetMerkleResponse?

/** Return the transaction history for a given script, or an empty list if the script is unknown. */
suspend fun getScriptHashHistory(scriptHash: ByteVector32): List<TransactionHistoryItem>

/** Return the utxos matching a given script, or an empty list if the script is unknown. */
suspend fun getScriptHashUnspents(scriptHash: ByteVector32): List<UnspentItem>

suspend fun startScriptHashSubscription(scriptHash: ByteVector32): ScriptHashSubscriptionResponse

suspend fun startHeaderSubscription(): HeaderSubscriptionResponse
/**
* Try broadcasting a transaction: we cannot know whether the remote server really broadcast the transaction,
* so we always consider it to be a success. The client should regularly retry transactions that don't confirm.
*/
suspend fun broadcastTransaction(tx: Transaction): ByteVector32

suspend fun broadcastTransaction(tx: Transaction): BroadcastTransactionResponse
/** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */
suspend fun estimateFees(confirmations: Int): FeeratePerKw?

suspend fun estimateFees(confirmations: Int): EstimateFeeResponse
/******************** Subscriptions ********************/

val notifications: Flow<ElectrumSubscriptionResponse>

val connectionStatus: StateFlow<ElectrumConnectionStatus>
/** Subscribe to changes to a given script. */
suspend fun startScriptHashSubscription(scriptHash: ByteVector32): ScriptHashSubscriptionResponse

val connectionState: StateFlow<Connection>
/** Subscribe to headers for new blocks found. */
suspend fun startHeaderSubscription(): HeaderSubscriptionResponse
}
12 changes: 6 additions & 6 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class Peer(
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
Expand Down Expand Up @@ -255,7 +255,7 @@ class Peer(
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
Expand All @@ -265,10 +265,10 @@ class Peer(
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
fundingFeerate = sortedFees[3] ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2] ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1] ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0] ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

Expand Down
13 changes: 0 additions & 13 deletions src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package fr.acinq.lightning.io

import fr.acinq.lightning.utils.decodeToString
import fr.acinq.lightning.utils.splitByLines
import fr.acinq.lightning.utils.subArray
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.kodein.log.LoggerFactory

interface TcpSocket {
Expand Down Expand Up @@ -69,11 +64,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf
internal expect object PlatformSocketBuilder : TcpSocket.Builder

suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) }

fun TcpSocket.linesFlow(): Flow<String> = flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}.decodeToString().splitByLines()
Loading

0 comments on commit fdba64a

Please sign in to comment.