diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 89e18c668..6f680fe41 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -938,12 +938,17 @@ func (s *stepRunEngineRepository) UnassignStepRunFromWorker(ctx context.Context, } type debugInfo struct { - UniqueActions []string `json:"unique_actions"` - TotalStepRuns int `json:"total_step_runs"` - TotalStepRunsAssigned int `json:"total_step_runs_assigned"` - TotalSlots int `json:"total_slots"` - StartingSlotsPerAction map[string]int `json:"starting_slots"` - EndingSlotsPerAction map[string]int `json:"ending_slots"` + UniqueActions []string `json:"unique_actions"` + TotalStepRuns int `json:"total_step_runs"` + TotalStepRunsAssigned int `json:"total_step_runs_assigned"` + TotalSlots int `json:"total_slots"` + StartingSlotsPerAction map[string]int `json:"starting_slots"` + EndingSlotsPerAction map[string]int `json:"ending_slots"` + Duration string `json:"duration"` + TimeToStartDuration string `json:"time_to_start_duration"` + TimeToListQueuesDuration string `json:"time_to_list_queues_duration"` + TimeToListQueueItemsDuration string `json:"time_to_list_queue_items_duration"` + DurationsOfQueueListResults []string `json:"durations_of_queue_list_results"` } type queueItemWithOrder struct { @@ -953,6 +958,8 @@ type queueItemWithOrder struct { } func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId string) (repository.QueueStepRunsResult, error) { + startedAt := time.Now().UTC() + emptyRes := repository.QueueStepRunsResult{ Queued: []repository.QueuedStepRun{}, SchedulingTimedOut: []string{}, @@ -971,6 +978,8 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st return emptyRes, err } + timeToStartDuration := time.Since(startedAt) + defer deferRollback(ctx, s.l, tx.Rollback) // list queues @@ -984,19 +993,29 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st return emptyRes, nil } + timeToListQueues := time.Since(startedAt) + // construct params for list queue items query := []dbsqlc.ListQueueItemsParams{} + // randomly order queues + rand.New(rand.NewSource(time.Now().UnixNano())).Shuffle(len(queues), func(i, j int) { queues[i], queues[j] = queues[j], queues[i] }) // nolint:gosec + for _, queue := range queues { + name := queue.Name + query = append(query, dbsqlc.ListQueueItemsParams{ Tenantid: pgTenantId, - Queue: queue.Name, + Queue: name, }) } queueItems := make([]queueItemWithOrder, 0) results := s.queries.ListQueueItems(ctx, tx, query) + defer results.Close() + + durationsOfQueueListResults := make([]string, 0) // TODO: verify whether this is multithreaded and if it is, whether thread safe results.Query(func(i int, qi []*dbsqlc.QueueItem, err error) { @@ -1004,18 +1023,32 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st return } + queueName := "" + for i := range qi { queueItems = append(queueItems, queueItemWithOrder{ QueueItem: qi[i], order: i, }) + + queueName = qi[i].Queue } + + durationsOfQueueListResults = append(durationsOfQueueListResults, fmt.Sprintf("%s:%s:%s", queues[i].Name, queueName, time.Since(startedAt).String())) }) + err = results.Close() + + if err != nil { + return emptyRes, fmt.Errorf("could not close queue items result: %w", err) + } + if len(queueItems) == 0 { return emptyRes, nil } + timeToListQueueItemsDuration := time.Since(startedAt) + // sort the queue items by order from least to greatest, then by queue id sort.Slice(queueItems, func(i, j int) bool { if queueItems[i].order == queueItems[j].order { @@ -1179,12 +1212,17 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st defer func() { // pretty-print json with 2 spaces debugInfo := debugInfo{ - UniqueActions: uniqueActionsArr, - TotalStepRuns: len(queueItems), - TotalStepRunsAssigned: len(stepRunIds), - TotalSlots: len(slots), - StartingSlotsPerAction: startingSlotsPerAction, - EndingSlotsPerAction: endingSlotsPerAction, + UniqueActions: uniqueActionsArr, + TotalStepRuns: len(queueItems), + TotalStepRunsAssigned: len(stepRunIds), + TotalSlots: len(slots), + StartingSlotsPerAction: startingSlotsPerAction, + EndingSlotsPerAction: endingSlotsPerAction, + Duration: time.Since(startedAt).String(), + TimeToStartDuration: timeToStartDuration.String(), + TimeToListQueuesDuration: timeToListQueues.String(), + TimeToListQueueItemsDuration: timeToListQueueItemsDuration.String(), + DurationsOfQueueListResults: durationsOfQueueListResults, } debugInfoBytes, err := json.MarshalIndent(debugInfo, "", " ")