Skip to content

Commit

Permalink
fix: don't queue cancelled step runs (#805)
Browse files Browse the repository at this point in the history
* fix: cancelled step runs should not be assigned

* check cancellations before planning

* remove reassign logic from controller
  • Loading branch information
abelanger5 authored Aug 22, 2024
1 parent e40d77d commit 7a3c068
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 159 deletions.
50 changes: 2 additions & 48 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,59 +748,13 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign")
defer span.End()

stepRuns, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId)
_, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId)

if err != nil {
return fmt.Errorf("could not list step runs to reassign for tenant %s: %w", tenantId, err)
}

if num := len(stepRuns); num > 0 {
ec.l.Info().Msgf("reassigning %d step runs", num)
}

return MakeBatched(10, stepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error {
scheduleCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-reassign-step-run")
defer span.End()

for i := range group {
stepRunCp := stepRuns[i]
// wrap in func to get defer on the span to avoid leaking spans
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

// if the current time is after the scheduleTimeoutAt, then mark this as timed out
now := time.Now().UTC().UTC()
scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time

// timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout,
// or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt
isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now)

if isTimedOut {
return ec.cancelStepRun(scheduleCtx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT")
}

eventData := map[string]interface{}{
"worker_id": sqlchelpers.UUIDToStr(stepRunCp.SRWorkerId),
}

// TODO: batch this query to avoid n+1 issues
err = ec.repo.StepRun().CreateStepRunEvent(scheduleCtx, tenantId, stepRunId, repository.CreateStepRunEventOpts{
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityCRITICAL),
EventMessage: repository.StringPtr("Worker has become inactive"),
EventData: eventData,
})

if err != nil {
return fmt.Errorf("could not create step run event: %w", err)
}
}

return nil
})
return nil
}

func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() {
Expand Down
13 changes: 11 additions & 2 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ WITH already_assigned_step_runs AS (
) AS input
WHERE
sr."id" = input."id"
RETURNING input."id", input."slotId"
RETURNING input."id", input."slotId", input."workerId"
)
UPDATE
"WorkerSemaphoreSlot" wss
Expand All @@ -626,7 +626,16 @@ SET
FROM updated_step_runs
WHERE
wss."id" = updated_step_runs."slotId"
RETURNING wss."id";
RETURNING updated_step_runs."id"::uuid, updated_step_runs."workerId"::uuid;

-- name: GetCancelledStepRuns :many
SELECT
"id"
FROM
"StepRun"
WHERE
"id" = ANY(@stepRunIds::uuid[])
AND "status" != 'PENDING_ASSIGNMENT';

-- name: BulkMarkStepRunsAsCancelling :many
UPDATE
Expand Down
49 changes: 42 additions & 7 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7a3c068

Please sign in to comment.