diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index c025d9c9d..54c3e1040 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -405,14 +405,20 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq return fmt.Errorf("could not get step run: %w", err) } - inputBytes := stepRun.StepRun.Input - retryCount := int(stepRun.StepRun.RetryCount) + 1 + data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run data: %w", err) + } + + inputBytes := data.Input + retryCount := int(stepRun.SRRetryCount) + 1 // update step run _, _, err = ec.repo.StepRun().UpdateStepRun( ctx, metadata.TenantId, - sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + sqlchelpers.UUIDToStr(stepRun.SRID), &repository.UpdateStepRunOpts{ Input: inputBytes, Status: repository.StepRunStatusPtr(db.StepRunStatusPending), @@ -480,8 +486,14 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg return fmt.Errorf("could not get step run: %w", err) } + data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run data: %w", err) + } + var inputBytes []byte - retryCount := int(stepRun.StepRun.RetryCount) + 1 + retryCount := int(stepRun.SRRetryCount) + 1 // update the input schema for the step run based on the new input if payload.InputData != "" { @@ -491,7 +503,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg // input data because the user could have deleted fields that are required by the step. // A better solution would be to validate the user input ahead of time. // NOTE: this is an expensive operation. - if currentInput := stepRun.StepRun.Input; len(currentInput) > 0 { + if currentInput := data.Input; len(currentInput) > 0 { inputMap, err := datautils.JSONBytesToMap([]byte(payload.InputData)) if err != nil { @@ -525,14 +537,14 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg // if the input data has been manually set, we reset the retry count as this is a user-triggered retry retryCount = 0 } else { - inputBytes = stepRun.StepRun.Input + inputBytes = data.Input } // update step run _, err = ec.repo.StepRun().ReplayStepRun( ctx, metadata.TenantId, - sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + sqlchelpers.UUIDToStr(stepRun.SRID), &repository.UpdateStepRunOpts{ Input: inputBytes, Status: repository.StepRunStatusPtr(db.StepRunStatusPending), @@ -645,10 +657,10 @@ func (ec *JobsControllerImpl) runStepRunRequeueTenant(ctx context.Context, tenan defer span.End() now := time.Now().UTC() - stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID) // if the current time is after the scheduleTimeoutAt, then mark this as timed out - scheduleTimeoutAt := stepRunCp.StepRun.ScheduleTimeoutAt.Time + scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time // timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout, // or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt @@ -730,11 +742,11 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign-step-run") defer span.End() - stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID) // if the current time is after the scheduleTimeoutAt, then mark this as timed out now := time.Now().UTC().UTC() - scheduleTimeoutAt := stepRunCp.StepRun.ScheduleTimeoutAt.Time + scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time // timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout, // or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt @@ -745,7 +757,7 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena } eventData := map[string]interface{}{ - "worker_id": sqlchelpers.UUIDToStr(stepRunCp.StepRun.WorkerId), + "worker_id": sqlchelpers.UUIDToStr(stepRunCp.SRWorkerId), } // TODO: batch this query to avoid n+1 issues @@ -825,7 +837,7 @@ func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenan ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout-step-run") defer span.End() - stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID) return ec.failStepRun(ctx, tenantId, stepRunId, "TIMED_OUT", time.Now().UTC()) }) @@ -851,6 +863,12 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId return ec.a.WrapErr(fmt.Errorf("could not get step run: %w", err), errData) } + data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, tenantId, stepRunId) + + if err != nil { + return ec.a.WrapErr(fmt.Errorf("could not get step run data: %w", err), errData) + } + servertel.WithStepRunModel(span, stepRun) requeueAfterTime := time.Now().Add(4 * time.Second).UTC() @@ -860,7 +878,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId } // set scheduling timeout - if scheduleTimeoutAt := stepRun.StepRun.ScheduleTimeoutAt.Time; scheduleTimeoutAt.IsZero() { + if scheduleTimeoutAt := stepRun.SRScheduleTimeoutAt.Time; scheduleTimeoutAt.IsZero() { scheduleTimeoutAt = getScheduleTimeout(stepRun) updateStepOpts.ScheduleTimeoutAt = &scheduleTimeoutAt @@ -868,7 +886,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId // If the step run input is not set, then we should set it. This will be set upstream if we've rerun // the step run manually with new inputs. It will not be set when the step is automatically queued. - if in := stepRun.StepRun.Input; len(in) == 0 || string(in) == "{}" { + if in := data.Input; len(in) == 0 || string(in) == "{}" { lookupDataBytes := stepRun.JobRunLookupData if lookupDataBytes != nil { @@ -932,7 +950,7 @@ func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId stri ctx, span := telemetry.NewSpan(ctx, "schedule-step-run") defer span.End() - stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID) selectedWorkerId, dispatcherId, err := ec.repo.StepRun().AssignStepRunToWorker(ctx, stepRun) @@ -1066,7 +1084,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m // queue the next step runs jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId) - stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID) nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, jobRunId, &stepRunId) @@ -1076,7 +1094,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m for _, nextStepRun := range nextStepRuns { nextStepId := sqlchelpers.UUIDToStr(nextStepRun.StepId) - nextStepRunId := sqlchelpers.UUIDToStr(nextStepRun.StepRun.ID) + nextStepRunId := sqlchelpers.UUIDToStr(nextStepRun.SRID) err = ec.queueStepRun(ctx, metadata.TenantId, nextStepId, nextStepRunId) @@ -1123,7 +1141,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun } // determine if step run should be retried or not - shouldRetry := stepRun.StepRun.RetryCount < stepRun.StepRetries + shouldRetry := stepRun.SRRetryCount < stepRun.StepRetries status := db.StepRunStatusFailed @@ -1172,7 +1190,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun attemptCancel = true } - if !stepRun.StepRun.WorkerId.Valid { + if !stepRun.SRWorkerId.Valid { // this is not a fatal error ec.l.Warn().Msgf("step run %s has no worker id, skipping cancellation", stepRunId) attemptCancel = false @@ -1181,7 +1199,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun // Attempt to cancel the previous running step run if attemptCancel { - workerId := sqlchelpers.UUIDToStr(stepRun.StepRun.WorkerId) + workerId := sqlchelpers.UUIDToStr(stepRun.SRWorkerId) worker, err := ec.repo.Worker().GetWorkerForEngine(ctx, tenantId, workerId) @@ -1286,14 +1304,14 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR defer ec.handleStepRunUpdateInfo(stepRun, updateInfo) - if !stepRun.StepRun.WorkerId.Valid { + if !stepRun.SRWorkerId.Valid { // this is not a fatal error ec.l.Debug().Msgf("step run %s has no worker id, skipping cancellation", stepRunId) return nil } - workerId := sqlchelpers.UUIDToStr(stepRun.StepRun.WorkerId) + workerId := sqlchelpers.UUIDToStr(stepRun.SRWorkerId) worker, err := ec.repo.Worker().GetWorkerForEngine(ctx, tenantId, workerId) @@ -1330,7 +1348,7 @@ func (ec *JobsControllerImpl) handleStepRunUpdateInfo(stepRun *dbsqlc.GetStepRun context.Background(), msgqueue.WORKFLOW_PROCESSING_QUEUE, tasktypes.WorkflowRunFinishedToTask( - sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId), + sqlchelpers.UUIDToStr(stepRun.SRTenantId), updateInfo.WorkflowRunId, updateInfo.WorkflowRunStatus, ), diff --git a/internal/services/controllers/workflows/queue.go b/internal/services/controllers/workflows/queue.go index 268924edb..599075fa6 100644 --- a/internal/services/controllers/workflows/queue.go +++ b/internal/services/controllers/workflows/queue.go @@ -780,7 +780,7 @@ func (wc *WorkflowsControllerImpl) cancelWorkflowRun(ctx context.Context, tenant for i := range stepRuns { stepRunCp := stepRuns[i] - stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID) + stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID) errGroup.Go(func() error { return wc.mq.AddMessage( diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index 8d9c251c9..0fb7c025b 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -449,13 +449,19 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms return fmt.Errorf("could not get step run: %w", err) } + data, err := d.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run data: %w", err) + } + servertel.WithStepRunModel(span, stepRun) var multiErr error var success bool for _, w := range workers { - err = w.StartStepRun(ctx, metadata.TenantId, stepRun) + err = w.StartStepRun(ctx, metadata.TenantId, stepRun, data) if err != nil { multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker: %w", err)) diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index fac647cad..ca479f96b 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -39,14 +39,15 @@ func (worker *subscribedWorker) StartStepRun( ctx context.Context, tenantId string, stepRun *dbsqlc.GetStepRunForEngineRow, + stepRunData *dbsqlc.GetStepRunDataForEngineRow, ) error { ctx, span := telemetry.NewSpan(ctx, "start-step-run") // nolint:ineffassign defer span.End() inputBytes := []byte{} - if stepRun.StepRun.Input != nil { - inputBytes = stepRun.StepRun.Input + if stepRunData.Input != nil { + inputBytes = stepRunData.Input } stepName := stepRun.StepReadableId.String @@ -57,13 +58,13 @@ func (worker *subscribedWorker) StartStepRun( JobName: stepRun.JobName, JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId), StepId: sqlchelpers.UUIDToStr(stepRun.StepId), - StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), ActionType: contracts.ActionType_START_STEP_RUN, ActionId: stepRun.ActionId, ActionPayload: string(inputBytes), StepName: stepName, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), - RetryCount: stepRun.StepRun.RetryCount, + RetryCount: stepRun.SRRetryCount, }) } @@ -103,11 +104,11 @@ func (worker *subscribedWorker) CancelStepRun( JobName: stepRun.JobName, JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId), StepId: sqlchelpers.UUIDToStr(stepRun.StepId), - StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), ActionType: contracts.ActionType_CANCEL_STEP_RUN, StepName: stepRun.StepReadableId.String, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), - RetryCount: stepRun.StepRun.RetryCount, + RetryCount: stepRun.SRRetryCount, }) } @@ -1115,7 +1116,7 @@ func (s *DispatcherImpl) tenantTaskToWorkflowEvent(task *msgqueue.Message, tenan } workflowEvent.StepRetries = &stepRun.StepRetries - workflowEvent.RetryCount = &stepRun.StepRun.RetryCount + workflowEvent.RetryCount = &stepRun.SRRetryCount if workflowEvent.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM { streamEventId, err := strconv.ParseInt(task.Metadata["stream_event_id"].(string), 10, 64) @@ -1203,23 +1204,30 @@ func (s *DispatcherImpl) getStepResultsForWorkflowRun(tenantId string, workflowR res := make(map[string][]*contracts.StepRunResult) for _, stepRun := range stepRuns { + + data, err := s.repo.StepRun().GetStepRunDataForEngine(context.Background(), tenantId, sqlchelpers.UUIDToStr(stepRun.SRID)) + + if err != nil { + return nil, fmt.Errorf("could not get step run data for %s, %e", sqlchelpers.UUIDToStr(stepRun.SRID), err) + } + resStepRun := &contracts.StepRunResult{ - StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), StepReadableId: stepRun.StepReadableId.String, JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId), } - if stepRun.StepRun.Error.Valid { - resStepRun.Error = &stepRun.StepRun.Error.String + if data.Error.Valid { + resStepRun.Error = &data.Error.String } - if stepRun.StepRun.CancelledReason.Valid { - errString := fmt.Sprintf("this step run was cancelled due to %s", stepRun.StepRun.CancelledReason.String) + if stepRun.SRCancelledReason.Valid { + errString := fmt.Sprintf("this step run was cancelled due to %s", stepRun.SRCancelledReason.String) resStepRun.Error = &errString } - if stepRun.StepRun.Output != nil { - resStepRun.Output = repository.StringPtr(string(stepRun.StepRun.Output)) + if data.Output != nil { + resStepRun.Output = repository.StringPtr(string(data.Output)) } workflowRunId := sqlchelpers.UUIDToStr(stepRun.WorkflowRunId) diff --git a/internal/services/shared/tasktypes/step.go b/internal/services/shared/tasktypes/step.go index eacf6723f..3cbeaef00 100644 --- a/internal/services/shared/tasktypes/step.go +++ b/internal/services/shared/tasktypes/step.go @@ -152,8 +152,8 @@ func TenantToStepRunRequeueTask(tenant db.TenantModel) *msgqueue.Message { func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message { jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId) - stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) - tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId) + stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID) + tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId) payload, _ := datautils.ToJSONMap(StepRunRetryTaskPayload{ JobRunId: jobRunId, @@ -175,8 +175,8 @@ func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message { jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId) - stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) - tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId) + stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID) + tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId) payload, _ := datautils.ToJSONMap(StepRunReplayTaskPayload{ JobRunId: jobRunId, @@ -197,8 +197,8 @@ func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byt } func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) *msgqueue.Message { - stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) - tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId) + stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID) + tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId) payload, _ := datautils.ToJSONMap(StepRunNotifyCancelTaskPayload{ StepRunId: stepRunId, @@ -220,7 +220,7 @@ func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) func StepRunQueuedToTask(stepRun *dbsqlc.GetStepRunForEngineRow) *msgqueue.Message { payload, _ := datautils.ToJSONMap(StepRunTaskPayload{ JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId), - StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), }) metadata, _ := datautils.ToJSONMap(StepRunTaskMetadata{ @@ -229,7 +229,7 @@ func StepRunQueuedToTask(stepRun *dbsqlc.GetStepRunForEngineRow) *msgqueue.Messa JobName: stepRun.JobName, JobId: sqlchelpers.UUIDToStr(stepRun.JobId), WorkflowVersionId: sqlchelpers.UUIDToStr(stepRun.WorkflowVersionId), - TenantId: sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId), + TenantId: sqlchelpers.UUIDToStr(stepRun.SRTenantId), }) return &msgqueue.Message{ diff --git a/internal/telemetry/servertel/attributes.go b/internal/telemetry/servertel/attributes.go index f1e624801..fdc913db8 100644 --- a/internal/telemetry/servertel/attributes.go +++ b/internal/telemetry/servertel/attributes.go @@ -13,8 +13,8 @@ import ( func WithStepRunModel(span trace.Span, stepRun *dbsqlc.GetStepRunForEngineRow) { telemetry.WithAttributes( span, - TenantId(stepRun.StepRun.TenantId), - StepRunId(stepRun.StepRun.ID), + TenantId(stepRun.SRTenantId), + StepRunId(stepRun.SRID), Step(stepRun.StepId), JobRunId(stepRun.JobRunId), ) diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql b/pkg/repository/prisma/dbsqlc/step_runs.sql index 7f4bb2cdc..3002392ca 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql @@ -7,10 +7,41 @@ WHERE "id" = @id::uuid AND "tenantId" = @tenantId::uuid; +-- name: GetStepRunDataForEngine :one +SELECT + "input", + "output", + "error" +FROM + "StepRun" +WHERE + "id" = @id::uuid AND + "tenantId" = @tenantId::uuid; + -- name: GetStepRunForEngine :many SELECT DISTINCT ON (sr."id") - sqlc.embed(sr), + sr."id" AS "SR_id", + sr."createdAt" AS "SR_createdAt", + sr."updatedAt" AS "SR_updatedAt", + sr."deletedAt" AS "SR_deletedAt", + sr."tenantId" AS "SR_tenantId", + sr."order" AS "SR_order", + sr."workerId" AS "SR_workerId", + sr."tickerId" AS "SR_tickerId", + sr."status" AS "SR_status", + sr."requeueAfter" AS "SR_requeueAfter", + sr."scheduleTimeoutAt" AS "SR_scheduleTimeoutAt", + sr."startedAt" AS "SR_startedAt", + sr."finishedAt" AS "SR_finishedAt", + sr."timeoutAt" AS "SR_timeoutAt", + sr."cancelledAt" AS "SR_cancelledAt", + sr."cancelledReason" AS "SR_cancelledReason", + sr."cancelledError" AS "SR_cancelledError", + sr."callerFiles" AS "SR_callerFiles", + sr."gitRepoBranch" AS "SR_gitRepoBranch", + sr."retryCount" AS "SR_retryCount", + sr."semaphoreReleased" AS "SR_semaphoreReleased", jrld."data" AS "jobRunLookupData", -- TODO: everything below this line is cacheable and should be moved to a separate query jr."id" AS "jobRunId", diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql.go b/pkg/repository/prisma/dbsqlc/step_runs.sql.go index 2bb45588f..93a5586e7 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql.go @@ -544,10 +544,60 @@ func (q *Queries) GetStepRun(ctx context.Context, db DBTX, arg GetStepRunParams) return &i, err } +const getStepRunDataForEngine = `-- name: GetStepRunDataForEngine :one +SELECT + "input", + "output", + "error" +FROM + "StepRun" +WHERE + "id" = $1::uuid AND + "tenantId" = $2::uuid +` + +type GetStepRunDataForEngineParams struct { + ID pgtype.UUID `json:"id"` + Tenantid pgtype.UUID `json:"tenantid"` +} + +type GetStepRunDataForEngineRow struct { + Input []byte `json:"input"` + Output []byte `json:"output"` + Error pgtype.Text `json:"error"` +} + +func (q *Queries) GetStepRunDataForEngine(ctx context.Context, db DBTX, arg GetStepRunDataForEngineParams) (*GetStepRunDataForEngineRow, error) { + row := db.QueryRow(ctx, getStepRunDataForEngine, arg.ID, arg.Tenantid) + var i GetStepRunDataForEngineRow + err := row.Scan(&i.Input, &i.Output, &i.Error) + return &i, err +} + const getStepRunForEngine = `-- name: GetStepRunForEngine :many SELECT DISTINCT ON (sr."id") - sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased", + sr."id" AS "SR_id", + sr."createdAt" AS "SR_createdAt", + sr."updatedAt" AS "SR_updatedAt", + sr."deletedAt" AS "SR_deletedAt", + sr."tenantId" AS "SR_tenantId", + sr."order" AS "SR_order", + sr."workerId" AS "SR_workerId", + sr."tickerId" AS "SR_tickerId", + sr."status" AS "SR_status", + sr."requeueAfter" AS "SR_requeueAfter", + sr."scheduleTimeoutAt" AS "SR_scheduleTimeoutAt", + sr."startedAt" AS "SR_startedAt", + sr."finishedAt" AS "SR_finishedAt", + sr."timeoutAt" AS "SR_timeoutAt", + sr."cancelledAt" AS "SR_cancelledAt", + sr."cancelledReason" AS "SR_cancelledReason", + sr."cancelledError" AS "SR_cancelledError", + sr."callerFiles" AS "SR_callerFiles", + sr."gitRepoBranch" AS "SR_gitRepoBranch", + sr."retryCount" AS "SR_retryCount", + sr."semaphoreReleased" AS "SR_semaphoreReleased", jrld."data" AS "jobRunLookupData", -- TODO: everything below this line is cacheable and should be moved to a separate query jr."id" AS "jobRunId", @@ -589,21 +639,41 @@ type GetStepRunForEngineParams struct { } type GetStepRunForEngineRow struct { - StepRun StepRun `json:"step_run"` - JobRunLookupData []byte `json:"jobRunLookupData"` - JobRunId pgtype.UUID `json:"jobRunId"` - StepId pgtype.UUID `json:"stepId"` - StepRetries int32 `json:"stepRetries"` - StepTimeout pgtype.Text `json:"stepTimeout"` - StepScheduleTimeout string `json:"stepScheduleTimeout"` - StepReadableId pgtype.Text `json:"stepReadableId"` - StepCustomUserData []byte `json:"stepCustomUserData"` - JobName string `json:"jobName"` - JobId pgtype.UUID `json:"jobId"` - JobKind JobKind `json:"jobKind"` - WorkflowVersionId pgtype.UUID `json:"workflowVersionId"` - WorkflowRunId pgtype.UUID `json:"workflowRunId"` - ActionId string `json:"actionId"` + SRID pgtype.UUID `json:"SR_id"` + SRCreatedAt pgtype.Timestamp `json:"SR_createdAt"` + SRUpdatedAt pgtype.Timestamp `json:"SR_updatedAt"` + SRDeletedAt pgtype.Timestamp `json:"SR_deletedAt"` + SRTenantId pgtype.UUID `json:"SR_tenantId"` + SROrder int64 `json:"SR_order"` + SRWorkerId pgtype.UUID `json:"SR_workerId"` + SRTickerId pgtype.UUID `json:"SR_tickerId"` + SRStatus StepRunStatus `json:"SR_status"` + SRRequeueAfter pgtype.Timestamp `json:"SR_requeueAfter"` + SRScheduleTimeoutAt pgtype.Timestamp `json:"SR_scheduleTimeoutAt"` + SRStartedAt pgtype.Timestamp `json:"SR_startedAt"` + SRFinishedAt pgtype.Timestamp `json:"SR_finishedAt"` + SRTimeoutAt pgtype.Timestamp `json:"SR_timeoutAt"` + SRCancelledAt pgtype.Timestamp `json:"SR_cancelledAt"` + SRCancelledReason pgtype.Text `json:"SR_cancelledReason"` + SRCancelledError pgtype.Text `json:"SR_cancelledError"` + SRCallerFiles []byte `json:"SR_callerFiles"` + SRGitRepoBranch pgtype.Text `json:"SR_gitRepoBranch"` + SRRetryCount int32 `json:"SR_retryCount"` + SRSemaphoreReleased bool `json:"SR_semaphoreReleased"` + JobRunLookupData []byte `json:"jobRunLookupData"` + JobRunId pgtype.UUID `json:"jobRunId"` + StepId pgtype.UUID `json:"stepId"` + StepRetries int32 `json:"stepRetries"` + StepTimeout pgtype.Text `json:"stepTimeout"` + StepScheduleTimeout string `json:"stepScheduleTimeout"` + StepReadableId pgtype.Text `json:"stepReadableId"` + StepCustomUserData []byte `json:"stepCustomUserData"` + JobName string `json:"jobName"` + JobId pgtype.UUID `json:"jobId"` + JobKind JobKind `json:"jobKind"` + WorkflowVersionId pgtype.UUID `json:"workflowVersionId"` + WorkflowRunId pgtype.UUID `json:"workflowRunId"` + ActionId string `json:"actionId"` } func (q *Queries) GetStepRunForEngine(ctx context.Context, db DBTX, arg GetStepRunForEngineParams) ([]*GetStepRunForEngineRow, error) { @@ -616,33 +686,27 @@ func (q *Queries) GetStepRunForEngine(ctx context.Context, db DBTX, arg GetStepR for rows.Next() { var i GetStepRunForEngineRow if err := rows.Scan( - &i.StepRun.ID, - &i.StepRun.CreatedAt, - &i.StepRun.UpdatedAt, - &i.StepRun.DeletedAt, - &i.StepRun.TenantId, - &i.StepRun.JobRunId, - &i.StepRun.StepId, - &i.StepRun.Order, - &i.StepRun.WorkerId, - &i.StepRun.TickerId, - &i.StepRun.Status, - &i.StepRun.Input, - &i.StepRun.Output, - &i.StepRun.RequeueAfter, - &i.StepRun.ScheduleTimeoutAt, - &i.StepRun.Error, - &i.StepRun.StartedAt, - &i.StepRun.FinishedAt, - &i.StepRun.TimeoutAt, - &i.StepRun.CancelledAt, - &i.StepRun.CancelledReason, - &i.StepRun.CancelledError, - &i.StepRun.InputSchema, - &i.StepRun.CallerFiles, - &i.StepRun.GitRepoBranch, - &i.StepRun.RetryCount, - &i.StepRun.SemaphoreReleased, + &i.SRID, + &i.SRCreatedAt, + &i.SRUpdatedAt, + &i.SRDeletedAt, + &i.SRTenantId, + &i.SROrder, + &i.SRWorkerId, + &i.SRTickerId, + &i.SRStatus, + &i.SRRequeueAfter, + &i.SRScheduleTimeoutAt, + &i.SRStartedAt, + &i.SRFinishedAt, + &i.SRTimeoutAt, + &i.SRCancelledAt, + &i.SRCancelledReason, + &i.SRCancelledError, + &i.SRCallerFiles, + &i.SRGitRepoBranch, + &i.SRRetryCount, + &i.SRSemaphoreReleased, &i.JobRunLookupData, &i.JobRunId, &i.StepId, diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 80631c30e..2d254e6cb 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -243,7 +243,8 @@ func (s *stepRunEngineRepository) ListRunningStepRunsForTicker(ctx context.Conte err = tx.Commit(ctx) - return res, err + return res, nil + } func (s *stepRunEngineRepository) ListStepRuns(ctx context.Context, tenantId string, opts *repository.ListStepRunsOpts) ([]*dbsqlc.GetStepRunForEngineRow, error) { @@ -502,11 +503,11 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t return fmt.Errorf("could not get step run for engine: %w", err) } - if stepRun.StepRun.SemaphoreReleased { + if stepRun.SRSemaphoreReleased { return nil } - data := map[string]interface{}{"worker_id": sqlchelpers.UUIDToStr(stepRun.StepRun.WorkerId)} + data := map[string]interface{}{"worker_id": sqlchelpers.UUIDToStr(stepRun.SRWorkerId)} dataBytes, err := json.Marshal(data) @@ -515,7 +516,7 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t } err = s.queries.CreateStepRunEvent(ctx, tx, dbsqlc.CreateStepRunEventParams{ - Steprunid: stepRun.StepRun.ID, + Steprunid: stepRun.SRID, Reason: dbsqlc.StepRunEventReasonSLOTRELEASED, Severity: dbsqlc.StepRunEventSeverityINFO, Message: "Slot released", @@ -527,8 +528,8 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t } _, err = s.queries.ReleaseWorkerSemaphoreSlot(ctx, tx, dbsqlc.ReleaseWorkerSemaphoreSlotParams{ - Steprunid: stepRun.StepRun.ID, - Tenantid: stepRun.StepRun.TenantId, + Steprunid: stepRun.SRID, + Tenantid: stepRun.SRTenantId, }) if err != nil { @@ -536,8 +537,8 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t } _, err = s.queries.UnlinkStepRunFromWorker(ctx, tx, dbsqlc.UnlinkStepRunFromWorkerParams{ - Steprunid: stepRun.StepRun.ID, - Tenantid: stepRun.StepRun.TenantId, + Steprunid: stepRun.SRID, + Tenantid: stepRun.SRTenantId, }) if err != nil { @@ -546,8 +547,8 @@ func (s *stepRunEngineRepository) ReleaseStepRunSemaphore(ctx context.Context, t // Update the Step Run to release the semaphore _, err = s.queries.UpdateStepRun(ctx, tx, dbsqlc.UpdateStepRunParams{ - ID: stepRun.StepRun.ID, - Tenantid: stepRun.StepRun.TenantId, + ID: stepRun.SRID, + Tenantid: stepRun.SRTenantId, SemaphoreReleased: pgtype.Bool{ Valid: true, Bool: true, @@ -573,8 +574,8 @@ func (s *stepRunEngineRepository) releaseWorkerSemaphore(ctx context.Context, st defer deferRollback(ctx, s.l, tx.Rollback) _, err = s.queries.ReleaseWorkerSemaphoreSlot(ctx, tx, dbsqlc.ReleaseWorkerSemaphoreSlotParams{ - Steprunid: stepRun.StepRun.ID, - Tenantid: stepRun.StepRun.TenantId, + Steprunid: stepRun.SRID, + Tenantid: stepRun.SRTenantId, }) if err != nil && !errors.Is(err, pgx.ErrNoRows) { @@ -585,8 +586,8 @@ func (s *stepRunEngineRepository) releaseWorkerSemaphore(ctx context.Context, st // so that we don't re-increment the old worker semaphore on each retry if err == nil { _, err = s.queries.UnlinkStepRunFromWorker(ctx, tx, dbsqlc.UnlinkStepRunFromWorkerParams{ - Steprunid: stepRun.StepRun.ID, - Tenantid: stepRun.StepRun.TenantId, + Steprunid: stepRun.SRID, + Tenantid: stepRun.SRTenantId, }) if err != nil { @@ -617,10 +618,10 @@ func (s *stepRunEngineRepository) assignStepRunToWorkerAttempt(ctx context.Conte // acquire a semaphore slot assigned, err := s.queries.AcquireWorkerSemaphoreSlotAndAssign(ctx, tx, dbsqlc.AcquireWorkerSemaphoreSlotAndAssignParams{ - Steprunid: stepRun.StepRun.ID, + Steprunid: stepRun.SRID, Actionid: stepRun.ActionId, StepTimeout: stepRun.StepTimeout, - Tenantid: stepRun.StepRun.TenantId, + Tenantid: stepRun.SRTenantId, }) if err != nil { @@ -702,7 +703,7 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste return err } - return fmt.Errorf("could not assign worker for step run %s (step %s): %w", sqlchelpers.UUIDToStr(stepRun.StepRun.ID), stepRun.StepReadableId.String, err) + return fmt.Errorf("could not assign worker for step run %s (step %s): %w", sqlchelpers.UUIDToStr(stepRun.SRID), stepRun.StepReadableId.String, err) } return nil @@ -713,7 +714,7 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste if errors.As(err, &target) { defer s.deferredStepRunEvent( - stepRun.StepRun.ID, + stepRun.SRID, dbsqlc.StepRunEventReasonREQUEUEDNOWORKER, dbsqlc.StepRunEventSeverityWARNING, "No worker available", @@ -725,7 +726,7 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste if errors.Is(err, repository.ErrNoWorkerAvailable) { defer s.deferredStepRunEvent( - stepRun.StepRun.ID, + stepRun.SRID, dbsqlc.StepRunEventReasonREQUEUEDNOWORKER, dbsqlc.StepRunEventSeverityWARNING, "No worker available", @@ -735,7 +736,7 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste if errors.Is(err, repository.ErrRateLimitExceeded) { defer s.deferredStepRunEvent( - stepRun.StepRun.ID, + stepRun.SRID, dbsqlc.StepRunEventReasonREQUEUEDRATELIMIT, dbsqlc.StepRunEventSeverityWARNING, "Rate limit exceeded", @@ -747,7 +748,7 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste } defer s.deferredStepRunEvent( - stepRun.StepRun.ID, + stepRun.SRID, dbsqlc.StepRunEventReasonASSIGNED, dbsqlc.StepRunEventSeverityINFO, fmt.Sprintf("Assigned to worker %s", sqlchelpers.UUIDToStr(assigned.WorkerId)), @@ -811,7 +812,7 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s } err = deadlockRetry(s.l, func() error { - updateInfo, err = s.ResolveRelatedStatuses(ctx, stepRun.StepRun.TenantId, stepRun.StepRun.ID) + updateInfo, err = s.ResolveRelatedStatuses(ctx, stepRun.SRTenantId, stepRun.SRID) return err }) @@ -949,7 +950,7 @@ func (s *stepRunEngineRepository) PreflightCheckReplayStepRun(ctx context.Contex return err } - if !repository.IsFinalStepRunStatus(stepRun.StepRun.Status) { + if !repository.IsFinalStepRunStatus(stepRun.SRStatus) { return repository.ErrPreflightReplayStepRunNotInFinalState } @@ -1141,7 +1142,7 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st } retrierExtraErr := deadlockRetry(s.l, func() error { - _, err = s.ResolveRelatedStatuses(ctx, stepRun.StepRun.TenantId, stepRun.StepRun.ID) + _, err = s.ResolveRelatedStatuses(ctx, stepRun.SRTenantId, stepRun.SRID) return err }) @@ -1458,6 +1459,13 @@ func (s *stepRunEngineRepository) getStepRunForEngineTx(ctx context.Context, dbt return res[0], nil } +func (s *stepRunEngineRepository) GetStepRunDataForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunDataForEngineRow, error) { + return s.queries.GetStepRunDataForEngine(ctx, s.pool, dbsqlc.GetStepRunDataForEngineParams{ + ID: sqlchelpers.UUIDFromStr(stepRunId), + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + }) +} + func (s *stepRunEngineRepository) ListStartableStepRuns(ctx context.Context, tenantId, jobRunId string, parentStepRunId *string) ([]*dbsqlc.GetStepRunForEngineRow, error) { tx, err := s.pool.Begin(ctx) diff --git a/pkg/repository/step_run.go b/pkg/repository/step_run.go index b55da9144..8bc454ba3 100644 --- a/pkg/repository/step_run.go +++ b/pkg/repository/step_run.go @@ -181,6 +181,8 @@ type StepRunEngineRepository interface { GetStepRunForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunForEngineRow, error) + GetStepRunDataForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunDataForEngineRow, error) + // QueueStepRun is like UpdateStepRun, except that it will only update the step run if it is in // a pending state. QueueStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error)