Skip to content

Commit

Permalink
fix: cancelling status on frontend (#779)
Browse files Browse the repository at this point in the history
* fix: cancelling status on frontend

* fix: slot grid and dedupe on slots
  • Loading branch information
abelanger5 authored Aug 13, 2024
1 parent b091952 commit dd50515
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 136 deletions.
1 change: 1 addition & 0 deletions api-contracts/openapi/components/schemas/workflow_run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ StepRunStatus:
- SUCCEEDED
- FAILED
- CANCELLED
- CANCELLING

JobRunStatus:
type: string
Expand Down
223 changes: 112 additions & 111 deletions api/v1/server/oas/gen/openapi.gen.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions frontend/app/src/lib/api/generated/data-contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ export enum StepRunStatus {
SUCCEEDED = 'SUCCEEDED',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
CANCELLING = 'CANCELLING',
}

export interface JobRun {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const statusColors = {
RUNNING: 'bg-green-500',
SUCCEEDED: 'bg-green-700',
FAILED: 'bg-red-500',
CANCELLING: 'bg-yellow-500',
CANCELLED: 'bg-gray-500',
UNDEFINED: 'bg-gray-300', // Default color for undefined status
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const RUN_STATUS_VARIANTS: Record<RunStatusType, RunStatusVariant> = {
text: 'Cancelled',
variant: 'failed',
},
CANCELLING: {
text: 'Cancelling',
variant: 'inProgress',
},
RUNNING: {
text: 'Running',
variant: 'inProgress',
Expand Down
1 change: 1 addition & 0 deletions pkg/client/rest/gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 43 additions & 11 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,30 @@ WHERE
FOR UPDATE SKIP LOCKED;

-- name: BulkAssignStepRunsToWorkers :many
WITH updated_step_runs AS (
WITH already_assigned_step_runs AS (
SELECT
input."id",
wss."id" AS "slotId"
FROM
(
SELECT
unnest(@stepRunIds::uuid[]) AS "id"
) AS input
JOIN
"WorkerSemaphoreSlot" wss ON input."id" = wss."stepRunId"
), already_assigned_slots AS (
SELECT
wss."id"
FROM
(
SELECT
unnest(@slotIds::uuid[]) AS "id"
) AS input
JOIN
"WorkerSemaphoreSlot" wss ON input."id" = wss."id"
WHERE
wss."stepRunId" IS NOT NULL
), updated_step_runs AS (
UPDATE
"StepRun" sr
SET
Expand All @@ -567,24 +590,33 @@ WITH updated_step_runs AS (
"timeoutAt" = CURRENT_TIMESTAMP + convert_duration_to_interval(input."stepTimeout")
FROM (
SELECT
unnest(@stepRunIds::uuid[]) AS "id",
unnest(@stepRunTimeouts::text[]) AS "stepTimeout",
unnest(@workerIds::uuid[]) AS "workerId"
"id",
"stepTimeout",
"workerId",
"slotId"
FROM
(
SELECT
unnest(@stepRunIds::uuid[]) AS "id",
unnest(@stepRunTimeouts::text[]) AS "stepTimeout",
unnest(@workerIds::uuid[]) AS "workerId",
unnest(@slotIds::uuid[]) AS "slotId"
) AS subquery
WHERE
"id" NOT IN (SELECT "id" FROM already_assigned_step_runs)
AND "slotId" NOT IN (SELECT "id" FROM already_assigned_slots)
) AS input
WHERE
sr."id" = input."id"
RETURNING input."id", input."slotId"
)
UPDATE
"WorkerSemaphoreSlot" wss
SET
"stepRunId" = input."stepRunId"
FROM (
SELECT
unnest(@slotIds::uuid[]) AS "id",
unnest(@stepRunIds::uuid[]) AS "stepRunId"
) AS input
"stepRunId" = updated_step_runs."id"
FROM updated_step_runs
WHERE
wss."id" = input."id"
wss."id" = updated_step_runs."slotId"
RETURNING wss."id";

-- name: BulkMarkStepRunsAsCancelling :many
Expand Down
58 changes: 45 additions & 13 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 35 additions & 1 deletion pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,10 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
return emptyRes, nil
}

var duplicates []*scheduling.QueueItemWithOrder

queueItems, duplicates = removeDuplicates(queueItems)

// sort the queue items by Order from least to greatest, then by queue id
sort.Slice(queueItems, func(i, j int) bool {
// sort by priority, then by order, then by id
Expand Down Expand Up @@ -1163,7 +1167,17 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
return emptyRes, fmt.Errorf("could not bulk assign step runs to workers: %w", err)
}

err = s.queries.BulkQueueItems(ctx, tx, plan.QueuedItems)
popItems := plan.QueuedItems

// we'd like to remove duplicates from the queue items as well
for _, item := range duplicates {
// print a warning for duplicates
s.l.Warn().Msgf("duplicate queue item: %d for step run %s", item.QueueItem.ID, sqlchelpers.UUIDToStr(item.QueueItem.StepRunId))

popItems = append(popItems, item.QueueItem.ID)
}

err = s.queries.BulkQueueItems(ctx, tx, popItems)

if err != nil {
return emptyRes, fmt.Errorf("could not bulk queue items: %w", err)
Expand Down Expand Up @@ -2087,3 +2101,23 @@ func (s *stepRunEngineRepository) ClearStepRunPayloadData(ctx context.Context, t
func getCacheName(tenantId, queue string) string {
return fmt.Sprintf("%s:%s", tenantId, queue)
}

// removes duplicates from a slice of queue items by step run id
func removeDuplicates(qis []*scheduling.QueueItemWithOrder) ([]*scheduling.QueueItemWithOrder, []*scheduling.QueueItemWithOrder) {
encountered := map[string]bool{}
result := []*scheduling.QueueItemWithOrder{}
duplicates := []*scheduling.QueueItemWithOrder{}

for _, v := range qis {
stepRunId := sqlchelpers.UUIDToStr(v.StepRunId)
if encountered[stepRunId] {
duplicates = append(duplicates, v)
continue
}

encountered[stepRunId] = true
result = append(result, v)
}

return result, duplicates
}

0 comments on commit dd50515

Please sign in to comment.