Skip to content

Commit

Permalink
Merge branch 'fix--batch-requeue-and-pg-stat' of github.com:hatchet-d…
Browse files Browse the repository at this point in the history
…ev/hatchet into fix--batch-requeue-and-pg-stat
  • Loading branch information
abelanger5 committed Aug 8, 2024
2 parents c165fb1 + bb96401 commit e736317
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 34 deletions.
23 changes: 23 additions & 0 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,29 @@ WHERE
sr."id" = input."id"
RETURNING sr."id";

-- name: GetDesiredLabels :many
SELECT
"key",
"strValue",
"intValue",
"required",
"weight",
"comparator"
FROM
"StepDesiredWorkerLabel"
WHERE
"stepId" = @stepId::uuid;

-- name: GetWorkerLabels :many
SELECT
"key",
"strValue",
"intValue"
FROM
"WorkerLabel"
WHERE
"workerId" = @workerId::uuid;

-- name: AcquireWorkerSemaphoreSlotAndAssign :one
WITH valid_workers AS (
SELECT
Expand Down
87 changes: 87 additions & 0 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 51 additions & 2 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,17 @@ func (s *stepRunEngineRepository) UnassignStepRunFromWorker(ctx context.Context,
})
}

func UniqueSet[T any](i []T, keyFunc func(T) string) map[string]struct{} {
set := make(map[string]struct{})

for _, item := range i {
key := keyFunc(item)
set[key] = struct{}{}
}

return set
}

type debugInfo struct {
UniqueActions []string `json:"unique_actions"`
TotalStepRuns int `json:"total_step_runs"`
Expand Down Expand Up @@ -1061,7 +1072,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st

durationsOfQueueListResults := make([]string, 0)

queueItems := make([]scheduling.QueueItemWithOrder, 0)
queueItems := make([]*scheduling.QueueItemWithOrder, 0)

// TODO: verify whether this is multithreaded and if it is, whether thread safe
results.Query(func(i int, qi []*dbsqlc.QueueItem, err error) {
Expand All @@ -1072,7 +1083,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
queueName := ""

for i := range qi {
queueItems = append(queueItems, scheduling.QueueItemWithOrder{
queueItems = append(queueItems, &scheduling.QueueItemWithOrder{
QueueItem: qi[i],
Order: i,
})
Expand Down Expand Up @@ -1174,12 +1185,50 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
return emptyRes, fmt.Errorf("could not list semaphore slots to assign: %w", err)
}

// GET UNIQUE STEP IDS
stepIdSet := UniqueSet(queueItems, func(x *scheduling.QueueItemWithOrder) string {
return sqlchelpers.UUIDToStr(x.StepId)
})

desiredLabels := make(map[string][]*dbsqlc.GetDesiredLabelsRow)
hasDesired := false

// GET DESIRED LABELS
// OPTIMIZATION: CACHEABLE
for stepId := range stepIdSet {
labels, err := s.queries.GetDesiredLabels(ctx, tx, sqlchelpers.UUIDFromStr(stepId))
if err != nil {
return emptyRes, fmt.Errorf("could not get desired labels: %w", err)
}
desiredLabels[stepId] = labels
hasDesired = true
}

var workerLabels = make(map[string][]*dbsqlc.GetWorkerLabelsRow)

if hasDesired {
// GET UNIQUE WORKER LABELS
workerIdSet := UniqueSet(slots, func(x *dbsqlc.ListSemaphoreSlotsToAssignRow) string {
return sqlchelpers.UUIDToStr(x.WorkerId)
})

for workerId := range workerIdSet {
labels, err := s.queries.GetWorkerLabels(ctx, tx, sqlchelpers.UUIDFromStr(workerId))
if err != nil {
return emptyRes, fmt.Errorf("could not get worker labels: %w", err)
}
workerLabels[workerId] = labels
}
}

plan, err := scheduling.GeneratePlan(
slots,
uniqueActionsArr,
queueItems,
stepRateUnits,
currRateLimitValues,
workerLabels,
desiredLabels,
)

if err != nil {
Expand Down
52 changes: 52 additions & 0 deletions pkg/scheduling/affinity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package scheduling

import "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"

func ComputeWeight(s []*dbsqlc.GetDesiredLabelsRow, l []*dbsqlc.GetWorkerLabelsRow) int {
totalWeight := 0

for _, desiredLabel := range s {
labelFound := false
for _, workerLabel := range l {
if desiredLabel.Key == workerLabel.Key {
labelFound = true
switch desiredLabel.Comparator {
case dbsqlc.WorkerLabelComparatorEQUAL:
if (desiredLabel.StrValue.Valid && workerLabel.StrValue.Valid && desiredLabel.StrValue.String == workerLabel.StrValue.String) ||
(desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && desiredLabel.IntValue.Int32 == workerLabel.IntValue.Int32) {
totalWeight += int(desiredLabel.Weight)
}
case dbsqlc.WorkerLabelComparatorNOTEQUAL:
if (desiredLabel.StrValue.Valid && workerLabel.StrValue.Valid && desiredLabel.StrValue.String != workerLabel.StrValue.String) ||
(desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && desiredLabel.IntValue.Int32 != workerLabel.IntValue.Int32) {
totalWeight += int(desiredLabel.Weight)
}
case dbsqlc.WorkerLabelComparatorGREATERTHAN:
if desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && workerLabel.IntValue.Int32 > desiredLabel.IntValue.Int32 {
totalWeight += int(desiredLabel.Weight)
}
case dbsqlc.WorkerLabelComparatorLESSTHAN:
if desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && workerLabel.IntValue.Int32 < desiredLabel.IntValue.Int32 {
totalWeight += int(desiredLabel.Weight)
}
case dbsqlc.WorkerLabelComparatorGREATERTHANOREQUAL:
if desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && workerLabel.IntValue.Int32 >= desiredLabel.IntValue.Int32 {
totalWeight += int(desiredLabel.Weight)
}
case dbsqlc.WorkerLabelComparatorLESSTHANOREQUAL:
if desiredLabel.IntValue.Valid && workerLabel.IntValue.Valid && workerLabel.IntValue.Int32 <= desiredLabel.IntValue.Int32 {
totalWeight += int(desiredLabel.Weight)
}
}
break // Move to the next desired label
}
}

// If the label is required but not found, return -1 to indicate an invalid match
if desiredLabel.Required && !labelFound {
return -1
}
}

return totalWeight
}
40 changes: 40 additions & 0 deletions pkg/scheduling/affinity_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"StepRunIds": [
"16fa711e-c03e-435c-88d9-62adf1591d98",
"064e787a-6cfd-4f82-8b6d-d8031459fdee"
],
"StepRunTimeouts": [
"60s",
"60s"
],
"SlotIds": [
"8cf68f09-b914-4f31-9777-8082b751a2d4",
"e2c744b8-b914-4f31-9777-8082b751a2d4"
],
"WorkerIds": [
"aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa",
"aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa"
],
"UnassignedStepRunIds": [],
"QueuedStepRuns": [
{
"StepRunId": "16fa711e-c03e-435c-88d9-62adf1591d98",
"WorkerId": "aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa",
"DispatcherId": "9994a9eb-430d-46da-934d-d9dd953cfd21"
},
{
"StepRunId": "064e787a-6cfd-4f82-8b6d-d8031459fdee",
"WorkerId": "aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa",
"DispatcherId": "9994a9eb-430d-46da-934d-d9dd953cfd21"
}
],
"TimedOutStepRuns": [],
"QueuedItems": [
137295,
152259
],
"ShouldContinue": false,
"MinQueuedIds": {
"child:process2": 137295
}
}
Loading

0 comments on commit e736317

Please sign in to comment.