Skip to content

Commit

Permalink
fix: more efficient step run events, reduce caching on queue (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Oct 23, 2024
1 parent 35b115c commit dd5bc90
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 55 deletions.
2 changes: 1 addition & 1 deletion internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m

reason := "JOB_RUN_CANCELLED"

if payload.Reason != nil {
if payload.Reason != nil && *payload.Reason != "" {
reason = *payload.Reason
}

Expand Down
1 change: 1 addition & 0 deletions pkg/repository/buffer/bulk_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func bulkStepRunEvents(
})

if err != nil {
l.Error().Err(err).Msg("could not create deferred step run event")
return fmt.Errorf("bulk_events - could not create deferred step run event: %w", err)
}

Expand Down
38 changes: 20 additions & 18 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1090,32 +1090,34 @@ WITH input_values AS (
1 AS "count",
unnest(@data::jsonb[]) AS "data"
),
matched_rows AS (
SELECT DISTINCT ON (sre."stepRunId")
sre."stepRunId", sre."reason", sre."severity", sre."id"
FROM "StepRunEvent" sre
WHERE
sre."stepRunId" = ANY(@stepRunIds::uuid[])
ORDER BY sre."stepRunId", sre."id" DESC
),
locked_rows AS (
SELECT "id"
FROM "StepRunEvent"
WHERE "stepRunId" IN (SELECT unnest(@stepRunIds::uuid[]))
SELECT sre."id", iv.*
FROM "StepRunEvent" sre
JOIN
matched_rows mr ON sre."id" = mr."id"
JOIN
input_values iv ON sre."stepRunId" = iv."stepRunId" AND sre."reason" = iv."reason" AND sre."severity" = iv."severity"
ORDER BY "id"
FOR UPDATE
),
updated AS (
UPDATE "StepRunEvent"
SET
"timeLastSeen" = input_values."timeLastSeen",
"message" = input_values."message",
"timeLastSeen" = locked_rows."timeLastSeen",
"message" = locked_rows."message",
"count" = "StepRunEvent"."count" + 1,
"data" = input_values."data"
FROM input_values
"data" = locked_rows."data"
FROM locked_rows
WHERE
"StepRunEvent"."stepRunId" = input_values."stepRunId"
AND "StepRunEvent"."reason" = input_values."reason"
AND "StepRunEvent"."severity" = input_values."severity"
AND "StepRunEvent"."id" = (
SELECT "id"
FROM "StepRunEvent"
WHERE "stepRunId" = input_values."stepRunId"
ORDER BY "id" DESC
LIMIT 1
)
"StepRunEvent"."id" = locked_rows."id"
RETURNING "StepRunEvent".*
)
INSERT INTO "StepRunEvent" (
Expand All @@ -1139,7 +1141,7 @@ SELECT
"data"
FROM input_values
WHERE NOT EXISTS (
SELECT 1 FROM updated WHERE "stepRunId" = input_values."stepRunId"
SELECT 1 FROM updated WHERE "stepRunId" = input_values."stepRunId" AND "reason" = input_values."reason" AND "severity" = input_values."severity"
);

-- name: CountStepRunEvents :one
Expand Down
38 changes: 20 additions & 18 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pkg/repository/prisma/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,15 @@ func NewEngineRepository(pool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ..
}

rlCache := cache.New(5 * time.Minute)
queueCache := cache.New(5 * time.Minute)

eventEngine, cleanupEventEngine, err := NewEventEngineRepository(pool, opts.v, opts.l, opts.metered)

if err != nil {
return nil, nil, err
}

stepRunEngine, cleanupStepRunEngine, err := NewStepRunEngineRepository(pool, opts.v, opts.l, cf, rlCache)
stepRunEngine, cleanupStepRunEngine, err := NewStepRunEngineRepository(pool, opts.v, opts.l, cf, rlCache, queueCache)

if err != nil {
return nil, nil, err
Expand All @@ -319,6 +321,7 @@ func NewEngineRepository(pool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ..

return func() error {
rlCache.Stop()
queueCache.Stop()

if err := cleanupStepRunEngine(); err != nil {
return err
Expand Down
28 changes: 11 additions & 17 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -273,7 +272,7 @@ type stepRunEngineRepository struct {
bulkEventBuffer *buffer.BulkEventWriter
bulkSemaphoreReleaser *buffer.BulkSemaphoreReleaser
bulkQueuer *buffer.BulkStepRunQueuer
queueActionTenantCache *lru.Cache[string, bool]
queueActionTenantCache *cache.Cache

updateConcurrentFactor int
maxHashFactor int
Expand All @@ -295,7 +294,7 @@ func (s *stepRunEngineRepository) cleanup() error {
return s.bulkEventBuffer.Cleanup()
}

func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime, rlCache *cache.Cache) (*stepRunEngineRepository, func() error, error) {
func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime, rlCache *cache.Cache, queueCache *cache.Cache) (*stepRunEngineRepository, func() error, error) {
queries := dbsqlc.New()

eventBuffer, err := buffer.NewBulkEventWriter(pool, v, l, cf.EventBuffer)
Expand Down Expand Up @@ -328,10 +327,9 @@ func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *ze
bulkEventBuffer: eventBuffer,
bulkSemaphoreReleaser: semReleaser,
bulkQueuer: bulkQueuer,
queueActionTenantCache: queueCache,
}

s.queueActionTenantCache, _ = lru.New[string, bool](10000)

err = s.startBuffers()

if err != nil {
Expand Down Expand Up @@ -2843,14 +2841,9 @@ func (s *stepRunEngineRepository) UpdateStepRunInputSchema(ctx context.Context,
}

func (s *stepRunEngineRepository) doCachedUpsertOfQueue(ctx context.Context, tx dbsqlc.DBTX, tenantId string, innerStepRun *dbsqlc.GetStepRunForEngineRow) error {
// update the queue with the action id

cacheKey := fmt.Sprintf("t-%s-q-%s", tenantId, innerStepRun.ActionId)

_, ok := s.queueActionTenantCache.Get(cacheKey)

if !ok {
cacheKey := fmt.Sprintf("t-%s-q-%s", tenantId, innerStepRun.SRQueue)

_, err := cache.MakeCacheable(s.queueActionTenantCache, cacheKey, func() (*bool, error) {
err := s.queries.UpsertQueue(
ctx,
tx,
Expand All @@ -2859,15 +2852,16 @@ func (s *stepRunEngineRepository) doCachedUpsertOfQueue(ctx context.Context, tx
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
},
)

if err != nil {
return err
return nil, err
}

s.queueActionTenantCache.Add(cacheKey, true)

}
res := true
return &res, nil
})

return nil
return err
}

func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, stepRunId string, opts *repository.QueueStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error) {
Expand Down

0 comments on commit dd5bc90

Please sign in to comment.