Skip to content

Commit

Permalink
feat: added packet count and amount metrics to the connector core
Browse files Browse the repository at this point in the history
  • Loading branch information
JoblersTune committed Aug 15, 2024
1 parent 00d24ce commit 6e29243
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
"uid": "PBFA97CFB590B2093"
},
"editorMode": "code",
"expr": "(increase(transactions_amount_total[30s]) * 0.5 * 0.0001)",
"expr": "(increase(packet_amount_fulfill_total[30s]) * 0.5 * 0.0001)",
"hide": false,
"instant": false,
"interval": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,79 @@ describe('OutgoingPaymentService', (): void => {
return payment
}

test('Telemetry Transaction Counter increments for COMPLETED transactions', async (): Promise<void> => {
const incrementTrxCounterSpy = jest
.spyOn(telemetryService!, 'incrementCounter')
.mockImplementation(() => Promise.resolve())

incomingPayment = await createIncomingPayment(deps, {
walletAddressId: receiverWalletAddress.id,
incomingAmount: {
value: receiveAmount.value,
assetCode: receiverWalletAddress.asset.code,
assetScale: receiverWalletAddress.asset.scale
}
})
assert.ok(incomingPayment.walletAddress)

const createdPayment = await setup({
receiver: incomingPayment.getUrl(incomingPayment.walletAddress),
receiveAmount,
method: 'ilp'
})

mockPaymentHandlerPay(createdPayment, incomingPayment)
const payment = await processNext(
createdPayment.id,
OutgoingPaymentState.Completed
)
await expectOutcome(payment, {
accountBalance: 0n,
amountSent: payment.debitAmount.value,
amountDelivered: payment.receiveAmount.value,
incomingPaymentReceived: payment.receiveAmount.value,
withdrawAmount: 0n
})

expect(incrementTrxCounterSpy).toHaveBeenCalledTimes(1)
})

test('Telemetry Transaction Counter does not increments for FAILED transactions', async (): Promise<void> => {
const incrementTrxCounterSpy = jest
.spyOn(telemetryService!, 'incrementCounter')
.mockImplementation(() => Promise.resolve())
const createdPayment = await setup({
receiver,
debitAmount,
method: 'ilp'
})

mockPaymentHandlerPay(
createdPayment,
incomingPayment,
new PaymentMethodHandlerError('Failed payment', {
description: '',
retryable: false
})
)

const payment = await processNext(
createdPayment.id,
OutgoingPaymentState.Failed,
'Failed payment'
)

await expectOutcome(payment, {
accountBalance: payment.debitAmount.value,
amountSent: 0n,
amountDelivered: 0n,
incomingPaymentReceived: incomingPayment.receivedAmount.value,
withdrawAmount: 0n
})

expect(incrementTrxCounterSpy).not.toHaveBeenCalled()
})

test('COMPLETED with Telemetry Fee Counter (receiveAmount < incomingPayment.incomingAmount)', async (): Promise<void> => {
const spyTelFeeAmount = jest.spyOn(
telemetryService,
Expand Down
40 changes: 40 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/core/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,43 @@
# `core`

> Toolkit comprised of core services and middleware needed to develop ILP applications.
## Telemetry Collection

Currently, we collect packet count and packet amount metrics directly within the connector core. These metrics are captured at the Interledger layer to track packet activity while ensuring privacy by collecting amounts at the packet level, rather than at the transaction level. This approach helps to preserve privacy, as we do not expose entire transaction amounts, while also incorporating privacy-preserving measures into the collected amounts. You can read more about privacy [here](https://rafiki.dev/telemetry/privacy/).

### Why We Collect on the Sending Side

The first decision in collecting this data was whether to do so on the sender's side or the receiver's side. We opted for the sender’s side to maintain consistency across our metrics, particularly for calculating average transaction amounts and times, which are tied to the outgoing payment flow in the Open Payments (OP) layer. By collecting metrics from the same perspective, we ensure alignment with how other metrics (like transaction completion times) are captured.

We also considered collecting metrics on both the sending and receiving sides, capturing metrics when prepare packets were received by the receiver and when fulfill or reject responses were received by the sender. However, this could lead to unreliable metrics, as telemetry is optional and might not be enabled on all nodes.

### Why We Chose handleIlpData

Given these considerations, we decided to place our packet count and amount metrics in the `handleIlpData` function within the Rafiki Connector Core. This function plays a crucial role in processing ILP packets, handling both outgoing payments and quotes.

The specific metrics we collect here include:

- packet_count_prepare: Counts the prepare packets sent, collected before the middleware routes are executed.
- packet_count_fulfill: Counts the fulfill packets received, collected after receiving a reply from the receiver.
- packet_count_reject: Counts the reject packets received.
- packet_amount_fulfill: Records the amount sent in fulfill packets.

These metrics provide valuable insights, including potential packet loss.

### Challenges with the Current Setup

While `handleIlpData` is an effective location for telemetry collection, it has some limitations:

Sender-Side Limitation: Currently, metrics are only collected on the sender side. This is adequate for now but does not capture data from connecting nodes, which we plan to address in future implementations when multi-hop support is added.

### Other Considered Locations for Metrics Collection

We explored several alternative locations for collecting telemetry metrics within the Rafiki Connector Core:

- Dedicated Middleware: Initially, we implemented a dedicated telemetry middleware. However, this approach resulted in data being duplicated on both the sender and receiver sides, leading to inaccurate metrics. To address this, we would need to filter the data to ensure metrics are only collected on the sender side. Additionally, the telemetry middleware would need to effectively handle errors thrown in the middleware chain. To achieve this, it might be necessary to place the telemetry middleware right before the error-handling middleware, allowing it to catch and reflect any errors that occur. This would involve collecting the prepare packet count, wrapping the `next()` function in a try-catch block to capture any new errors, and then collecting reply and amount metrics after `next()` has resolved.
- `ilpHandler` on the Receiving Side: We also considered adding telemetry to the `ilpHandler` function, which processes middleware on receiving and connecting nodes. However, this would lead to a fragmented metric collection, with some metrics gathered on the receiver side and others on the sender side. This fragmentation could complicate the handling of concepts like transaction count, which would require dual collection on both sender and receiver sides. We'd also have to watch for the possibility of data duplication on the connectors because their middleware might trigger in each direction, as they receive prepares and again as they receive responses.

### Moving Forward

As we implement multi-hop capabilities and further refine the connector architecture, we will likely identify better entry and exit points for telemetry collection. This will allow us to capture a more comprehensive set of metrics across both sender and receiver nodes, ensuring a complete and accurate understanding of the network’s behavior.

This file was deleted.

28 changes: 28 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/core/rafiki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import {
createIlpPacketMiddleware
} from './middleware/ilp-packet'
import { TelemetryService } from '../../../../telemetry/service'
import {
incrementPreparePacketCount,
incrementFulfillOrRejectPacketCount,
incrementAmount
} from './telemetry'

// Model classes that represent an Interledger sender, receiver, or
// connector SHOULD implement this ConnectorAccount interface.
Expand Down Expand Up @@ -155,6 +160,11 @@ export class Rafiki<T = any> {
): Promise<Buffer> {
const prepare = new ZeroCopyIlpPrepare(rawPrepare)
const response = new IlpResponse()
const telemetry = this.publicServer.context.services.telemetry

if (telemetry) {
incrementPreparePacketCount(unfulfillable, prepare.amount, telemetry)
}

await this.routes(
{
Expand Down Expand Up @@ -183,6 +193,24 @@ export class Rafiki<T = any> {
}
)
if (!response.rawReply) throw new Error('error generating reply')

if (telemetry) {
const { code, scale } = sourceAccount.asset
incrementFulfillOrRejectPacketCount(
unfulfillable,
prepare.amount,
response,
telemetry
)
incrementAmount(
unfulfillable,
prepare.amount,
response,
code,
scale,
telemetry
)
}
return response.rawReply
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { TelemetryService } from '../../../../telemetry/service'
import { IlpResponse } from './middleware/ilp-packet'
import { ValueType } from '@opentelemetry/api'

export function incrementPreparePacketCount(
unfulfillable: boolean,
prepareAmount: string,
telemetry: TelemetryService
): void {
if (!unfulfillable && Number(prepareAmount)) {
telemetry.incrementCounter('packet_count_prepare', 1, {
description: 'Count of prepare packets that are sent'
})
}
}

export function incrementFulfillOrRejectPacketCount(
unfulfillable: boolean,
prepareAmount: string,
response: IlpResponse,
telemetry: TelemetryService
): void {
if (!unfulfillable && Number(prepareAmount)) {
if (response.fulfill) {
telemetry.incrementCounter('packet_count_fulfill', 1, {
description: 'Count of fulfill packets'
})
} else if (response.reject) {
telemetry.incrementCounter('packet_count_reject', 1, {
description: 'Count of reject packets'
})
}
}
}

export async function incrementAmount(
unfulfillable: boolean,
prepareAmount: string,
response: IlpResponse,
code: string,
scale: number,
telemetry: TelemetryService
): Promise<void> {
if (!unfulfillable && Number(prepareAmount)) {
if (response.fulfill) {
const value = BigInt(prepareAmount)
await telemetry.incrementCounterWithTransactionAmount(
'packet_amount_fulfill',
{
value,
assetCode: code,
assetScale: scale
},
{
description: 'Amount sent through the network',
valueType: ValueType.DOUBLE
}
)
}
}
}

This file was deleted.

Loading

0 comments on commit 6e29243

Please sign in to comment.