Skip to content

Commit

Permalink
TMP: Migrating to collecting metrics on sender side
Browse files Browse the repository at this point in the history
  • Loading branch information
JoblersTune committed Aug 6, 2024
1 parent 717b939 commit 9333411
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 46 deletions.
14 changes: 9 additions & 5 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ export enum LedgerTransferState {
VOIDED = 'VOIDED'
}

export interface LiquidityAccountAsset {
id: string
code?: string
scale?: number
ledger: number
onDebit?: (options: OnDebitOptions) => Promise<LiquidityAccount>
}

export interface LiquidityAccount {
id: string
asset: {
id: string
ledger: number
onDebit?: (options: OnDebitOptions) => Promise<LiquidityAccount>
}
asset: LiquidityAccountAsset
onCredit?: (options: OnCreditOptions) => Promise<LiquidityAccount>
onDebit?: (options: OnDebitOptions) => Promise<LiquidityAccount>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ export function createClientController({
expiresAt
)

response.rawReply = await send(axios, outgoing, outgoingPrepare)
response.rawReply = await send(axios, outgoing, outgoingPrepare) // This is the sender's response from the receiver, but we don't collect here in case an error is thrown which will be caught by error handling middleware
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ export function createStreamController(): ILPMiddleware {
return
}

const moneyOrReply = streamServer.createReply(request.prepare)
const moneyOrReply = streamServer.createReply(request.prepare) // Obviosly this creates a reply
if (isIlpReply(moneyOrReply)) {
response.reply = moneyOrReply
response.reply = moneyOrReply // The receiver sets their reply here
return
}

Expand All @@ -44,7 +44,7 @@ export function createStreamController(): ILPMiddleware {
if (query) {
const [[err, totalReceived], [err2]] = query
if (typeof totalReceived === 'string' && !err && !err2) {
moneyOrReply.setTotalReceived(totalReceived)
moneyOrReply.setTotalReceived(totalReceived) // the amount received is incremented on the reciever
ctx.revertTotalReceived = () =>
redis.decrby(
connectionKey,
Expand All @@ -61,7 +61,7 @@ export function createStreamController(): ILPMiddleware {
'failed to increment stream totalReceived'
)
}
response.reply = moneyOrReply.accept()
response.reply = moneyOrReply.accept() // The receiver sets their reply here when they have incremeneted the total received, i.e. a fulfill response
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
import { Readable } from 'stream'
import { HttpContext, HttpMiddleware, ILPMiddleware } from '../rafiki'
import getRawBody from 'raw-body'
import { ValueType } from '@opentelemetry/api'
// import { ValueType } from '@opentelemetry/api'

export const CONTENT_TYPE = 'application/octet-stream'

Expand Down Expand Up @@ -234,35 +234,6 @@ export function createIlpPacketMiddleware(

ctx.assert(!ctx.body, 500, 'response body already set')
ctx.assert(response.rawReply, 500, 'ilp reply not set')
ctx.body = response.rawReply

if (ctx.services.telemetry && Number(prepare.amount)) {
ctx.services.telemetry.incrementCounter('packet_count_prepare', 1, {
description: 'Count of incoming prepare packets'
})
if (response.fulfill) {
const { code, scale } = ctx.state.incomingAccount.asset
const value = BigInt(prepare.amount)
await ctx.services.telemetry.incrementCounterWithTransactionAmount(
'packet_amount_fulfill',
{
value,
assetCode: code,
assetScale: scale
},
{
description: 'Amount sent through the network',
valueType: ValueType.DOUBLE
}
)
ctx.services.telemetry.incrementCounter('packet_count_fulfill', 1, {
description: 'Count of outgoing fulfill packets'
})
} else if (response.reject) {
ctx.services.telemetry.incrementCounter('packet_count_reject', 1, {
description: 'Count of outgoing reject packets'
})
}
}
ctx.body = response.rawReply // Receiver sets their reply here
}
}
59 changes: 57 additions & 2 deletions packages/backend/src/payment-method/ilp/connector/core/rafiki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
createIlpPacketMiddleware
} from './middleware/ilp-packet'
import { TelemetryService } from '../../../../telemetry/service'
import { ValueType } from '@opentelemetry/api'

// Model classes that represent an Interledger sender, receiver, or
// connector SHOULD implement this ConnectorAccount interface.
Expand Down Expand Up @@ -145,16 +146,33 @@ export class Rafiki<T = any> {
}

this.publicServer.use(createTokenAuthMiddleware())
this.publicServer.use(createIlpPacketMiddleware(routes))
this.publicServer.use(createIlpPacketMiddleware(routes)) // seems to only be called when on receiver side
}

// Why this feels like a good fit
// Called on sender side
// It calls the middleware routes so we can check a prepare before the middleware begins and a response after the fact
// Can skip unfulfillable packet counts
// Unless we want to count prepares where they are sent, but I can't find that and wonder if it's done in the '@interledger/pay' library
async handleIlpData(
sourceAccount: IncomingAccount,
unfulfillable: boolean,
rawPrepare: Buffer
): Promise<Buffer> {
const prepare = new ZeroCopyIlpPrepare(rawPrepare)
const response = new IlpResponse()
const telemetry = this.publicServer.context.services.telemetry

if (telemetry && !unfulfillable && Number(prepare.amount)) {
telemetry.incrementCounter(
'packet_count_prepare',
1,
{
description: 'Count of incoming prepare packets'
}
)
console.log('TEST RAFIKI PREPARE PACKT COUNT ADDED')
}

await this.routes(
{
Expand Down Expand Up @@ -182,7 +200,44 @@ export class Rafiki<T = any> {
// unreachable, this is the end of the middleware chain
}
)
if (!response.rawReply) throw new Error('error generating reply')
if (!response.rawReply) throw new Error('error generating reply') // packet loss? Sent a request but got no reply? Maybe we should log that?

if (telemetry && !unfulfillable && Number(prepare.amount)) {
if (response.fulfill) {
const { code, scale } = sourceAccount.asset
const value = BigInt(prepare.amount)
await telemetry.incrementCounterWithTransactionAmount(
'packet_amount_fulfill',
{
value,
assetCode: code,
assetScale: scale
},
{
description: 'Amount sent through the network',
valueType: ValueType.DOUBLE
}
)
console.log('TEST RAFIKI AMNT ADDED', value )
telemetry.incrementCounter(
'packet_count_fulfill',
1,
{
description: 'Count of outgoing fulfill packets'
}
)
console.log('TEST RAFIKI FULFILL PACKT COUNT ADDED')
} else if (response.reject) {
telemetry.incrementCounter(
'packet_count_reject',
1,
{
description: 'Count of outgoing reject packets'
}
)
console.log('TEST RAFIKI REJECT PACKT COUNT ADDED')
}
}
return response.rawReply
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export async function sendToPeer(
account: OutgoingAccount,
prepare: Buffer
): Promise<Buffer> {

const { http } = account
if (!http) {
throw new Errors.UnreachableError('no outgoing endpoint')
Expand Down
8 changes: 5 additions & 3 deletions packages/backend/src/payment-method/ilp/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ async function getQuote(
let ilpQuote: Pay.Quote | undefined
const rateProbeStartTime = Date.now()
try {
ilpQuote = await Pay.startQuote({
console.log('TEST ILP SERVICE initiating quote')
ilpQuote = await Pay.startQuote({ // gets ilp Quote here
...quoteOptions,
slippage: deps.config.slippage,
prices: convertRatesToIlpPrices(rates)
Expand All @@ -104,6 +105,7 @@ async function getQuote(
}
)
}
console.log('TEST ILP SERVICE ends quote')
// Pay.startQuote should return PaymentError.InvalidSourceAmount or
// PaymentError.InvalidDestinationAmount for non-positive amounts.
// Outgoing payments' sendAmount or receiveAmount should never be
Expand Down Expand Up @@ -241,7 +243,7 @@ async function pay(
const destination = receiver.toResolvedPayment()

try {
const receipt = await Pay.pay({ plugin, destination, quote })
const receipt = await Pay.pay({ plugin, destination, quote }) // this is the line that makes an ILP payment

if (receipt.error) {
throw receipt.error
Expand All @@ -256,7 +258,7 @@ async function pay(
receiptAmountSent: receipt.amountSent,
receiptAmountDelivered: receipt.amountDelivered
},
'ILP payment completed'
'ILP payment completed' // does this get called in response to every fulfill?
)
} catch (err) {
const errorMessage = 'Received error during ILP pay'
Expand Down

0 comments on commit 9333411

Please sign in to comment.