Skip to content

Commit

Permalink
fix: rewrite queries for checking child workflows (#983)
Browse files Browse the repository at this point in the history
* rewrite queries for child workflows

* add index

* fix: remove tenant id where it's not needed
  • Loading branch information
abelanger5 authored Oct 23, 2024
1 parent dd5bc90 commit 718d8f5
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 121 deletions.
27 changes: 8 additions & 19 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ WITH RECURSIVE currStepRun AS (
SELECT "id", "status", "cancelledReason"
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
"id" = @stepRunId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
Expand Down Expand Up @@ -537,8 +536,7 @@ SET "status" = CASE
FROM
childStepRuns csr
WHERE
sr."id" = csr."id" AND
sr."tenantId" = @tenantId::uuid
sr."id" = csr."id"
RETURNING sr.*;

-- name: UpdateStepRunOverridesData :one
Expand Down Expand Up @@ -1240,8 +1238,7 @@ WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
"id" = @stepRunId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
Expand All @@ -1260,17 +1257,14 @@ SELECT
FROM
"StepRun" sr
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = @tenantId::uuid;
childStepRuns csr ON sr."id" = csr."id";

-- name: ReplayStepRunResetStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
"id" = @stepRunId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
Expand Down Expand Up @@ -1303,11 +1297,8 @@ SET
FROM
childStepRuns csr
WHERE
sr."tenantId" = @tenantId::uuid AND
(
sr."id" = csr."id" OR
sr."id" = @stepRunId::uuid
)
sr."id" = csr."id" OR
sr."id" = @stepRunId::uuid
RETURNING sr.*;

-- name: ResetStepRunsByIds :many
Expand All @@ -1334,8 +1325,7 @@ WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
"id" = @stepRunId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
Expand All @@ -1358,7 +1348,6 @@ FROM
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = @tenantId::uuid AND
sr."deletedAt" IS NULL AND
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED');

Expand Down
49 changes: 13 additions & 36 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 14 additions & 24 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1084,42 +1084,32 @@ WHERE
);

-- name: GetChildWorkflowRunsByIndex :many
WITH input_data AS (
SELECT
UNNEST(@parentIds::uuid[]) AS parentId,
UNNEST(@parentStepRunIds::uuid[]) AS parentStepRunId,
UNNEST(@childIndexes::int[]) AS childIndex
)
SELECT
wr.*
FROM
"WorkflowRun" wr
JOIN
input_data i ON
wr."parentId" = i.parentId AND
wr."parentStepRunId" = i.parentStepRunId AND
wr."childIndex" = i.childIndex
WHERE
wr."deletedAt" IS NULL;
(wr."parentId", wr."parentStepRunId", wr."childIndex") IN (
SELECT
UNNEST(@parentIds::uuid[]),
UNNEST(@parentStepRunIds::uuid[]),
UNNEST(@childIndexes::int[])
)
AND wr."deletedAt" IS NULL;

-- name: GetChildWorkflowRunsByKey :many
WITH input_data AS (
SELECT
UNNEST(@parentIds::uuid[]) AS parentId,
UNNEST(@parentStepRunIds::uuid[]) AS parentStepRunId,
UNNEST(@childKeys::text[]) AS childKey
)
SELECT
wr.*
FROM
"WorkflowRun" wr
JOIN
input_data i ON
wr."parentId" = i.parentId AND
wr."parentStepRunId" = i.parentStepRunId AND
wr."childKey" = i.childKey
WHERE
wr."deletedAt" IS NULL;
(wr."parentId", wr."parentStepRunId", wr."childKey") IN (
SELECT
UNNEST(@parentIds::uuid[]),
UNNEST(@parentStepRunIds::uuid[]),
UNNEST(@childKeys::text[])
)
AND wr."deletedAt" IS NULL;

-- name: GetScheduledChildWorkflowRun :one
SELECT
Expand Down
38 changes: 14 additions & 24 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 4 additions & 17 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2487,10 +2487,7 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId
return fmt.Errorf("could not buffer step run succeeded: %w", err)
}

laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, dbsqlc.GetLaterStepRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})
laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId))

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("could not get later step runs: %w", err)
Expand Down Expand Up @@ -2549,10 +2546,7 @@ func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, w
return fmt.Errorf("could not buffer step run succeeded: %w", err)
}

laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, dbsqlc.GetLaterStepRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})
laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId))

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("could not get later step runs: %w", err)
Expand Down Expand Up @@ -2636,10 +2630,7 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s
return nil, err
}

laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, tx, dbsqlc.GetLaterStepRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})
laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, tx, sqlchelpers.UUIDFromStr(stepRunId))

if err != nil {
return nil, err
Expand Down Expand Up @@ -2686,7 +2677,6 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s

// reset all later step runs to a pending state
_, err = s.queries.ReplayStepRunResetStepRuns(ctx, tx, dbsqlc.ReplayStepRunResetStepRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
Input: input,
})
Expand Down Expand Up @@ -2723,10 +2713,7 @@ func (s *stepRunEngineRepository) PreflightCheckReplayStepRun(ctx context.Contex
}

// verify that child step runs are in a final state
childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, dbsqlc.ListNonFinalChildStepRunsParams{
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId))

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("could not list non-final child step runs: %w", err)
Expand Down
5 changes: 5 additions & 0 deletions sql/constraints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ ON "WorkflowTriggers" ("workflowVersionId");
-- Additional indexes on WorkflowTriggerEventRef
CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id
ON "WorkflowTriggerEventRef" ("eventKey", "parentId");

-- Additional indexes on WorkflowRun
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentId_parentStepRunId_childIndex_key"
ON "WorkflowRun"("parentId", "parentStepRunId", "childIndex")
WHERE "deletedAt" IS NULL;
Loading

0 comments on commit 718d8f5

Please sign in to comment.