From 7a3c06884f4baab35ac5703b76c0520e25f789de Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Thu, 22 Aug 2024 11:21:49 -0400 Subject: [PATCH] fix: don't queue cancelled step runs (#805) * fix: cancelled step runs should not be assigned * check cancellations before planning * remove reassign logic from controller --- .../services/controllers/jobs/controller.go | 50 +---- pkg/repository/prisma/dbsqlc/step_runs.sql | 13 +- pkg/repository/prisma/dbsqlc/step_runs.sql.go | 49 ++++- pkg/repository/prisma/step_run.go | 198 +++++++++--------- 4 files changed, 151 insertions(+), 159 deletions(-) diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index a23f4b48a..27c74f930 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -748,59 +748,13 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign") defer span.End() - stepRuns, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId) + _, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId) if err != nil { return fmt.Errorf("could not list step runs to reassign for tenant %s: %w", tenantId, err) } - if num := len(stepRuns); num > 0 { - ec.l.Info().Msgf("reassigning %d step runs", num) - } - - return MakeBatched(10, stepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error { - scheduleCtx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-reassign-step-run") - defer span.End() - - for i := range group { - stepRunCp := stepRuns[i] - // wrap in func to get defer on the span to avoid leaking spans - stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID) - - // if the current time is after the scheduleTimeoutAt, then mark this as timed out - now := time.Now().UTC().UTC() - scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time - - // timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout, - // or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt - isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now) - - if isTimedOut { - return ec.cancelStepRun(scheduleCtx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT") - } - - eventData := map[string]interface{}{ - "worker_id": sqlchelpers.UUIDToStr(stepRunCp.SRWorkerId), - } - - // TODO: batch this query to avoid n+1 issues - err = ec.repo.StepRun().CreateStepRunEvent(scheduleCtx, tenantId, stepRunId, repository.CreateStepRunEventOpts{ - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED), - EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityCRITICAL), - EventMessage: repository.StringPtr("Worker has become inactive"), - EventData: eventData, - }) - - if err != nil { - return fmt.Errorf("could not create step run event: %w", err) - } - } - - return nil - }) + return nil } func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() { diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql b/pkg/repository/prisma/dbsqlc/step_runs.sql index f86b9a702..fe576cfee 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql @@ -617,7 +617,7 @@ WITH already_assigned_step_runs AS ( ) AS input WHERE sr."id" = input."id" - RETURNING input."id", input."slotId" + RETURNING input."id", input."slotId", input."workerId" ) UPDATE "WorkerSemaphoreSlot" wss @@ -626,7 +626,16 @@ SET FROM updated_step_runs WHERE wss."id" = updated_step_runs."slotId" -RETURNING wss."id"; +RETURNING updated_step_runs."id"::uuid, updated_step_runs."workerId"::uuid; + +-- name: GetCancelledStepRuns :many +SELECT + "id" +FROM + "StepRun" +WHERE + "id" = ANY(@stepRunIds::uuid[]) + AND "status" != 'PENDING_ASSIGNMENT'; -- name: BulkMarkStepRunsAsCancelling :many UPDATE diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql.go b/pkg/repository/prisma/dbsqlc/step_runs.sql.go index 9370289f1..7f4c4dbe8 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql.go @@ -151,7 +151,7 @@ WITH already_assigned_step_runs AS ( ) AS input WHERE sr."id" = input."id" - RETURNING input."id", input."slotId" + RETURNING input."id", input."slotId", input."workerId" ) UPDATE "WorkerSemaphoreSlot" wss @@ -160,7 +160,7 @@ SET FROM updated_step_runs WHERE wss."id" = updated_step_runs."slotId" -RETURNING wss."id" +RETURNING updated_step_runs."id"::uuid, updated_step_runs."workerId"::uuid ` type BulkAssignStepRunsToWorkersParams struct { @@ -170,7 +170,12 @@ type BulkAssignStepRunsToWorkersParams struct { Workerids []pgtype.UUID `json:"workerids"` } -func (q *Queries) BulkAssignStepRunsToWorkers(ctx context.Context, db DBTX, arg BulkAssignStepRunsToWorkersParams) ([]pgtype.UUID, error) { +type BulkAssignStepRunsToWorkersRow struct { + UpdatedStepRunsID pgtype.UUID `json:"updated_step_runs_id"` + UpdatedStepRunsWorkerId pgtype.UUID `json:"updated_step_runs_workerId"` +} + +func (q *Queries) BulkAssignStepRunsToWorkers(ctx context.Context, db DBTX, arg BulkAssignStepRunsToWorkersParams) ([]*BulkAssignStepRunsToWorkersRow, error) { rows, err := db.Query(ctx, bulkAssignStepRunsToWorkers, arg.Steprunids, arg.Slotids, @@ -181,13 +186,13 @@ func (q *Queries) BulkAssignStepRunsToWorkers(ctx context.Context, db DBTX, arg return nil, err } defer rows.Close() - var items []pgtype.UUID + var items []*BulkAssignStepRunsToWorkersRow for rows.Next() { - var id pgtype.UUID - if err := rows.Scan(&id); err != nil { + var i BulkAssignStepRunsToWorkersRow + if err := rows.Scan(&i.UpdatedStepRunsID, &i.UpdatedStepRunsWorkerId); err != nil { return nil, err } - items = append(items, id) + items = append(items, &i) } if err := rows.Err(); err != nil { return nil, err @@ -509,6 +514,36 @@ func (q *Queries) CreateStepRunEvent(ctx context.Context, db DBTX, arg CreateSte return err } +const getCancelledStepRuns = `-- name: GetCancelledStepRuns :many +SELECT + "id" +FROM + "StepRun" +WHERE + "id" = ANY($1::uuid[]) + AND "status" != 'PENDING_ASSIGNMENT' +` + +func (q *Queries) GetCancelledStepRuns(ctx context.Context, db DBTX, steprunids []pgtype.UUID) ([]pgtype.UUID, error) { + rows, err := db.Query(ctx, getCancelledStepRuns, steprunids) + if err != nil { + return nil, err + } + defer rows.Close() + var items []pgtype.UUID + for rows.Next() { + var id pgtype.UUID + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getDesiredLabels = `-- name: GetDesiredLabels :many SELECT "key", diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 81444d2b3..ccd0ccad4 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -426,23 +426,6 @@ var deadlockRetry = func(l *zerolog.Logger, f func() error) error { }, 50*time.Millisecond, 200*time.Millisecond) } -var unassignedRetry = func(l *zerolog.Logger, f func() error) error { - return genericRetry(l.Debug(), 5, f, "unassigned", func(err error) (bool, error) { - var target *errNoWorkerWithSlots - - if errors.As(err, &target) { - // if there are no slots available at all, don't retry - if target.totalSlots != 0 { - return true, err - } - - return false, repository.ErrNoWorkerAvailable - } - - return errors.Is(err, repository.ErrNoWorkerAvailable), err - }, 50*time.Millisecond, 100*time.Millisecond) -} - var genericRetry = func(l *zerolog.Event, maxRetries int, f func() error, msg string, condition func(err error) (bool, error), minSleep, maxSleep time.Duration) error { retries := 0 @@ -559,50 +542,6 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t }) } -func (s *stepRunEngineRepository) releaseWorkerSemaphore(ctx context.Context, stepRun *dbsqlc.GetStepRunForEngineRow) error { - return deadlockRetry(s.l, func() error { - tx, err := s.pool.Begin(ctx) - - if err != nil { - return err - } - - defer deferRollback(ctx, s.l, tx.Rollback) - - _, err = s.queries.ReleaseWorkerSemaphoreSlot(ctx, tx, dbsqlc.ReleaseWorkerSemaphoreSlotParams{ - Steprunid: stepRun.SRID, - Tenantid: stepRun.SRTenantId, - }) - - if err != nil && !errors.Is(err, pgx.ErrNoRows) { - return fmt.Errorf("could not release previous worker semaphore: %w", err) - } - - // this means that a worker is assigned: unlink the existing worker from the step run, - // so that we don't re-increment the old worker semaphore on each retry - if err == nil { - _, err = s.queries.UnlinkStepRunFromWorker(ctx, tx, dbsqlc.UnlinkStepRunFromWorkerParams{ - Steprunid: stepRun.SRID, - Tenantid: stepRun.SRTenantId, - }) - - if err != nil { - return fmt.Errorf("could not unlink step run from worker: %w", err) - } - } - - return tx.Commit(ctx) - }) -} - -type errNoWorkerWithSlots struct { - totalSlots int -} - -func (e *errNoWorkerWithSlots) Error() string { - return fmt.Sprintf("no worker available, slots left: %d", e.totalSlots) -} - func (s *stepRunEngineRepository) DeferredStepRunEvent( stepRunId pgtype.UUID, reason dbsqlc.StepRunEventReason, @@ -859,20 +798,6 @@ func UniqueSet[T any](i []T, keyFunc func(T) string) map[string]struct{} { return set } -type debugInfo struct { - UniqueActions []string `json:"unique_actions"` - TotalStepRuns int `json:"total_step_runs"` - TotalStepRunsAssigned int `json:"total_step_runs_assigned"` - TotalSlots int `json:"total_slots"` - StartingSlotsPerAction map[string]int `json:"starting_slots"` - EndingSlotsPerAction map[string]int `json:"ending_slots"` - Duration string `json:"duration"` - TimeToStartDuration string `json:"time_to_start_duration"` - TimeToListQueuesDuration string `json:"time_to_list_queues_duration"` - TimeToListQueueItemsDuration string `json:"time_to_list_queue_items_duration"` - DurationsOfQueueListResults []string `json:"durations_of_queue_list_results"` -} - func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId string) (repository.QueueStepRunsResult, error) { startedAt := time.Now().UTC() @@ -991,8 +916,14 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st } var duplicates []*scheduling.QueueItemWithOrder + var cancelled []*scheduling.QueueItemWithOrder queueItems, duplicates = removeDuplicates(queueItems) + queueItems, cancelled, err = s.removeCancelledStepRuns(ctx, tx, queueItems) + + if err != nil { + return emptyRes, fmt.Errorf("could not remove cancelled step runs: %w", err) + } // sort the queue items by Order from least to greatest, then by queue id sort.Slice(queueItems, func(i, j int) bool { @@ -1202,6 +1133,11 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st popItems = append(popItems, item.QueueItem.ID) } + // we'd like to remove cancelled step runs from the queue items as well + for _, item := range cancelled { + popItems = append(popItems, item.QueueItem.ID) + } + err = s.queries.BulkQueueItems(ctx, tx, popItems) if err != nil { @@ -1227,33 +1163,6 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st return emptyRes, fmt.Errorf("could not commit transaction: %w", err) } - // // print debug information - // endingSlotsPerAction := make(map[string]int) - // for action, slots := range actionsToSlots { - // endingSlotsPerAction[action] = len(slots) - // } - - // defer func() { - // // pretty-print json with 2 spaces - // debugInfo := debugInfo{ - // UniqueActions: uniqueActionsArr, - // TotalStepRuns: len(queueItems), - // TotalStepRunsAssigned: len(stepRunIds), - // TotalSlots: len(slots), - // StartingSlotsPerAction: startingSlotsPerAction, - // EndingSlotsPerAction: endingSlotsPerAction, - // } - - // debugInfoBytes, err := json.MarshalIndent(debugInfo, "", " ") - - // if err != nil { - // s.l.Warn().Err(err).Msg("could not marshal debug info") - // return - // } - - // s.l.Warn().Msg(string(debugInfoBytes)) - // }() - // update the cache with the min queued id for name, qiId := range plan.MinQueuedIds { s.cachedMinQueuedIds.Store(getCacheName(tenantId, name), qiId) @@ -1270,6 +1179,8 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st timedOutStepRunsStr[i] = sqlchelpers.UUIDToStr(id) } + defer printQueueDebugInfo(s.l, queues, queueItems, duplicates, cancelled, plan, slots, startedAt) + return repository.QueueStepRunsResult{ Queued: plan.QueuedStepRuns, SchedulingTimedOut: timedOutStepRunsStr, @@ -2127,6 +2038,41 @@ func getCacheName(tenantId, queue string) string { return fmt.Sprintf("%s:%s", tenantId, queue) } +func (s *stepRunEngineRepository) removeCancelledStepRuns(ctx context.Context, tx pgx.Tx, qis []*scheduling.QueueItemWithOrder) ([]*scheduling.QueueItemWithOrder, []*scheduling.QueueItemWithOrder, error) { + currStepRunIds := make([]pgtype.UUID, len(qis)) + + for i, qi := range qis { + currStepRunIds[i] = qi.StepRunId + } + + cancelledStepRuns, err := s.queries.GetCancelledStepRuns(ctx, tx, currStepRunIds) + + if err != nil { + return nil, nil, err + } + + cancelledStepRunsMap := make(map[string]bool, len(cancelledStepRuns)) + + for _, sr := range cancelledStepRuns { + cancelledStepRunsMap[sqlchelpers.UUIDToStr(sr)] = true + } + + // remove cancelled step runs from the queue items + remaining := make([]*scheduling.QueueItemWithOrder, 0, len(qis)) + cancelled := make([]*scheduling.QueueItemWithOrder, 0, len(qis)) + + for _, qi := range qis { + if _, ok := cancelledStepRunsMap[sqlchelpers.UUIDToStr(qi.StepRunId)]; ok { + cancelled = append(cancelled, qi) + continue + } + + remaining = append(remaining, qi) + } + + return remaining, cancelled, nil +} + // removes duplicates from a slice of queue items by step run id func removeDuplicates(qis []*scheduling.QueueItemWithOrder) ([]*scheduling.QueueItemWithOrder, []*scheduling.QueueItemWithOrder) { encountered := map[string]bool{} @@ -2146,3 +2092,51 @@ func removeDuplicates(qis []*scheduling.QueueItemWithOrder) ([]*scheduling.Queue return result, duplicates } + +type debugInfo struct { + NumQueues int `json:"num_queues"` + TotalStepRuns int `json:"total_step_runs"` + TotalStepRunsAssigned int `json:"total_step_runs_assigned"` + TotalSlots int `json:"total_slots"` + NumDuplicates int `json:"num_duplicates"` + NumCancelled int `json:"num_cancelled"` + TotalDuration string `json:"total_duration"` +} + +func printQueueDebugInfo( + l *zerolog.Logger, + queues []*dbsqlc.Queue, + queueItems []*scheduling.QueueItemWithOrder, + duplicates []*scheduling.QueueItemWithOrder, + cancelled []*scheduling.QueueItemWithOrder, + plan scheduling.SchedulePlan, + slots []*dbsqlc.ListSemaphoreSlotsToAssignRow, + startedAt time.Time, +) { + duration := time.Since(startedAt) + + // pretty-print json with 2 spaces + debugInfo := debugInfo{ + NumQueues: len(queues), + TotalStepRuns: len(queueItems), + TotalStepRunsAssigned: len(plan.StepRunIds), + TotalSlots: len(slots), + NumDuplicates: len(duplicates), + NumCancelled: len(cancelled), + TotalDuration: duration.String(), + } + + debugInfoBytes, err := json.MarshalIndent(debugInfo, "", " ") + + if err != nil { + l.Warn().Err(err).Msg("could not marshal debug info") + return + } + + // if the duration is greater than 100 milliseconds, log the debug info + if duration > 100*time.Millisecond { + l.Warn().Msgf("queue duration was greater than 100ms: %s", string(debugInfoBytes)) + } else { + l.Debug().Msgf("queue debug information: %s", string(debugInfoBytes)) + } +}