From 5e7e1271cd11e08eba9d9f2f910add0a84a3d086 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 26 Sep 2024 21:40:48 -0400 Subject: [PATCH] add timeout and context to queue metrics --- .../server/handlers/tenants/get_queue_metrics.go | 2 +- pkg/repository/prisma/tenant.go | 14 +++++++++----- pkg/repository/tenant.go | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/api/v1/server/handlers/tenants/get_queue_metrics.go b/api/v1/server/handlers/tenants/get_queue_metrics.go index f5bafab92..0d2092f25 100644 --- a/api/v1/server/handlers/tenants/get_queue_metrics.go +++ b/api/v1/server/handlers/tenants/get_queue_metrics.go @@ -38,7 +38,7 @@ func (t *TenantService) TenantGetQueueMetrics(ctx echo.Context, request gen.Tena opts.WorkflowIds = *request.Params.Workflows } - metrics, err := t.config.APIRepository.Tenant().GetQueueMetrics(tenant.ID, &opts) + metrics, err := t.config.APIRepository.Tenant().GetQueueMetrics(ctx.Request().Context(), tenant.ID, &opts) if err != nil { return nil, err diff --git a/pkg/repository/prisma/tenant.go b/pkg/repository/prisma/tenant.go index e34e8ab62..902357474 100644 --- a/pkg/repository/prisma/tenant.go +++ b/pkg/repository/prisma/tenant.go @@ -197,7 +197,7 @@ func (r *tenantAPIRepository) DeleteTenantMember(memberId string) (*db.TenantMem ).Delete().Exec(context.Background()) } -func (r *tenantAPIRepository) GetQueueMetrics(tenantId string, opts *repository.GetQueueMetricsOpts) (*repository.GetQueueMetricsResponse, error) { +func (r *tenantAPIRepository) GetQueueMetrics(ctx context.Context, tenantId string, opts *repository.GetQueueMetricsOpts) (*repository.GetQueueMetricsResponse, error) { if err := r.v.Validate(opts); err != nil { return nil, err } @@ -231,28 +231,32 @@ func (r *tenantAPIRepository) GetQueueMetrics(tenantId string, opts *repository. totalParams.WorkflowIds = uuids } - tx, err := r.pool.Begin(context.Background()) + tx, commit, rollback, err := prepareTx(ctx, r.pool, r.l, 60*1000) if err != nil { return nil, err } - defer deferRollback(context.Background(), r.l, tx.Rollback) + defer rollback() // get the totals - total, err := r.queries.GetTenantTotalQueueMetrics(context.Background(), tx, totalParams) + total, err := r.queries.GetTenantTotalQueueMetrics(ctx, tx, totalParams) if err != nil { return nil, err } // get the workflow metrics - workflowMetrics, err := r.queries.GetTenantWorkflowQueueMetrics(context.Background(), tx, workflowParams) + workflowMetrics, err := r.queries.GetTenantWorkflowQueueMetrics(ctx, tx, workflowParams) if err != nil { return nil, err } + if err := commit(ctx); err != nil { + return nil, err + } + workflowMetricsMap := make(map[string]repository.QueueMetric) for _, metric := range workflowMetrics { diff --git a/pkg/repository/tenant.go b/pkg/repository/tenant.go index 631091a9f..7f51bf523 100644 --- a/pkg/repository/tenant.go +++ b/pkg/repository/tenant.go @@ -98,7 +98,7 @@ type TenantAPIRepository interface { DeleteTenantMember(memberId string) (*db.TenantMemberModel, error) // GetQueueMetrics returns the queue metrics for the given tenant - GetQueueMetrics(tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error) + GetQueueMetrics(ctx context.Context, tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error) } type TenantEngineRepository interface {