Skip to content

Commit

Permalink
fix: make retries with exp backoff atomic, and fix issues related to …
Browse files Browse the repository at this point in the history
…cancelling states (#1132)

* fix: exp backoff retries and cancelling states

* fix flaky concurrency test
  • Loading branch information
abelanger5 authored Dec 18, 2024
1 parent 4e81305 commit 23dc410
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 136 deletions.
10 changes: 9 additions & 1 deletion examples/concurrency/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ func run(events chan<- string) (func() error, error) {
err = ctx.WorkflowInput(input)

// we sleep to simulate a long running task
time.Sleep(30 * time.Second)
time.Sleep(10 * time.Second)

if err != nil {

return nil, err
}

if ctx.Err() != nil {
return nil, ctx.Err()
}

log.Printf("step-one")
events <- "step-one"

Expand All @@ -97,6 +101,10 @@ func run(events chan<- string) (func() error, error) {
return nil, err
}

if ctx.Err() != nil {
return nil, ctx.Err()
}

log.Printf("step-two")
events <- "step-two"

Expand Down
37 changes: 2 additions & 35 deletions internal/services/controllers/jobs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,43 +400,10 @@ func (q *queue) processStepRunRetries(ctx context.Context, tenantId string) (boo
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout")
defer span.End()

shouldContinue, stepRuns, err := q.repo.StepRun().ListRetryableStepRuns(ctx, tenantId)
shouldContinue, err := q.repo.StepRun().RetryStepRuns(ctx, tenantId)

if err != nil {
return false, fmt.Errorf("could not list step runs to retry for tenant %s: %w", tenantId, err)
}

if num := len(stepRuns); num > 0 {
q.l.Info().Msgf("retrying %d step runs", num)
}

err = queueutils.BatchConcurrent(10, stepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error {
scheduleCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-retry-step-run")
defer span.End()

for i := range group {
stepRunCp := group[i]
stepRunCp.SRRetryCount++

if err := q.mq.AddMessage(
scheduleCtx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunQueuedToTask(
stepRunCp,
),
); err != nil {
q.l.Error().Err(err).Msg("could not add step run retry task to job controller queue")
}
}

return nil
})

if err != nil {
return false, fmt.Errorf("could not process step run retries: %w", err)
return false, fmt.Errorf("could not retry step runs for tenant %s: %w", tenantId, err)
}

return shouldContinue, nil
Expand Down
67 changes: 57 additions & 10 deletions pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ WHERE
AND w."isActive" = true
AND w."isPaused" = false;

-- name: ListStepRunsToRetry :many
-- name: RetryStepRuns :one
WITH retries AS (
SELECT
*
Expand All @@ -577,16 +577,63 @@ WITH retries AS (
retries
WHERE
rqi."stepRunId" = retries."stepRunId"
), srs AS (
SELECT
sr."id",
sr."tenantId",
sr."scheduleTimeoutAt",
sr."retryCount",
sr."internalRetryCount",
s."actionId",
s."id" AS "stepId",
s."timeout" AS "stepTimeout",
s."scheduleTimeout" AS "scheduleTimeout"
FROM
retries
JOIN
"StepRun" sr ON retries."stepRunId" = sr."id"
JOIN
"Step" s ON sr."stepId" = s."id"
WHERE
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED')
), updated_step_runs AS (
UPDATE "StepRun" sr
SET
"status" = 'PENDING_ASSIGNMENT',
"scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'),
"updatedAt" = CURRENT_TIMESTAMP,
"retryCount" = srs."retryCount" + 1
FROM srs
WHERE sr."id" = srs."id"
RETURNING sr."id"
), inserted_sqs AS (
INSERT INTO "QueueItem" (
"stepRunId",
"stepId",
"actionId",
"scheduleTimeoutAt",
"stepTimeout",
"priority",
"isQueued",
"tenantId",
"queue"
)
SELECT
srs."id",
srs."stepId",
srs."actionId",
CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'),
srs."stepTimeout",
-- Queue with priority 4 so that retry gets highest priority
4,
true,
srs."tenantId",
srs."actionId"
FROM
srs
RETURNING "stepRunId"
)
SELECT
retries.*
FROM
retries
JOIN
"StepRun" sr ON retries."stepRunId" = sr."id"
WHERE
-- we remove any step runs in a finalized state from the retry queue
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED', 'CANCELLING');
SELECT COUNT(*) FROM retries;

-- name: CreateRetryQueueItem :exec
INSERT INTO
Expand Down
160 changes: 90 additions & 70 deletions pkg/repository/prisma/dbsqlc/queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ WHERE
"JobRun"."deletedAt" IS NULL AND
"StepRun"."tenantId" = @tenantId::uuid AND
"StepRun"."jobRunId" = @jobRunId::uuid AND
"StepRun"."status" = ANY(ARRAY['PENDING', 'PENDING_ASSIGNMENT', 'ASSIGNED', 'RUNNING']::"StepRunStatus"[]);
"StepRun"."status" = ANY(ARRAY['PENDING', 'PENDING_ASSIGNMENT', 'ASSIGNED', 'RUNNING', 'CANCELLING']::"StepRunStatus"[]);

-- name: QueueStepRun :exec
UPDATE
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 5 additions & 17 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,29 +1407,17 @@ func (s *stepRunEngineRepository) StepRunRetryBackoff(ctx context.Context, tenan
})
}

func (s *stepRunEngineRepository) ListRetryableStepRuns(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error) {
ctx, span := telemetry.NewSpan(ctx, "list-retryable-step-runs-db")
func (s *stepRunEngineRepository) RetryStepRuns(ctx context.Context, tenantId string) (bool, error) {
ctx, span := telemetry.NewSpan(ctx, "retry-step-runs-db")
defer span.End()

rows, err := s.queries.ListStepRunsToRetry(ctx, s.pool, sqlchelpers.UUIDFromStr(tenantId))
count, err := s.queries.RetryStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(tenantId))

if err != nil {
return false, nil, fmt.Errorf("could not list retryable step runs: %w", err)
return false, fmt.Errorf("could not list retryable step runs: %w", err)
}

ids := make([]pgtype.UUID, 0, len(rows))

for _, row := range rows {
ids = append(ids, row.StepRunId)
}

// get step runs for engine
stepRuns, err := s.queries.GetStepRunForEngine(ctx, s.pool, dbsqlc.GetStepRunForEngineParams{
Ids: ids,
TenantId: sqlchelpers.UUIDFromStr(tenantId),
})

return len(stepRuns) == 1000, stepRuns, err
return count == 1000, err
}

func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ type StepRunEngineRepository interface {

StepRunRetryBackoff(ctx context.Context, tenantId, stepRunId string, retryAfter time.Time) error

ListRetryableStepRuns(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error)
RetryStepRuns(ctx context.Context, tenantId string) (bool, error)

ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error)

Expand Down

0 comments on commit 23dc410

Please sign in to comment.