Skip to content

Commit

Permalink
refactor: performance and throughput (#756)
Browse files Browse the repository at this point in the history
Refactors the queueing logic to be fairly balanced between actions, with each action backed as a separate FIFO queue. Also adds support for priority queueing and custom queues, though those aren't exposed on the API layer yet. Improves throughput to be > 5000 tasks/second on a single queue. 

---------

Co-authored-by: Alexander Belanger <[email protected]>
  • Loading branch information
grutt and abelanger5 authored Aug 12, 2024
1 parent 2af003e commit 4ea4712
Show file tree
Hide file tree
Showing 114 changed files with 71,671 additions and 53,990 deletions.
4 changes: 4 additions & 0 deletions api-contracts/openapi/components/schemas/_index.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ StepRunArchiveList:
$ref: "./workflow_run.yaml#/StepRunArchiveList"
WorkerList:
$ref: "./worker.yaml#/WorkerList"
SemaphoreSlots:
$ref: "./worker.yaml#/SemaphoreSlots"
RecentStepRuns:
$ref: "./worker.yaml#/RecentStepRuns"
Worker:
$ref: "./worker.yaml#/Worker"
WorkerLabel:
Expand Down
67 changes: 65 additions & 2 deletions api-contracts/openapi/components/schemas/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,64 @@ WorkerLabel:
- key
type: object

SemaphoreSlots:
properties:
slot:
type: string
description: The slot name.
format: uuid
stepRunId:
type: string
description: The step run id.
format: uuid
actionId:
type: string
description: The action id.
startedAt:
type: string
description: The time this slot was started.
format: date-time
timeoutAt:
type: string
description: The time this slot will timeout.
format: date-time
workflowRunId:
type: string
description: The workflow run id.
format: uuid
status:
$ref: "./_index.yaml#/StepRunStatus"
required:
- slot

RecentStepRuns:
properties:
metadata:
$ref: "./metadata.yaml#/APIResourceMeta"
actionId:
type: string
description: The action id.
status:
$ref: "./_index.yaml#/StepRunStatus"
startedAt:
type: string
format: date-time
finishedAt:
type: string
format: date-time
cancelledAt:
type: string
format: date-time
workflowRunId:
type: string
format: uuid
required:
- actionId
- metadata
- status
- workflowRunId
type: object

Worker:
properties:
metadata:
Expand All @@ -35,11 +93,16 @@ Worker:
description: The actions this worker can perform.
items:
type: string
slots:
type: array
description: The semaphore slot state for the worker.
items:
$ref: "./_index.yaml#/SemaphoreSlots"
recentStepRuns:
type: array
description: The recent step runs for this worker.
description: The recent step runs for the worker.
items:
$ref: "./_index.yaml#/StepRun"
$ref: "./_index.yaml#/RecentStepRuns"
status:
type: string
description: The status of the worker.
Expand Down
2 changes: 0 additions & 2 deletions api-contracts/openapi/components/schemas/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ Workflow:
items:
$ref: "#/WorkflowTag"
description: The tags of the workflow.
lastRun:
$ref: "./_index.yaml#/WorkflowRun"
jobs:
type: array
items:
Expand Down
6 changes: 6 additions & 0 deletions api-contracts/openapi/paths/worker/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ withWorker:
format: uuid
minLength: 36
maxLength: 36
- description: Filter recent by failed
in: query
name: recentFailed
required: false
schema:
type: boolean
responses:
"200":
content:
Expand Down
7 changes: 6 additions & 1 deletion api/v1/server/handlers/events/list.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package events

import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"time"

"github.com/labstack/echo/v4"

Expand Down Expand Up @@ -89,7 +91,10 @@ func (t *EventService) EventList(ctx echo.Context, request gen.EventListRequestO
listOpts.AdditionalMetadata = additionalMetadataBytes
}

listRes, err := t.config.APIRepository.Event().ListEvents(tenant.ID, listOpts)
dbCtx, cancel := context.WithTimeout(ctx.Request().Context(), 30*time.Second)
defer cancel()

listRes, err := t.config.APIRepository.Event().ListEvents(dbCtx, tenant.ID, listOpts)

if err != nil {
return nil, err
Expand Down
15 changes: 11 additions & 4 deletions api/v1/server/handlers/workers/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
func (t *WorkerService) WorkerGet(ctx echo.Context, request gen.WorkerGetRequestObject) (gen.WorkerGetResponseObject, error) {
worker := ctx.Get("worker").(*db.WorkerModel)

stepRuns, err := t.config.APIRepository.Worker().ListRecentWorkerStepRuns(worker.TenantID, worker.ID)
recentFailFilter := false

if request.Params.RecentFailed != nil {
recentFailFilter = *request.Params.RecentFailed
}

slotState, recent, err := t.config.APIRepository.Worker().ListWorkerState(worker.TenantID, worker.ID, recentFailFilter)

if err != nil {
return nil, err
}

respStepRuns := make([]gen.StepRun, len(stepRuns))
respStepRuns := make([]gen.RecentStepRuns, len(recent))

for i := range stepRuns {
genStepRun, err := transformers.ToStepRun(&stepRuns[i])
for i := range recent {
genStepRun, err := transformers.ToRecentStepRun(recent[i])

if err != nil {
return nil, err
Expand All @@ -32,6 +38,7 @@ func (t *WorkerService) WorkerGet(ctx echo.Context, request gen.WorkerGetRequest
workerResp := *transformers.ToWorker(worker)

workerResp.RecentStepRuns = &respStepRuns
workerResp.Slots = transformers.ToSlotState(slotState)

affinity, err := t.config.APIRepository.Worker().ListWorkerLabels(worker.TenantID, worker.ID)

Expand Down
8 changes: 7 additions & 1 deletion api/v1/server/handlers/workflow-runs/replay_batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package workflowruns

import (
"context"
"time"

"github.com/hashicorp/go-multierror"
"github.com/labstack/echo/v4"

Expand Down Expand Up @@ -53,7 +56,10 @@ func (t *WorkflowRunsService) WorkflowRunUpdateReplay(ctx echo.Context, request
return nil, allErrs
}

newWorkflowRuns, err := t.config.APIRepository.WorkflowRun().ListWorkflowRuns(tenant.ID, &repository.ListWorkflowRunsOpts{
dbCtx, cancel := context.WithTimeout(ctx.Request().Context(), 60*time.Second)
defer cancel()

newWorkflowRuns, err := t.config.APIRepository.WorkflowRun().ListWorkflowRuns(dbCtx, tenant.ID, &repository.ListWorkflowRunsOpts{
Ids: workflowRunIds,
Limit: &limit,
})
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/workflows/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func (t *WorkflowService) WorkflowGet(ctx echo.Context, request gen.WorkflowGetRequestObject) (gen.WorkflowGetResponseObject, error) {
workflow := ctx.Get("workflow").(*db.WorkflowModel)

resp, err := transformers.ToWorkflow(workflow, nil)
resp, err := transformers.ToWorkflow(workflow)

if err != nil {
return nil, err
Expand Down
6 changes: 1 addition & 5 deletions api/v1/server/handlers/workflows/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ func (t *WorkflowService) WorkflowList(ctx echo.Context, request gen.WorkflowLis
rows := make([]gen.Workflow, len(listResp.Rows))

for i := range listResp.Rows {
workflow, err := transformers.ToWorkflow(listResp.Rows[i].WorkflowModel, listResp.Rows[i].LatestRun)

if err != nil {
return nil, err
}
workflow := transformers.ToWorkflowFromSQLC(listResp.Rows[i])

rows[i] = *workflow
}
Expand Down
7 changes: 6 additions & 1 deletion api/v1/server/handlers/workflows/list_runs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package workflows

import (
"context"
"fmt"
"math"
"strings"
"time"

"github.com/labstack/echo/v4"

Expand Down Expand Up @@ -96,7 +98,10 @@ func (t *WorkflowService) WorkflowRunList(ctx echo.Context, request gen.Workflow
listOpts.AdditionalMetadata = additionalMetadata
}

workflowRuns, err := t.config.APIRepository.WorkflowRun().ListWorkflowRuns(tenant.ID, listOpts)
dbCtx, cancel := context.WithTimeout(ctx.Request().Context(), 30*time.Second)
defer cancel()

workflowRuns, err := t.config.APIRepository.WorkflowRun().ListWorkflowRuns(dbCtx, tenant.ID, listOpts)

if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion api/v1/server/handlers/workflows/metrics_runs.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package workflows

import (
"context"
"fmt"
"strings"
"time"

"github.com/labstack/echo/v4"

Expand Down Expand Up @@ -54,7 +56,10 @@ func (t *WorkflowService) WorkflowRunGetMetrics(ctx echo.Context, request gen.Wo
listOpts.AdditionalMetadata = additionalMetadata
}

workflowRunsMetricsCount, err := t.config.APIRepository.WorkflowRun().WorkflowRunMetricsCount(tenant.ID, listOpts)
dbCtx, cancel := context.WithTimeout(ctx.Request().Context(), 30*time.Second)
defer cancel()

workflowRunsMetricsCount, err := t.config.APIRepository.WorkflowRun().WorkflowRunMetricsCount(dbCtx, tenant.ID, listOpts)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 4ea4712

Please sign in to comment.