Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): add tracing, metrics, get peer tweak #2910

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
430 changes: 308 additions & 122 deletions localenv/telemetry/grafana/provisioning/dashboards/example.json

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,28 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
accountRef: string
): Promise<bigint | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})
const account = await getLiquidityAccount(deps, accountRef)

if (!account) {
deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})
return
}

return (await getAccountBalances(deps, account)).debitsPosted
const totalsSent = (await getAccountBalances(deps, account)).debitsPosted

deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})

return totalsSent
}

export async function getAccountsTotalSent(
Expand Down
8 changes: 8 additions & 0 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
getAccountTransfers
} from './transfers'
import { toTigerBeetleId } from './utils'
import { TelemetryService } from '../../telemetry/service'

export enum TigerBeetleAccountCode {
LIQUIDITY_WEB_MONETIZATION = 1,
Expand Down Expand Up @@ -69,6 +70,7 @@ export const convertToTigerBeetleTransferCode: {
export interface ServiceDependencies extends BaseService {
tigerBeetle: Client
withdrawalThrottleDelay?: number
telemetry?: TelemetryService
}

export function createAccountingService(
Expand Down Expand Up @@ -218,10 +220,16 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
id: string
): Promise<bigint | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('tb_getAccountTotalSent', {
callName: 'tb_getAccountTotalSent'
})
const account = (await getAccounts(deps, [id]))[0]
if (account) {
deps.telemetry && deps.telemetry.stopTimer('tb_getAccountTotalSent')
return account.debits_posted
}
deps.telemetry && deps.telemetry.stopTimer('tb_getAccountTotalSent')
}

export async function getAccountsTotalSent(
Expand Down
11 changes: 6 additions & 5 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ export function initIocContainer(
const logger = await deps.use('logger')
const knex = await deps.use('knex')
const config = await deps.use('config')
const telemetry = await deps.use('telemetry')

if (config.useTigerBeetle) {
container.singleton('tigerBeetle', async (deps) => {
Expand All @@ -228,17 +229,16 @@ export function initIocContainer(
logger,
knex,
tigerBeetle,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry
})
}

return createPsqlAccountingService({
logger,
knex,
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry: config.enableTelemetry
? await deps.use('telemetry')
: undefined
telemetry
})
})
container.singleton('peerService', async (deps) => {
Expand Down Expand Up @@ -346,7 +346,8 @@ export function initIocContainer(
walletAddressService: await deps.use('walletAddressService'),
remoteIncomingPaymentService: await deps.use(
'remoteIncomingPaymentService'
)
),
telemetry: deps.use('telemetry') ? await deps.use('telemetry') : undefined
})
})

Expand Down
37 changes: 25 additions & 12 deletions packages/backend/src/open_payments/payment/outgoing/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,24 @@ export async function handleSending(
throw LifecycleError.BadState
}

const payStartTime = Date.now()
await deps.paymentMethodHandlerService.pay('ILP', {
receiver,
outgoingPayment: payment,
finalDebitAmount: maxDebitAmount,
finalReceiveAmount: maxReceiveAmount
})
const payEndTime = Date.now()
deps.telemetry &&
deps.telemetry.startTimer('ilp_pay_time_ms', {
description: 'Time to complete an ILP payment',
callName: 'paymentMethodHandlerService.pay (ILP)'
}),
await deps.paymentMethodHandlerService.pay('ILP', {
receiver,
outgoingPayment: payment,
finalDebitAmount: maxDebitAmount,
finalReceiveAmount: maxReceiveAmount
})

if (deps.telemetry) {
const payDuration = payEndTime - payStartTime
await Promise.all([
deps.telemetry.incrementCounter('transactions_total', 1, {
description: 'Count of funded transactions'
}),
deps.telemetry.recordHistogram('ilp_pay_time_ms', payDuration, {
description: 'Time to complete an ILP payment'
}),
deps.telemetry.stopTimer('ilp_pay_time_ms'),
deps.telemetry.incrementCounterWithTransactionAmountDifference(
'transaction_fee_amounts',
payment.sentAmount,
Expand Down Expand Up @@ -144,17 +144,24 @@ export async function handleFailed(
payment: OutgoingPayment,
error: string
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('handleFailed', { callName: 'handleFailed ' })
await payment.$query(deps.knex).patch({
state: OutgoingPaymentState.Failed,
error
})
await sendWebhookEvent(deps, payment, OutgoingPaymentEventType.PaymentFailed)
deps.telemetry && deps.telemetry.startTimer('handleFailed')
}

async function handleCompleted(
deps: ServiceDependencies,
payment: OutgoingPayment
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('handleCompleted', {
callName: 'handleCompleted'
})
await payment.$query(deps.knex).patch({
state: OutgoingPaymentState.Completed
})
Expand All @@ -164,6 +171,7 @@ async function handleCompleted(
payment,
OutgoingPaymentEventType.PaymentCompleted
)
deps.telemetry && deps.telemetry.stopTimer('handleCompleted')
}

export async function sendWebhookEvent(
Expand All @@ -172,6 +180,10 @@ export async function sendWebhookEvent(
type: OutgoingPaymentEventType,
trx?: TransactionOrKnex
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('sendWebhookEvent', {
callName: 'outgoingPaymentLifecycle_sendwebhookEvent'
})
// TigerBeetle accounts are only created as the OutgoingPayment is funded.
// So default the amountSent and balance to 0 for outgoing payments still in the funding state
const amountSent =
Expand Down Expand Up @@ -201,6 +213,7 @@ export async function sendWebhookEvent(
data: payment.toData({ amountSent, balance }),
withdrawal
})
deps.telemetry && deps.telemetry.stopTimer('sendWebhookEvent')
}

function validateAssets(
Expand Down
13 changes: 11 additions & 2 deletions packages/backend/src/open_payments/receiver/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
errorToMessage as receiverErrorToMessage
} from './errors'
import { isRemoteIncomingPaymentError } from '../payment/incoming_remote/errors'
import { TelemetryService } from '../../telemetry/service'

interface CreateReceiverArgs {
walletAddressUrl: string
Expand All @@ -33,6 +34,7 @@ export interface ServiceDependencies extends BaseService {
incomingPaymentService: IncomingPaymentService
walletAddressService: WalletAddressService
remoteIncomingPaymentService: RemoteIncomingPaymentService
telemetry?: TelemetryService
}

const INCOMING_PAYMENT_URL_REGEX =
Expand Down Expand Up @@ -136,21 +138,28 @@ async function getReceiver(
deps: ServiceDependencies,
url: string
): Promise<Receiver | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('getReceiver', { callName: 'getReceiver' })
try {
const localIncomingPayment = await getLocalIncomingPayment(deps, url)
if (localIncomingPayment) {
return new Receiver(localIncomingPayment, true)
const receiver = new Receiver(localIncomingPayment, true)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
return receiver
}

const remoteIncomingPayment = await getRemoteIncomingPayment(deps, url)
if (remoteIncomingPayment) {
return new Receiver(remoteIncomingPayment, false)
const receiver = new Receiver(remoteIncomingPayment, false)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
return receiver
}
} catch (err) {
deps.logger.error(
{ errorMessage: err instanceof Error && err.message },
'Could not get incoming payment'
)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
}
}

Expand Down
109 changes: 82 additions & 27 deletions packages/backend/src/payment-method/ilp/connector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,49 +68,104 @@ export async function createConnectorService({
streamServer,
telemetry
},
compose([
// Incoming Rules
createIncomingErrorHandlerMiddleware(ilpAddress),
createStreamAddressMiddleware(),
createAccountMiddleware(ilpAddress),
createIncomingMaxPacketAmountMiddleware(),
createIncomingRateLimitMiddleware({}),
createIncomingThroughputMiddleware(),
createIldcpMiddleware(ilpAddress),
compose(
[
// Incoming Rules
{
name: 'createIncomingErrorHandlerMiddleware',
fn: createIncomingErrorHandlerMiddleware(ilpAddress)
},
{
name: 'createStreamAddressMiddleware',
fn: createStreamAddressMiddleware()
},
{
name: 'createAccountMiddleware',
fn: createAccountMiddleware(ilpAddress)
},
{
name: 'createIncomingMaxPacketAmountMiddleware',
fn: createIncomingMaxPacketAmountMiddleware()
},
{
name: 'createIncomingRateLimitMiddleware',
fn: createIncomingRateLimitMiddleware({})
},
{
name: 'createIncomingThroughputMiddleware',
fn: createIncomingThroughputMiddleware()
},
{
name: 'createIldcpMiddleware',
fn: createIldcpMiddleware(ilpAddress)
},

// Local pay
createBalanceMiddleware(),
// Local pay
{ name: 'createBalanceMiddleware', fn: createBalanceMiddleware() },

// Outgoing Rules
createStreamController(),
createOutgoingThroughputMiddleware(),
createOutgoingReduceExpiryMiddleware({}),
createOutgoingExpireMiddleware(),
createOutgoingValidateFulfillmentMiddleware(),
// Outgoing Rules
{ name: 'createStreamController', fn: createStreamController() },
{
name: 'createOutgoingThroughputMiddleware',
fn: createOutgoingThroughputMiddleware()
},
{
name: 'createOutgoingReduceExpiryMiddleware',
fn: createOutgoingReduceExpiryMiddleware({})
},
{
name: 'createOutgoingExpireMiddleware',
fn: createOutgoingExpireMiddleware()
},
{
name: 'createOutgoingValidateFulfillmentMiddleware',
fn: createOutgoingValidateFulfillmentMiddleware()
},

// Send outgoing packets
createClientController()
])
// Send outgoing packets
{ name: 'createClientController', fn: createClientController() }
],
telemetry
)
)
}

// Adapted from koa-compose
function compose(middlewares: ILPMiddleware[]): ILPMiddleware {
function compose(
middlewares: { name: string; fn: ILPMiddleware }[],
telemetry: TelemetryService | undefined
): ILPMiddleware {
return function (ctx: ILPContext, next: () => Promise<void>): Promise<void> {
// last called middleware
let index = -1
return (function dispatch(i: number): Promise<void> {

telemetry &&
telemetry.startTimer('connector_middleware_stack', {
callName: 'connector_middleware_stack'
})

async function dispatch(i: number): Promise<void> {
if (i <= index)
return Promise.reject(new Error('next() called multiple times'))
index = i
let fn = middlewares[i]
if (i === middlewares.length) fn = next
if (!fn) return Promise.resolve()
let m = middlewares[i]
if (i === middlewares.length) m.fn = next
if (!m.fn) return Promise.resolve()
try {
return Promise.resolve(fn(ctx, dispatch.bind(null, i + 1)))
telemetry &&
telemetry.startTimer('connector_middleware', {
callName: m.name
})
const p = Promise.resolve(m.fn(ctx, dispatch.bind(null, i + 1)))
telemetry && telemetry.stopTimer('connector_middleware')
return p
} catch (err) {
return Promise.reject(err)
}
})(0)
}

return dispatch(0).finally(() => {
telemetry && telemetry.stopTimer('connector_middleware_stack')
})
}
}
2 changes: 1 addition & 1 deletion packages/backend/src/payment-method/ilp/peer/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ async function getPeerByDestinationAddress(
// for `staticIlpAddress`s in the accounts table:
// new RegExp('^' + staticIlpAddress + '($|\\.)')).test(destinationAddress)
const peerQuery = Peer.query(deps.knex)
.withGraphJoined('asset')
.withGraphFetched('asset')
.where(
raw('?', [destinationAddress]),
'like',
Expand Down
Loading
Loading