Skip to content

Commit

Permalink
fix: add filter condition for min queued item
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Aug 8, 2024
1 parent 8e1e7ce commit 89769ab
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/repository/prisma/db/db_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,7 @@ model QueueItem {
tenantId String @db.Uuid
queue String

@@index([isQueued, queue, id])
@@index([isQueued, tenantId, queue, id])
}

enum StepRunEventReason {
Expand Down
6 changes: 6 additions & 0 deletions pkg/repository/prisma/dbsqlc/batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ WHERE
qi."isQueued" = true
AND qi."tenantId" = @tenantId::uuid
AND qi."queue" = @queue::text
AND (
sqlc.narg('gtId')::bigint IS NULL OR
qi."id" > sqlc.narg('gtId')::bigint
)
ORDER BY
qi."id" ASC
LIMIT
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ CREATE UNIQUE INDEX "JobRunLookupData_jobRunId_tenantId_key" ON "JobRunLookupDat
CREATE UNIQUE INDEX "Queue_tenantId_name_key" ON "Queue"("tenantId" ASC, "name" ASC);

-- CreateIndex
CREATE INDEX "QueueItem_isQueued_queue_id_idx" ON "QueueItem"("isQueued" ASC, "queue" ASC, "id" ASC);
CREATE INDEX "QueueItem_isQueued_tenantId_queue_id_idx" ON "QueueItem"("isQueued" ASC, "tenantId" ASC, "queue" ASC, "id" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "RateLimit_tenantId_key_key" ON "RateLimit"("tenantId" ASC, "key" ASC);
Expand Down
46 changes: 39 additions & 7 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"sort"
"strings"
"sync"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -198,11 +199,12 @@ func (s *stepRunAPIRepository) ListStepRunArchives(tenantId string, stepRunId st
}

type stepRunEngineRepository struct {
pool *pgxpool.Pool
v validator.Validator
l *zerolog.Logger
queries *dbsqlc.Queries
cf *server.ConfigFileRuntime
pool *pgxpool.Pool
v validator.Validator
l *zerolog.Logger
queries *dbsqlc.Queries
cf *server.ConfigFileRuntime
cachedMinQueuedIds sync.Map
}

func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime) repository.StepRunEngineRepository {
Expand Down Expand Up @@ -1004,10 +1006,24 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
for _, queue := range queues {
name := queue.Name

query = append(query, dbsqlc.ListQueueItemsParams{
q := dbsqlc.ListQueueItemsParams{
Tenantid: pgTenantId,
Queue: name,
})
}

// lookup to see if we have a min queued id cached
minQueuedId, ok := s.cachedMinQueuedIds.Load(getCacheName(tenantId, name))

if ok {
if minQueuedIdInt, ok := minQueuedId.(int64); ok {
q.GtId = pgtype.Int8{
Int64: minQueuedIdInt,
Valid: true,
}
}
}

query = append(query, q)
}

queueItems := make([]queueItemWithOrder, 0)
Expand Down Expand Up @@ -1061,6 +1077,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
queuedItems := make([]int64, 0)
queuedStepRuns := make([]repository.QueuedStepRun, 0)
timedOutStepRuns := make([]pgtype.UUID, 0)
minQueuedIds := make(map[string]int64)

// get a list of unique actions
uniqueActions := make(map[string]bool)
Expand Down Expand Up @@ -1127,6 +1144,12 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
}

for _, qi := range queueItems {
if currMinQueued, ok := minQueuedIds[qi.Queue]; !ok {
minQueuedIds[qi.Queue] = qi.ID
} else if qi.ID < currMinQueued {
minQueuedIds[qi.Queue] = qi.ID
}

if len(actionsToSlots[qi.ActionId.String]) == 0 {
allStepRunsWithActionAssigned[qi.ActionId.String] = false
unassignedStepRunIds = append(unassignedStepRunIds, qi.StepRunId)
Expand Down Expand Up @@ -1235,6 +1258,11 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
s.l.Warn().Msg(string(debugInfoBytes))
}()

// update the cache with the min queued id
for name, qiId := range minQueuedIds {
s.cachedMinQueuedIds.Store(getCacheName(tenantId, name), qiId)
}

shouldContinue := false

// if at least one of the actions got all step runs assigned, and there are slots remaining, return true
Expand Down Expand Up @@ -2250,3 +2278,7 @@ func (s *stepRunEngineRepository) ClearStepRunPayloadData(ctx context.Context, t

return hasMore, nil
}

func getCacheName(tenantId, queue string) string {
return fmt.Sprintf("%s:%s", tenantId, queue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ CREATE UNIQUE INDEX "StepRunPtr_tenantId_key" ON "StepRunPtr"("tenantId");
CREATE UNIQUE INDEX "Queue_tenantId_name_key" ON "Queue"("tenantId", "name");

-- CreateIndex
CREATE INDEX "QueueItem_isQueued_queue_id_idx" ON "QueueItem"("isQueued", "queue", "id");
CREATE INDEX "QueueItem_isQueued_tenantId_queue_id_idx" ON "QueueItem"("isQueued", "tenantId", "queue", "id");

-- CreateIndex
CREATE INDEX "StepRun_status_tenantId_deletedAt_queueOrder_idx" ON "StepRun"("status", "tenantId", "deletedAt", "queueOrder");
2 changes: 1 addition & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ model QueueItem {
tenantId String @db.Uuid
queue String
@@index([isQueued, queue, id])
@@index([isQueued, tenantId, queue, id])
}

enum StepRunEventReason {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ CREATE TABLE "Queue" ("id" bigserial NOT NULL, "tenantId" uuid NOT NULL, "name"
CREATE UNIQUE INDEX "Queue_tenantId_name_key" ON "Queue" ("tenantId", "name");
-- Create "QueueItem" table
CREATE TABLE "QueueItem" ("id" bigserial NOT NULL, "stepRunId" uuid NULL, "stepId" uuid NULL, "actionId" text NULL, "scheduleTimeoutAt" timestamp(3) NULL, "stepTimeout" text NULL, "isQueued" boolean NOT NULL, "tenantId" uuid NOT NULL, "queue" text NOT NULL, PRIMARY KEY ("id"));
-- Create index "QueueItem_isQueued_queue_idx" to table: "QueueItem"
CREATE INDEX "QueueItem_isQueued_queue_id_idx" ON "QueueItem" ("isQueued", "queue", "id");
-- Create index "QueueItem_isQueued_tenantId_queue_id_idx" to table: "QueueItem"
CREATE INDEX "QueueItem_isQueued_tenantId_queue_id_idx" ON "QueueItem" ("isQueued", "tenantId", "queue", "id");
-- Create "StepRunPtr" table
CREATE TABLE "StepRunPtr" ("maxAssignedBlockAddr" bigint NOT NULL DEFAULT 0, "tenantId" uuid NOT NULL, PRIMARY KEY ("tenantId"));
-- Create index "StepRunPtr_tenantId_key" to table: "StepRunPtr"
CREATE UNIQUE INDEX "StepRunPtr_tenantId_key" ON "StepRunPtr" ("tenantId");
-- Create "StepRunQueue" table
CREATE TABLE "StepRunQueue" ("id" bigserial NOT NULL, "queue" text NOT NULL, "blockAddr" bigint NOT NULL, "tenantId" uuid NOT NULL, PRIMARY KEY ("id"));
-- Create index "StepRunQueue_tenantId_queue_key" to table: "StepRunQueue"
CREATE UNIQUE INDEX "StepRunQueue_tenantId_queue_key" ON "StepRunQueue" ("tenantId", "queue");
CREATE UNIQUE INDEX "StepRunQueue_tenantId_queue_key" ON "StepRunQueue" ("tenantId", "queue");
4 changes: 2 additions & 2 deletions sql/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:+QvXlSxYo+il8pX9rvm3NIKC9orM9sFIm5QB7Udefqg=
h1:w0Zr8pQw27CJ7DcMJiqWFrO87TgnEVCuv8khG5T+A/c=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
Expand Down Expand Up @@ -43,4 +43,4 @@ h1:+QvXlSxYo+il8pX9rvm3NIKC9orM9sFIm5QB7Udefqg=
20240716143349_v0.39.0.sql h1:K0m6v5XamYBYJgBKpm69Jh3QOOSXKTCSNoU9hR3sLM4=
20240726160629_v0.40.0.sql h1:XmnKVQ/AMUTPnL1SZPwLhmY0KR4sT9B6+uhVbElYx34=
20240728042317_v0.41.0.sql h1:kgjgRXSRGMCXAOUweAwlTFd/uWtx7a24gLdJEfbK1rM=
20240807141711_v0.42.0.sql h1:O1FXe7zXpGAuqapJxf+YNQCg3/huGoecEWHxqmLvPjU=
20240808115843_v0.42.0.sql h1:LvnhW1j6Yax3hDRig1kF/mVAR4eNyCaR96JFzotwYgk=
2 changes: 1 addition & 1 deletion sql/schema/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ CREATE UNIQUE INDEX "JobRunLookupData_jobRunId_tenantId_key" ON "JobRunLookupDat
CREATE UNIQUE INDEX "Queue_tenantId_name_key" ON "Queue"("tenantId" ASC, "name" ASC);

-- CreateIndex
CREATE INDEX "QueueItem_isQueued_queue_id_idx" ON "QueueItem"("isQueued" ASC, "queue" ASC, "id" ASC);
CREATE INDEX "QueueItem_isQueued_tenantId_queue_id_idx" ON "QueueItem"("isQueued" ASC, "tenantId" ASC, "queue" ASC, "id" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "RateLimit_tenantId_key_key" ON "RateLimit"("tenantId" ASC, "key" ASC);
Expand Down

0 comments on commit 89769ab

Please sign in to comment.