Skip to content

Commit

Permalink
random shuffle queues
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Aug 7, 2024
1 parent 88406ad commit eea50a9
Showing 1 changed file with 51 additions and 13 deletions.
64 changes: 51 additions & 13 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,17 @@ func (s *stepRunEngineRepository) UnassignStepRunFromWorker(ctx context.Context,
}

type debugInfo struct {
UniqueActions []string `json:"unique_actions"`
TotalStepRuns int `json:"total_step_runs"`
TotalStepRunsAssigned int `json:"total_step_runs_assigned"`
TotalSlots int `json:"total_slots"`
StartingSlotsPerAction map[string]int `json:"starting_slots"`
EndingSlotsPerAction map[string]int `json:"ending_slots"`
UniqueActions []string `json:"unique_actions"`
TotalStepRuns int `json:"total_step_runs"`
TotalStepRunsAssigned int `json:"total_step_runs_assigned"`
TotalSlots int `json:"total_slots"`
StartingSlotsPerAction map[string]int `json:"starting_slots"`
EndingSlotsPerAction map[string]int `json:"ending_slots"`
Duration string `json:"duration"`
TimeToStartDuration string `json:"time_to_start_duration"`
TimeToListQueuesDuration string `json:"time_to_list_queues_duration"`
TimeToListQueueItemsDuration string `json:"time_to_list_queue_items_duration"`
DurationsOfQueueListResults []string `json:"durations_of_queue_list_results"`
}

type queueItemWithOrder struct {
Expand All @@ -953,6 +958,8 @@ type queueItemWithOrder struct {
}

func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId string) (repository.QueueStepRunsResult, error) {
startedAt := time.Now().UTC()

emptyRes := repository.QueueStepRunsResult{
Queued: []repository.QueuedStepRun{},
SchedulingTimedOut: []string{},
Expand All @@ -971,6 +978,8 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
return emptyRes, err
}

timeToStartDuration := time.Since(startedAt)

defer deferRollback(ctx, s.l, tx.Rollback)

// list queues
Expand All @@ -984,38 +993,62 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
return emptyRes, nil
}

timeToListQueues := time.Since(startedAt)

// construct params for list queue items
query := []dbsqlc.ListQueueItemsParams{}

// randomly order queues
rand.New(rand.NewSource(time.Now().UnixNano())).Shuffle(len(queues), func(i, j int) { queues[i], queues[j] = queues[j], queues[i] }) // nolint:gosec

for _, queue := range queues {
name := queue.Name

query = append(query, dbsqlc.ListQueueItemsParams{
Tenantid: pgTenantId,
Queue: queue.Name,
Queue: name,
})
}

queueItems := make([]queueItemWithOrder, 0)

results := s.queries.ListQueueItems(ctx, tx, query)
defer results.Close()

durationsOfQueueListResults := make([]string, 0)

// TODO: verify whether this is multithreaded and if it is, whether thread safe
results.Query(func(i int, qi []*dbsqlc.QueueItem, err error) {
if err != nil {
return
}

queueName := ""

for i := range qi {
queueItems = append(queueItems, queueItemWithOrder{
QueueItem: qi[i],
order: i,
})

queueName = qi[i].Queue
}

durationsOfQueueListResults = append(durationsOfQueueListResults, fmt.Sprintf("%s:%s:%s", queues[i].Name, queueName, time.Since(startedAt).String()))
})

err = results.Close()

if err != nil {
return emptyRes, fmt.Errorf("could not close queue items result: %w", err)
}

if len(queueItems) == 0 {
return emptyRes, nil
}

timeToListQueueItemsDuration := time.Since(startedAt)

// sort the queue items by order from least to greatest, then by queue id
sort.Slice(queueItems, func(i, j int) bool {
if queueItems[i].order == queueItems[j].order {
Expand Down Expand Up @@ -1179,12 +1212,17 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
defer func() {
// pretty-print json with 2 spaces
debugInfo := debugInfo{
UniqueActions: uniqueActionsArr,
TotalStepRuns: len(queueItems),
TotalStepRunsAssigned: len(stepRunIds),
TotalSlots: len(slots),
StartingSlotsPerAction: startingSlotsPerAction,
EndingSlotsPerAction: endingSlotsPerAction,
UniqueActions: uniqueActionsArr,
TotalStepRuns: len(queueItems),
TotalStepRunsAssigned: len(stepRunIds),
TotalSlots: len(slots),
StartingSlotsPerAction: startingSlotsPerAction,
EndingSlotsPerAction: endingSlotsPerAction,
Duration: time.Since(startedAt).String(),
TimeToStartDuration: timeToStartDuration.String(),
TimeToListQueuesDuration: timeToListQueues.String(),
TimeToListQueueItemsDuration: timeToListQueueItemsDuration.String(),
DurationsOfQueueListResults: durationsOfQueueListResults,
}

debugInfoBytes, err := json.MarshalIndent(debugInfo, "", " ")
Expand Down

0 comments on commit eea50a9

Please sign in to comment.