Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added packet count telemetry metrics #2797

Merged
merged 11 commits into from
Aug 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
"uid": "PBFA97CFB590B2093"
},
"editorMode": "code",
"expr": "(increase(transactions_amount_total[30s]) * 0.5 * 0.0001)",
"expr": "(increase(packet_amount_fulfill_total[30s]) * 0.5 * 0.0001)",
"hide": false,
"instant": false,
"interval": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,79 @@ describe('OutgoingPaymentService', (): void => {
return payment
}

test('Telemetry Transaction Counter increments for COMPLETED transactions', async (): Promise<void> => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for adding this

const incrementTrxCounterSpy = jest
.spyOn(telemetryService!, 'incrementCounter')
.mockImplementation(() => Promise.resolve())

incomingPayment = await createIncomingPayment(deps, {
walletAddressId: receiverWalletAddress.id,
incomingAmount: {
value: receiveAmount.value,
assetCode: receiverWalletAddress.asset.code,
assetScale: receiverWalletAddress.asset.scale
}
})
assert.ok(incomingPayment.walletAddress)

const createdPayment = await setup({
receiver: incomingPayment.getUrl(incomingPayment.walletAddress),
receiveAmount,
method: 'ilp'
})

mockPaymentHandlerPay(createdPayment, incomingPayment)
const payment = await processNext(
createdPayment.id,
OutgoingPaymentState.Completed
)
await expectOutcome(payment, {
accountBalance: 0n,
amountSent: payment.debitAmount.value,
amountDelivered: payment.receiveAmount.value,
incomingPaymentReceived: payment.receiveAmount.value,
withdrawAmount: 0n
})

expect(incrementTrxCounterSpy).toHaveBeenCalledTimes(1)
JoblersTune marked this conversation as resolved.
Show resolved Hide resolved
})

test('Telemetry Transaction Counter does not increments for FAILED transactions', async (): Promise<void> => {
const incrementTrxCounterSpy = jest
.spyOn(telemetryService!, 'incrementCounter')
.mockImplementation(() => Promise.resolve())
const createdPayment = await setup({
receiver,
debitAmount,
method: 'ilp'
})

mockPaymentHandlerPay(
createdPayment,
incomingPayment,
new PaymentMethodHandlerError('Failed payment', {
description: '',
retryable: false
})
)

const payment = await processNext(
createdPayment.id,
OutgoingPaymentState.Failed,
'Failed payment'
)

await expectOutcome(payment, {
accountBalance: payment.debitAmount.value,
amountSent: 0n,
amountDelivered: 0n,
incomingPaymentReceived: incomingPayment.receivedAmount.value,
withdrawAmount: 0n
})

expect(incrementTrxCounterSpy).not.toHaveBeenCalled()
})

test('COMPLETED with Telemetry Fee Counter (receiveAmount < incomingPayment.incomingAmount)', async (): Promise<void> => {
const spyTelFeeAmount = jest.spyOn(
telemetryService,
Expand Down
40 changes: 40 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/core/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,43 @@
# `core`

> Toolkit comprised of core services and middleware needed to develop ILP applications.

## Telemetry Collection

Currently, we collect packet count and packet amount metrics directly within the connector core. These metrics are captured at the Interledger layer to track packet activity while ensuring privacy by collecting amounts at the packet level, rather than at the transaction level. This approach helps to preserve privacy, as we do not expose entire transaction amounts, while also incorporating privacy-preserving measures into the collected amounts. You can read more about privacy [here](https://rafiki.dev/telemetry/privacy/).

### Why We Collect on the Sending Side

The first decision in collecting this data was whether to do so on the sender's side or the receiver's side. We opted for the sender’s side to maintain consistency across our metrics, particularly for calculating average transaction amounts and times, which are tied to the outgoing payment flow in the Open Payments (OP) layer. By collecting metrics from the same perspective, we ensure alignment with how other metrics (like transaction completion times) are captured.

We also considered collecting metrics on both the sending and receiving sides, capturing metrics when prepare packets were received by the receiver and when fulfill or reject responses were received by the sender. However, this could lead to unreliable metrics, as telemetry is optional and might not be enabled on all nodes.

### Why We Chose handleIlpData

Given these considerations, we decided to place our packet count and amount metrics in the `handleIlpData` function within the Rafiki Connector Core. This function plays a crucial role in processing ILP packets, handling both outgoing payments and quotes.

The specific metrics we collect here include:

- packet_count_prepare: Counts the prepare packets sent, collected before the middleware routes are executed.
- packet_count_fulfill: Counts the fulfill packets received, collected after receiving a reply from the receiver.
- packet_count_reject: Counts the reject packets received.
- packet_amount_fulfill: Records the amount sent in fulfill packets.

These metrics provide valuable insights, including potential packet loss.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this, and this is not exactly packet loss like it is defined in the networking sense (since we only count them in the originating connector) , but something like rate of success/error instead


### Challenges with the Current Setup

While `handleIlpData` is an effective location for telemetry collection, it has some limitations:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not having function names in documentation since it becomes outdated quite quickly, I think we can just remove references to it and keep just the general explanation which you have

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I've included function names is try and avoid any duplicated work when the next person needs to update this or change the metric location. Is there somewhere better for me to use function names and get really specific outside of the README?


Sender-Side Limitation: Currently, metrics are only collected on the sender side. This is adequate for now but does not capture data from connecting nodes, which we plan to address in future implementations when multi-hop support is added.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to add a short explanation for the connector nodes: eg in A > B > C, we only collect packet information in A


### Other Considered Locations for Metrics Collection

We explored several alternative locations for collecting telemetry metrics within the Rafiki Connector Core:

- Dedicated Middleware: Initially, we implemented a dedicated telemetry middleware. However, this approach resulted in data being duplicated on both the sender and receiver sides, leading to inaccurate metrics. To address this, we would need to filter the data to ensure metrics are only collected on the sender side. Additionally, the telemetry middleware would need to effectively handle errors thrown in the middleware chain. To achieve this, it might be necessary to place the telemetry middleware right before the error-handling middleware, allowing it to catch and reflect any errors that occur. This would involve collecting the prepare packet count, wrapping the `next()` function in a try-catch block to capture any new errors, and then collecting reply and amount metrics after `next()` has resolved.
- `ilpHandler` on the Receiving Side: We also considered adding telemetry to the `ilpHandler` function, which processes middleware on receiving and connecting nodes. However, this would lead to a fragmented metric collection, with some metrics gathered on the receiver side and others on the sender side. This fragmentation could complicate the handling of concepts like transaction count, which would require dual collection on both sender and receiver sides. We'd also have to watch for the possibility of data duplication on the connectors because their middleware might trigger in each direction, as they receive prepares and again as they receive responses.

### Moving Forward

As we implement multi-hop capabilities and further refine the connector architecture, we will likely identify better entry and exit points for telemetry collection. This will allow us to capture a more comprehensive set of metrics across both sender and receiver nodes, ensuring a complete and accurate understanding of the network’s behavior.

This file was deleted.

28 changes: 28 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/core/rafiki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import {
createIlpPacketMiddleware
} from './middleware/ilp-packet'
import { TelemetryService } from '../../../../telemetry/service'
import {
incrementPreparePacketCount,
incrementFulfillOrRejectPacketCount,
incrementAmount
} from './telemetry'

// Model classes that represent an Interledger sender, receiver, or
// connector SHOULD implement this ConnectorAccount interface.
Expand Down Expand Up @@ -155,6 +160,11 @@ export class Rafiki<T = any> {
): Promise<Buffer> {
const prepare = new ZeroCopyIlpPrepare(rawPrepare)
const response = new IlpResponse()
const telemetry = this.publicServer.context.services.telemetry

if (telemetry) {
incrementPreparePacketCount(unfulfillable, prepare.amount, telemetry)
}

await this.routes(
{
Expand Down Expand Up @@ -183,6 +193,24 @@ export class Rafiki<T = any> {
}
)
if (!response.rawReply) throw new Error('error generating reply')

if (telemetry) {
const { code, scale } = sourceAccount.asset
incrementFulfillOrRejectPacketCount(
unfulfillable,
prepare.amount,
response,
telemetry
)
incrementAmount(
JoblersTune marked this conversation as resolved.
Show resolved Hide resolved
unfulfillable,
prepare.amount,
response,
code,
scale,
telemetry
)
}
return response.rawReply
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { TelemetryService } from '../../../../telemetry/service'
import { IlpResponse } from './middleware/ilp-packet'
import { ValueType } from '@opentelemetry/api'

export function incrementPreparePacketCount(
unfulfillable: boolean,
prepareAmount: string,
JoblersTune marked this conversation as resolved.
Show resolved Hide resolved
telemetry: TelemetryService
): void {
if (!unfulfillable && Number(prepareAmount)) {
telemetry.incrementCounter('packet_count_prepare', 1, {
description: 'Count of prepare packets that are sent'
})
}
}

export function incrementFulfillOrRejectPacketCount(
unfulfillable: boolean,
prepareAmount: string,
JoblersTune marked this conversation as resolved.
Show resolved Hide resolved
response: IlpResponse,
telemetry: TelemetryService
): void {
if (!unfulfillable && Number(prepareAmount)) {
if (response.fulfill) {
telemetry.incrementCounter('packet_count_fulfill', 1, {
description: 'Count of fulfill packets'
})
} else if (response.reject) {
telemetry.incrementCounter('packet_count_reject', 1, {
description: 'Count of reject packets'
})
}
}
}

export async function incrementAmount(
unfulfillable: boolean,
prepareAmount: string,
response: IlpResponse,
code: string,
scale: number,
telemetry: TelemetryService
): Promise<void> {
if (!unfulfillable && Number(prepareAmount)) {
JoblersTune marked this conversation as resolved.
Show resolved Hide resolved
if (response.fulfill) {
const value = BigInt(prepareAmount)
await telemetry.incrementCounterWithTransactionAmount(
'packet_amount_fulfill',
{
value,
assetCode: code,
assetScale: scale
},
{
description: 'Amount sent through the network',
valueType: ValueType.DOUBLE
}
)
}
}
}

This file was deleted.

Loading
Loading