Skip to content

Commit

Permalink
fix: race condition if a step run has multiple parents (#1095)
Browse files: browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Dec 6, 2024
1 parent db6558a commit a104f13
Showing 1 changed file with 33 additions and 32 deletions.
65 changes: 33 additions & 32 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,38 +2544,6 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId
ctx, span := telemetry.NewSpan(ctx, "step-run-started-db")
defer span.End()

finished := string(dbsqlc.StepRunStatusSUCCEEDED)

data := &updateStepRunQueueData{
Hash: hashToBucket(sqlchelpers.UUIDFromStr(workflowRunId), s.maxHashFactor),
StepRunId: stepRunId,
TenantId: tenantId,
FinishedAt: &finishedAt,
Status: &finished,
Output: output,
}

// we write to the buffer first so we don't get race conditions when we resolve workflow run statuses
done, err := s.bulkStatusBuffer.BuffItem(tenantId, data)

if err != nil {
return fmt.Errorf("could not buffer step run succeeded: %w", err)
}

var response *buffer.FlushResponse[pgtype.UUID]

select {
case response = <-done:
case <-ctx.Done():
return ctx.Err()
case <-time.After(20 * time.Second):
return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db")
}

if response.Err != nil {
return fmt.Errorf("could not flush step run succeeded: %w", response.Err)
}

tx, err := s.pool.Begin(ctx)

if err != nil {
Expand Down Expand Up @@ -2609,6 +2577,39 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId
return fmt.Errorf("could not commit transaction: %w", err)
}

finished := string(dbsqlc.StepRunStatusSUCCEEDED)

data := &updateStepRunQueueData{
Hash: hashToBucket(sqlchelpers.UUIDFromStr(workflowRunId), s.maxHashFactor),
StepRunId: stepRunId,
TenantId: tenantId,
FinishedAt: &finishedAt,
Status: &finished,
Output: output,
}

// we write to the buffer after updating the job run lookup data so we don't start a step run (which has multiple
// parents) before the job run lookup data is updated
done, err := s.bulkStatusBuffer.BuffItem(tenantId, data)

if err != nil {
return fmt.Errorf("could not buffer step run succeeded: %w", err)
}

var response *buffer.FlushResponse[pgtype.UUID]

select {
case response = <-done:
case <-ctx.Done():
return ctx.Err()
case <-time.After(20 * time.Second):
return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db")
}

if response.Err != nil {
return fmt.Errorf("could not flush step run succeeded: %w", response.Err)
}

return nil
}

Expand Down

0 comments on commit a104f13

Please sign in to comment.