Skip to content

Commit

Permalink
Improve handling of result size in dispatcher (#1133)
Browse files Browse the repository at this point in the history
* Improve handling of result size in dispatcher

* small if case

* 3MB as var
  • Loading branch information
abelanger5 authored Dec 18, 2024
1 parent 23dc410 commit b383ae8
Showing 1 changed file with 50 additions and 10 deletions.
60 changes: 50 additions & 10 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,20 +823,59 @@ func (s *sendTimeFilter) canSend() bool {
return true
}

// payloadSizeThreshold is the size limit, in bytes (3 MiB), that cleanResults
// applies to the combined length of the output and error strings of a set of
// step run results.
const payloadSizeThreshold = 3 * 1024 * 1024

// cleanResults returns a set of step run results whose combined output and
// error size fits within payloadSizeThreshold, or nil when the payload cannot
// be brought under the limit.
//
// Results that already fit are returned unchanged. Otherwise only the Error
// fields are rewritten: outputs are never modified, so if the outputs alone
// reach the threshold the function gives up and returns nil. Nil entries are
// dropped from the cleaned slice.
//
// Note that oversized Error fields are replaced in place on the elements of
// results, so the caller's values are modified even when nil is returned.
func cleanResults(results []*contracts.StepRunResult) []*contracts.StepRunResult {
	// nil is reserved as the "too large, cannot be reduced" signal, so an empty
	// input must come back as a non-nil slice; otherwise callers would report
	// an oversized payload for an event that simply has no results.
	if len(results) == 0 {
		return []*contracts.StepRunResult{}
	}

	const oversizedErrorMessage = "Error is too large to send over the Hatchet stream."

	totalSize, sizeOfOutputs, _ := calculateResultsSize(results)

	if totalSize < payloadSizeThreshold {
		return results
	}

	if sizeOfOutputs >= payloadSizeThreshold {
		return nil
	}

	// otherwise, attempt to clean the results by removing large error fields
	cleanedResults := make([]*contracts.StepRunResult, 0, len(results))

	// how much room each error field may take, in the worst case, once the
	// outputs have been accounted for
	fieldThreshold := (payloadSizeThreshold - sizeOfOutputs) / len(results)

	for _, result := range results {
		if result == nil {
			continue
		}

		// we only try to clean the error field at the moment, as modifying the
		// output is more risky. The replacement is skipped when the original
		// error is no longer than the placeholder, since swapping it in would
		// grow the payload instead of shrinking it.
		if result.Error != nil &&
			len(*result.Error) > fieldThreshold &&
			len(*result.Error) > len(oversizedErrorMessage) {
			result.Error = repository.StringPtr(oversizedErrorMessage)
		}

		cleanedResults = append(cleanedResults, result)
	}

	// if we are still over the limit, we just return nil
	if cleanedSize, _, _ := calculateResultsSize(cleanedResults); cleanedSize > payloadSizeThreshold {
		return nil
	}

	return cleanedResults
}

// calculateResultsSize measures the Output and Error fields of the given step
// run results using len. Nil entries and nil fields contribute nothing, and no
// other fields of a result are counted.
//
// It returns the combined size (totalSize), the size of all outputs
// (sizeOfOutputs) and the size of all errors (sizeOfErrors).
func calculateResultsSize(results []*contracts.StepRunResult) (totalSize int, sizeOfOutputs int, sizeOfErrors int) {
	for _, result := range results {
		if result == nil {
			continue
		}

		if result.Output != nil {
			sizeOfOutputs += len(*result.Output)
		}

		if result.Error != nil {
			sizeOfErrors += len(*result.Error)
		}
	}

	// The total is derived once from the two partial sums so that each field is
	// counted exactly one time.
	return sizeOfOutputs + sizeOfErrors, sizeOfOutputs, sizeOfErrors
}

// SubscribeToWorkflowEvents registers workflow events with the dispatcher
Expand All @@ -857,9 +896,10 @@ func (s *DispatcherImpl) SubscribeToWorkflowRuns(server contracts.Dispatcher_Sub
sendMu := sync.Mutex{}

sendEvent := func(e *contracts.WorkflowRunEvent) error {
results := cleanResults(e.Results)

if calculateResultsSize(e.Results) > 3*1024*1024 {
s.l.Warn().Msgf("results size for workflow run %s exceeds 3MB", e.WorkflowRunId)
if results == nil {
s.l.Warn().Msgf("results size for workflow run %s exceeds 3MB and cannot be reduced", e.WorkflowRunId)
e.Results = nil
}

Expand Down

0 comments on commit b383ae8

Please sign in to comment.