Skip to content

Commit

Permalink
fix: large step run payloads (#719)
Browse files Browse the repository at this point in the history
* wip: just needs working transformation

* fix: refactor data into separate query
  • Loading branch information
grutt authored Jul 14, 2024
1 parent fee4c63 commit f4f752b
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 118 deletions.
66 changes: 42 additions & 24 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,20 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
return fmt.Errorf("could not get step run: %w", err)
}

inputBytes := stepRun.StepRun.Input
retryCount := int(stepRun.StepRun.RetryCount) + 1
data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId)

if err != nil {
return fmt.Errorf("could not get step run data: %w", err)
}

inputBytes := data.Input
retryCount := int(stepRun.SRRetryCount) + 1

// update step run
_, _, err = ec.repo.StepRun().UpdateStepRun(
ctx,
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
sqlchelpers.UUIDToStr(stepRun.SRID),
&repository.UpdateStepRunOpts{
Input: inputBytes,
Status: repository.StepRunStatusPtr(db.StepRunStatusPending),
Expand Down Expand Up @@ -480,8 +486,14 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg
return fmt.Errorf("could not get step run: %w", err)
}

data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId)

if err != nil {
return fmt.Errorf("could not get step run data: %w", err)
}

var inputBytes []byte
retryCount := int(stepRun.StepRun.RetryCount) + 1
retryCount := int(stepRun.SRRetryCount) + 1

// update the input schema for the step run based on the new input
if payload.InputData != "" {
Expand All @@ -491,7 +503,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg
// input data because the user could have deleted fields that are required by the step.
// A better solution would be to validate the user input ahead of time.
// NOTE: this is an expensive operation.
if currentInput := stepRun.StepRun.Input; len(currentInput) > 0 {
if currentInput := data.Input; len(currentInput) > 0 {
inputMap, err := datautils.JSONBytesToMap([]byte(payload.InputData))

if err != nil {
Expand Down Expand Up @@ -525,14 +537,14 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg
// if the input data has been manually set, we reset the retry count as this is a user-triggered retry
retryCount = 0
} else {
inputBytes = stepRun.StepRun.Input
inputBytes = data.Input
}

// update step run
_, err = ec.repo.StepRun().ReplayStepRun(
ctx,
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
sqlchelpers.UUIDToStr(stepRun.SRID),
&repository.UpdateStepRunOpts{
Input: inputBytes,
Status: repository.StepRunStatusPtr(db.StepRunStatusPending),
Expand Down Expand Up @@ -645,10 +657,10 @@ func (ec *JobsControllerImpl) runStepRunRequeueTenant(ctx context.Context, tenan
defer span.End()

now := time.Now().UTC()
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

// if the current time is after the scheduleTimeoutAt, then mark this as timed out
scheduleTimeoutAt := stepRunCp.StepRun.ScheduleTimeoutAt.Time
scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time

// timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout,
// or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt
Expand Down Expand Up @@ -730,11 +742,11 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign-step-run")
defer span.End()

stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

// if the current time is after the scheduleTimeoutAt, then mark this as timed out
now := time.Now().UTC().UTC()
scheduleTimeoutAt := stepRunCp.StepRun.ScheduleTimeoutAt.Time
scheduleTimeoutAt := stepRunCp.SRScheduleTimeoutAt.Time

// timed out if there was no scheduleTimeoutAt set and the current time is after the step run created at time plus the default schedule timeout,
// or if the scheduleTimeoutAt is set and the current time is after the scheduleTimeoutAt
Expand All @@ -745,7 +757,7 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
}

eventData := map[string]interface{}{
"worker_id": sqlchelpers.UUIDToStr(stepRunCp.StepRun.WorkerId),
"worker_id": sqlchelpers.UUIDToStr(stepRunCp.SRWorkerId),
}

// TODO: batch this query to avoid n+1 issues
Expand Down Expand Up @@ -825,7 +837,7 @@ func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenan
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout-step-run")
defer span.End()

stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

return ec.failStepRun(ctx, tenantId, stepRunId, "TIMED_OUT", time.Now().UTC())
})
Expand All @@ -851,6 +863,12 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
return ec.a.WrapErr(fmt.Errorf("could not get step run: %w", err), errData)
}

data, err := ec.repo.StepRun().GetStepRunDataForEngine(ctx, tenantId, stepRunId)

if err != nil {
return ec.a.WrapErr(fmt.Errorf("could not get step run data: %w", err), errData)
}

servertel.WithStepRunModel(span, stepRun)

requeueAfterTime := time.Now().Add(4 * time.Second).UTC()
Expand All @@ -860,15 +878,15 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
}

// set scheduling timeout
if scheduleTimeoutAt := stepRun.StepRun.ScheduleTimeoutAt.Time; scheduleTimeoutAt.IsZero() {
if scheduleTimeoutAt := stepRun.SRScheduleTimeoutAt.Time; scheduleTimeoutAt.IsZero() {
scheduleTimeoutAt = getScheduleTimeout(stepRun)

updateStepOpts.ScheduleTimeoutAt = &scheduleTimeoutAt
}

// If the step run input is not set, then we should set it. This will be set upstream if we've rerun
// the step run manually with new inputs. It will not be set when the step is automatically queued.
if in := stepRun.StepRun.Input; len(in) == 0 || string(in) == "{}" {
if in := data.Input; len(in) == 0 || string(in) == "{}" {
lookupDataBytes := stepRun.JobRunLookupData

if lookupDataBytes != nil {
Expand Down Expand Up @@ -932,7 +950,7 @@ func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId stri
ctx, span := telemetry.NewSpan(ctx, "schedule-step-run")
defer span.End()

stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)

selectedWorkerId, dispatcherId, err := ec.repo.StepRun().AssignStepRunToWorker(ctx, stepRun)

Expand Down Expand Up @@ -1066,7 +1084,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m

// queue the next step runs
jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)

nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, jobRunId, &stepRunId)

Expand All @@ -1076,7 +1094,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m

for _, nextStepRun := range nextStepRuns {
nextStepId := sqlchelpers.UUIDToStr(nextStepRun.StepId)
nextStepRunId := sqlchelpers.UUIDToStr(nextStepRun.StepRun.ID)
nextStepRunId := sqlchelpers.UUIDToStr(nextStepRun.SRID)

err = ec.queueStepRun(ctx, metadata.TenantId, nextStepId, nextStepRunId)

Expand Down Expand Up @@ -1123,7 +1141,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
}

// determine if step run should be retried or not
shouldRetry := stepRun.StepRun.RetryCount < stepRun.StepRetries
shouldRetry := stepRun.SRRetryCount < stepRun.StepRetries

status := db.StepRunStatusFailed

Expand Down Expand Up @@ -1172,7 +1190,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
attemptCancel = true
}

if !stepRun.StepRun.WorkerId.Valid {
if !stepRun.SRWorkerId.Valid {
// this is not a fatal error
ec.l.Warn().Msgf("step run %s has no worker id, skipping cancellation", stepRunId)
attemptCancel = false
Expand All @@ -1181,7 +1199,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
// Attempt to cancel the previous running step run
if attemptCancel {

workerId := sqlchelpers.UUIDToStr(stepRun.StepRun.WorkerId)
workerId := sqlchelpers.UUIDToStr(stepRun.SRWorkerId)

worker, err := ec.repo.Worker().GetWorkerForEngine(ctx, tenantId, workerId)

Expand Down Expand Up @@ -1286,14 +1304,14 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR

defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)

if !stepRun.StepRun.WorkerId.Valid {
if !stepRun.SRWorkerId.Valid {
// this is not a fatal error
ec.l.Debug().Msgf("step run %s has no worker id, skipping cancellation", stepRunId)

return nil
}

workerId := sqlchelpers.UUIDToStr(stepRun.StepRun.WorkerId)
workerId := sqlchelpers.UUIDToStr(stepRun.SRWorkerId)

worker, err := ec.repo.Worker().GetWorkerForEngine(ctx, tenantId, workerId)

Expand Down Expand Up @@ -1330,7 +1348,7 @@ func (ec *JobsControllerImpl) handleStepRunUpdateInfo(stepRun *dbsqlc.GetStepRun
context.Background(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunFinishedToTask(
sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId),
sqlchelpers.UUIDToStr(stepRun.SRTenantId),
updateInfo.WorkflowRunId,
updateInfo.WorkflowRunStatus,
),
Expand Down
2 changes: 1 addition & 1 deletion internal/services/controllers/workflows/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ func (wc *WorkflowsControllerImpl) cancelWorkflowRun(ctx context.Context, tenant

for i := range stepRuns {
stepRunCp := stepRuns[i]
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.StepRun.ID)
stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

errGroup.Go(func() error {
return wc.mq.AddMessage(
Expand Down
8 changes: 7 additions & 1 deletion internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,19 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
return fmt.Errorf("could not get step run: %w", err)
}

data, err := d.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId)

if err != nil {
return fmt.Errorf("could not get step run data: %w", err)
}

servertel.WithStepRunModel(span, stepRun)

var multiErr error
var success bool

for _, w := range workers {
err = w.StartStepRun(ctx, metadata.TenantId, stepRun)
err = w.StartStepRun(ctx, metadata.TenantId, stepRun, data)

if err != nil {
multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker: %w", err))
Expand Down
36 changes: 22 additions & 14 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func (worker *subscribedWorker) StartStepRun(
ctx context.Context,
tenantId string,
stepRun *dbsqlc.GetStepRunForEngineRow,
stepRunData *dbsqlc.GetStepRunDataForEngineRow,
) error {
ctx, span := telemetry.NewSpan(ctx, "start-step-run") // nolint:ineffassign
defer span.End()

inputBytes := []byte{}

if stepRun.StepRun.Input != nil {
inputBytes = stepRun.StepRun.Input
if stepRunData.Input != nil {
inputBytes = stepRunData.Input
}

stepName := stepRun.StepReadableId.String
Expand All @@ -57,13 +58,13 @@ func (worker *subscribedWorker) StartStepRun(
JobName: stepRun.JobName,
JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId),
StepId: sqlchelpers.UUIDToStr(stepRun.StepId),
StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
ActionType: contracts.ActionType_START_STEP_RUN,
ActionId: stepRun.ActionId,
ActionPayload: string(inputBytes),
StepName: stepName,
WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId),
RetryCount: stepRun.StepRun.RetryCount,
RetryCount: stepRun.SRRetryCount,
})
}

Expand Down Expand Up @@ -103,11 +104,11 @@ func (worker *subscribedWorker) CancelStepRun(
JobName: stepRun.JobName,
JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId),
StepId: sqlchelpers.UUIDToStr(stepRun.StepId),
StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
ActionType: contracts.ActionType_CANCEL_STEP_RUN,
StepName: stepRun.StepReadableId.String,
WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId),
RetryCount: stepRun.StepRun.RetryCount,
RetryCount: stepRun.SRRetryCount,
})
}

Expand Down Expand Up @@ -1115,7 +1116,7 @@ func (s *DispatcherImpl) tenantTaskToWorkflowEvent(task *msgqueue.Message, tenan
}

workflowEvent.StepRetries = &stepRun.StepRetries
workflowEvent.RetryCount = &stepRun.StepRun.RetryCount
workflowEvent.RetryCount = &stepRun.SRRetryCount

if workflowEvent.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM {
streamEventId, err := strconv.ParseInt(task.Metadata["stream_event_id"].(string), 10, 64)
Expand Down Expand Up @@ -1203,23 +1204,30 @@ func (s *DispatcherImpl) getStepResultsForWorkflowRun(tenantId string, workflowR
res := make(map[string][]*contracts.StepRunResult)

for _, stepRun := range stepRuns {

data, err := s.repo.StepRun().GetStepRunDataForEngine(context.Background(), tenantId, sqlchelpers.UUIDToStr(stepRun.SRID))

if err != nil {
return nil, fmt.Errorf("could not get step run data for %s, %e", sqlchelpers.UUIDToStr(stepRun.SRID), err)
}

resStepRun := &contracts.StepRunResult{
StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
StepReadableId: stepRun.StepReadableId.String,
JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId),
}

if stepRun.StepRun.Error.Valid {
resStepRun.Error = &stepRun.StepRun.Error.String
if data.Error.Valid {
resStepRun.Error = &data.Error.String
}

if stepRun.StepRun.CancelledReason.Valid {
errString := fmt.Sprintf("this step run was cancelled due to %s", stepRun.StepRun.CancelledReason.String)
if stepRun.SRCancelledReason.Valid {
errString := fmt.Sprintf("this step run was cancelled due to %s", stepRun.SRCancelledReason.String)
resStepRun.Error = &errString
}

if stepRun.StepRun.Output != nil {
resStepRun.Output = repository.StringPtr(string(stepRun.StepRun.Output))
if data.Output != nil {
resStepRun.Output = repository.StringPtr(string(data.Output))
}

workflowRunId := sqlchelpers.UUIDToStr(stepRun.WorkflowRunId)
Expand Down
16 changes: 8 additions & 8 deletions internal/services/shared/tasktypes/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func TenantToStepRunRequeueTask(tenant db.TenantModel) *msgqueue.Message {

func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message {
jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)

payload, _ := datautils.ToJSONMap(StepRunRetryTaskPayload{
JobRunId: jobRunId,
Expand All @@ -175,8 +175,8 @@ func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte

func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message {
jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)

payload, _ := datautils.ToJSONMap(StepRunReplayTaskPayload{
JobRunId: jobRunId,
Expand All @@ -197,8 +197,8 @@ func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byt
}

func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) *msgqueue.Message {
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)

payload, _ := datautils.ToJSONMap(StepRunNotifyCancelTaskPayload{
StepRunId: stepRunId,
Expand All @@ -220,7 +220,7 @@ func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string)
func StepRunQueuedToTask(stepRun *dbsqlc.GetStepRunForEngineRow) *msgqueue.Message {
payload, _ := datautils.ToJSONMap(StepRunTaskPayload{
JobRunId: sqlchelpers.UUIDToStr(stepRun.JobRunId),
StepRunId: sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
})

metadata, _ := datautils.ToJSONMap(StepRunTaskMetadata{
Expand All @@ -229,7 +229,7 @@ func StepRunQueuedToTask(stepRun *dbsqlc.GetStepRunForEngineRow) *msgqueue.Messa
JobName: stepRun.JobName,
JobId: sqlchelpers.UUIDToStr(stepRun.JobId),
WorkflowVersionId: sqlchelpers.UUIDToStr(stepRun.WorkflowVersionId),
TenantId: sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId),
TenantId: sqlchelpers.UUIDToStr(stepRun.SRTenantId),
})

return &msgqueue.Message{
Expand Down
Loading

0 comments on commit f4f752b

Please sign in to comment.