diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 26e408e3d..7ea9a9ef9 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -2544,38 +2544,6 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "step-run-started-db") defer span.End() - finished := string(dbsqlc.StepRunStatusSUCCEEDED) - - data := &updateStepRunQueueData{ - Hash: hashToBucket(sqlchelpers.UUIDFromStr(workflowRunId), s.maxHashFactor), - StepRunId: stepRunId, - TenantId: tenantId, - FinishedAt: &finishedAt, - Status: &finished, - Output: output, - } - - // we write to the buffer first so we don't get race conditions when we resolve workflow run statuses - done, err := s.bulkStatusBuffer.BuffItem(tenantId, data) - - if err != nil { - return fmt.Errorf("could not buffer step run succeeded: %w", err) - } - - var response *buffer.FlushResponse[pgtype.UUID] - - select { - case response = <-done: - case <-ctx.Done(): - return ctx.Err() - case <-time.After(20 * time.Second): - return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db") - } - - if response.Err != nil { - return fmt.Errorf("could not flush step run succeeded: %w", response.Err) - } - tx, err := s.pool.Begin(ctx) if err != nil { @@ -2609,6 +2577,39 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId return fmt.Errorf("could not commit transaction: %w", err) } + finished := string(dbsqlc.StepRunStatusSUCCEEDED) + + data := &updateStepRunQueueData{ + Hash: hashToBucket(sqlchelpers.UUIDFromStr(workflowRunId), s.maxHashFactor), + StepRunId: stepRunId, + TenantId: tenantId, + FinishedAt: &finishedAt, + Status: &finished, + Output: output, + } + + // we write to the buffer after updating the job run lookup data so we don't start a step run (which has multiple + // parents) before the job run lookup data is updated + done, err := s.bulkStatusBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer step run succeeded: %w", err) + } + + var response *buffer.FlushResponse[pgtype.UUID] + + select { + case response = <-done: + case <-ctx.Done(): + return ctx.Err() + case <-time.After(20 * time.Second): + return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db") + } + + if response.Err != nil { + return fmt.Errorf("could not flush step run succeeded: %w", response.Err) + } + return nil }