Skip to content

Commit

Permalink
feat: tenant partitioning (#649)
Browse files Browse the repository at this point in the history
* feat: tenant partitioning

* fix: rebalance inactive partitions, split into separate partitioner

* fix: shutdown partitioner scheduler properly

* update config options

* fix: config options linting
  • Loading branch information
abelanger5 authored Jun 26, 2024
1 parent 68176b7 commit f2c6bc1
Show file tree
Hide file tree
Showing 30 changed files with 91,994 additions and 81,012 deletions.
5 changes: 2 additions & 3 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ tasks:
- sudo sh ./hack/dev/manage-hosts.sh add 127.0.0.1 app.dev.hatchet-tools.com
prisma-migrate:
cmds:
- go run github.com/steebchen/prisma-client-go migrate dev --create-only --skip-generate --name "{{.CLI_ARGS}}"
- task: generate-sqlc
- sh ./hack/dev/atlas-migrate.sh {{.CLI_ARGS}}
- DATABASE_URL='postgresql://hatchet:[email protected]:5431/hatchet' sh ./hack/db/atlas-apply.sh
Expand All @@ -67,7 +66,7 @@ tasks:
- task: generate-sqlc
seed-dev:
cmds:
- sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate deploy
- sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate dev --skip-generate
- SEED_DEVELOPMENT=true sh ./hack/dev/run-go-with-env.sh run ./cmd/hatchet-admin seed
start-dev:
deps:
Expand Down Expand Up @@ -137,7 +136,7 @@ tasks:
- sh ./generate.sh
generate-sqlc:
cmds:
- DATABASE_URL='postgresql://hatchet:[email protected]:5431/shadow' npx --yes prisma migrate deploy
- DATABASE_URL='postgresql://hatchet:[email protected]:5431/shadow' npx --yes prisma migrate dev --skip-generate
- DATABASE_URL='postgresql://hatchet:[email protected]:5431/shadow' npx --yes prisma migrate diff --from-empty --to-schema-datasource prisma/schema.prisma --script > sql/schema/schema.sql
- cp sql/schema/schema.sql pkg/repository/prisma/dbsqlc/schema.sql
- go run github.com/sqlc-dev/sqlc/cmd/[email protected] generate --file pkg/repository/prisma/dbsqlc/sqlc.yaml
Expand Down
13 changes: 8 additions & 5 deletions api/v1/server/handlers/tenants/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)

func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateRequestObject) (gen.TenantCreateResponseObject, error) {
Expand Down Expand Up @@ -55,14 +56,16 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
return nil, err
}

err = t.config.EntitlementRepository.TenantLimit().SelectOrInsertTenantLimits(context.Background(), tenant.ID, nil)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)

err = t.config.EntitlementRepository.TenantLimit().SelectOrInsertTenantLimits(context.Background(), tenantId, nil)

if err != nil {
return nil, err
}

// add the user as an owner of the tenant
_, err = t.config.APIRepository.Tenant().CreateTenantMember(tenant.ID, &repository.CreateTenantMemberOpts{
_, err = t.config.APIRepository.Tenant().CreateTenantMember(tenantId, &repository.CreateTenantMemberOpts{
UserId: user.ID,
Role: "OWNER",
})
Expand All @@ -71,19 +74,19 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
return nil, err
}

t.config.Analytics.Tenant(tenant.ID, map[string]interface{}{
t.config.Analytics.Tenant(tenantId, map[string]interface{}{
"name": tenant.Name,
"slug": tenant.Slug,
})

t.config.Analytics.Enqueue(
"tenant:create",
user.ID,
&tenant.ID,
&tenantId,
nil,
)

return gen.TenantCreate200JSONResponse(
*transformers.ToTenant(tenant),
*transformers.ToTenantSqlc(tenant),
), nil
}
10 changes: 10 additions & 0 deletions api/v1/server/oas/transformers/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ func ToTenant(tenant *db.TenantModel) *gen.Tenant {
}
}

// ToTenantSqlc converts a sqlc tenant row into the API tenant type.
//
// The tenant UUID is rendered as a string for the API metadata. The optional
// AnalyticsOptOut and AlertMemberEmails fields point directly at the
// corresponding fields of the supplied tenant rather than at copies.
func ToTenantSqlc(tenant *dbsqlc.Tenant) *gen.Tenant {
	id := sqlchelpers.UUIDToStr(tenant.ID)
	meta := toAPIMetadata(id, tenant.CreatedAt.Time, tenant.UpdatedAt.Time)

	res := &gen.Tenant{
		Metadata: *meta,
		Name:     tenant.Name,
		Slug:     tenant.Slug,
	}

	res.AnalyticsOptOut = &tenant.AnalyticsOptOut
	res.AlertMemberEmails = &tenant.AlertMemberEmails

	return res
}

func ToTenantAlertingSettings(alerting *db.TenantAlertingSettingsModel) *gen.TenantAlertingSettings {
res := &gen.TenantAlertingSettings{
Metadata: *toAPIMetadata(alerting.ID, alerting.CreatedAt, alerting.UpdatedAt),
Expand Down
8 changes: 7 additions & 1 deletion cmd/hatchet-admin/cli/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func runSeed(cf *loader.ConfigLoader) error {
if errors.Is(err, db.ErrNotFound) {
// seed an example tenant
// initialize a tenant
tenant, err = dc.APIRepository.Tenant().CreateTenant(&repository.CreateTenantOpts{
sqlcTenant, err := dc.APIRepository.Tenant().CreateTenant(&repository.CreateTenantOpts{
ID: &dc.Seed.DefaultTenantID,
Name: dc.Seed.DefaultTenantName,
Slug: dc.Seed.DefaultTenantSlug,
Expand All @@ -96,6 +96,12 @@ func runSeed(cf *loader.ConfigLoader) error {
return err
}

tenant, err = dc.APIRepository.Tenant().GetTenantByID(sqlchelpers.UUIDToStr(sqlcTenant.ID))

if err != nil {
return err
}

fmt.Println("created tenant", tenant.ID)

// add the user to the tenant
Expand Down
135 changes: 135 additions & 0 deletions cmd/hatchet-engine/engine/partitioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package engine

import (
"context"
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"

"github.com/hatchet-dev/hatchet/pkg/repository"
)

type partitioner struct {
s gocron.Scheduler
repo repository.TenantEngineRepository
}

// newPartitioner builds a partitioner backed by repo and a new gocron
// scheduler configured for UTC. The scheduler is only constructed here; it is
// not started. If the scheduler cannot be created, the zero partitioner and
// the scheduler's error are returned.
func newPartitioner(repo repository.TenantEngineRepository) (partitioner, error) {
	scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
	if err != nil {
		return partitioner{}, err
	}

	p := partitioner{
		repo: repo,
		s:    scheduler,
	}

	return p, nil
}

// withControllers creates a controller partition with a freshly generated
// UUID, rebalances all controller partitions, and registers a job on the
// partitioner's scheduler that calls rebalanceControllerPartitions once per
// minute.
//
// It returns a Teardown that deletes the partition and rebalances all
// controller partitions again (bounded by a 10 second timeout), together with
// the new partition id. On any failure it returns a nil Teardown, an empty id
// and a wrapped error.
func (p *partitioner) withControllers(ctx context.Context) (*Teardown, string, error) {
	id := uuid.New().String()

	if err := p.repo.CreateControllerPartition(ctx, id); err != nil {
		return nil, "", fmt.Errorf("could not create engine partition: %w", err)
	}

	// rebalance partitions on startup
	if err := p.repo.RebalanceAllControllerPartitions(ctx); err != nil {
		return nil, "", fmt.Errorf("could not rebalance engine partitions: %w", err)
	}

	everyMinute := gocron.DurationJob(time.Minute)
	rebalanceTask := gocron.NewTask(func() {
		// the periodic rebalance is best-effort; its error is discarded
		rebalanceControllerPartitions(ctx, p.repo) // nolint: errcheck
	})

	if _, err := p.s.NewJob(everyMinute, rebalanceTask); err != nil {
		return nil, "", fmt.Errorf("could not create rebalance controller partitions job: %w", err)
	}

	// removePartition uses its own background context rather than ctx, with a
	// 10 second deadline covering both the delete and the final rebalance.
	removePartition := func() error {
		deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		if err := p.repo.DeleteControllerPartition(deleteCtx, id); err != nil {
			return fmt.Errorf("could not delete controller partition: %w", err)
		}

		return p.repo.RebalanceAllControllerPartitions(deleteCtx)
	}

	td := &Teardown{
		Name: "partition teardown",
		Fn:   removePartition,
	}

	return td, id, nil
}

// withTenantWorkers creates a tenant worker partition with a freshly
// generated UUID, rebalances all tenant worker partitions, and registers a job
// on the partitioner's scheduler that calls rebalanceTenantWorkerPartitions
// once per minute.
//
// It returns a Teardown that deletes the partition and rebalances all tenant
// worker partitions again (bounded by a 10 second timeout), together with the
// new partition id. On any failure it returns a nil Teardown, an empty id and
// a wrapped error that names the tenant worker partition, so the failure can
// be told apart from a controller partition failure.
func (p *partitioner) withTenantWorkers(ctx context.Context) (*Teardown, string, error) {
	partitionId := uuid.New().String()

	err := p.repo.CreateTenantWorkerPartition(ctx, partitionId)

	if err != nil {
		return nil, "", fmt.Errorf("could not create tenant worker partition: %w", err)
	}

	// rebalance partitions on startup
	err = p.repo.RebalanceAllTenantWorkerPartitions(ctx)

	if err != nil {
		return nil, "", fmt.Errorf("could not rebalance tenant worker partitions: %w", err)
	}

	_, err = p.s.NewJob(
		gocron.DurationJob(time.Minute*1),
		gocron.NewTask(
			func() {
				// the periodic rebalance is best-effort; its error is discarded
				rebalanceTenantWorkerPartitions(ctx, p.repo) // nolint: errcheck
			},
		),
	)

	if err != nil {
		return nil, "", fmt.Errorf("could not create rebalance tenant worker partitions job: %w", err)
	}

	return &Teardown{
		Name: "partition teardown",
		Fn: func() error {
			// use a fresh background context rather than ctx, with a 10 second
			// deadline covering both the delete and the final rebalance
			deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			err := p.repo.DeleteTenantWorkerPartition(deleteCtx, partitionId)

			if err != nil {
				return fmt.Errorf("could not delete worker partition: %w", err)
			}

			return p.repo.RebalanceAllTenantWorkerPartitions(deleteCtx)
		},
	}, partitionId, nil
}

// start starts the partitioner's scheduler.
func (p *partitioner) start() {
	scheduler := p.s
	scheduler.Start()
}

// shutdown shuts down the partitioner's scheduler and returns any error the
// scheduler reports.
func (p *partitioner) shutdown() error {
	if err := p.s.Shutdown(); err != nil {
		return err
	}

	return nil
}

// rebalanceControllerPartitions asks the repository to rebalance inactive
// controller partitions and returns the repository's error unchanged.
func rebalanceControllerPartitions(ctx context.Context, r repository.TenantEngineRepository) error {
	err := r.RebalanceInactiveControllerPartitions(ctx)

	return err
}

// rebalanceTenantWorkerPartitions asks the repository to rebalance inactive
// tenant worker partitions and returns the repository's error unchanged.
func rebalanceTenantWorkerPartitions(ctx context.Context, r repository.TenantEngineRepository) error {
	err := r.RebalanceInactiveTenantWorkerPartitions(ctx)

	return err
}
45 changes: 37 additions & 8 deletions cmd/hatchet-engine/engine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,19 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
return nil, fmt.Errorf("could not initialize tracer: %w", err)
}

p, err := newPartitioner(sc.EngineRepository.Tenant())

if err != nil {
return nil, fmt.Errorf("could not create partitioner: %w", err)
}

teardown := []Teardown{}

teardown = append(teardown, Teardown{
Name: "partitioner",
Fn: p.shutdown,
})

var h *health.Health
healthProbes := sc.HasService("health")
if healthProbes {
Expand Down Expand Up @@ -165,47 +176,55 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
})
}

if sc.HasService("jobscontroller") {
if sc.HasService("queue") {
partitionTeardown, partitionId, err := p.withControllers(ctx)

if err != nil {
return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
}

teardown = append(teardown, *partitionTeardown)

jc, err := jobs.New(
jobs.WithAlerter(sc.Alerter),
jobs.WithMessageQueue(sc.MessageQueue),
jobs.WithRepository(sc.EngineRepository),
jobs.WithLogger(sc.Logger),
jobs.WithPartitionId(partitionId),
)

if err != nil {
return nil, fmt.Errorf("could not create jobs controller: %w", err)
}

cleanup, err := jc.Start()
cleanupJobs, err := jc.Start()
if err != nil {
return nil, fmt.Errorf("could not start jobs controller: %w", err)
}
teardown = append(teardown, Teardown{
Name: "jobs controller",
Fn: cleanup,
Fn: cleanupJobs,
})
}

if sc.HasService("workflowscontroller") {
wc, err := workflows.New(
workflows.WithAlerter(sc.Alerter),
workflows.WithMessageQueue(sc.MessageQueue),
workflows.WithRepository(sc.EngineRepository),
workflows.WithLogger(sc.Logger),
workflows.WithTenantAlerter(sc.TenantAlerter),
workflows.WithPartitionId(partitionId),
)
if err != nil {
return nil, fmt.Errorf("could not create workflows controller: %w", err)
}

cleanup, err := wc.Start()
cleanupWorkflows, err := wc.Start()
if err != nil {
return nil, fmt.Errorf("could not start workflows controller: %w", err)
}
teardown = append(teardown, Teardown{
Name: "workflows controller",
Fn: cleanup,
Fn: cleanupWorkflows,
})
}

Expand Down Expand Up @@ -338,7 +357,15 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
}

if sc.HasService("webhookscontroller") {
wh := webhooks.New(sc)
partitionTeardown, partitionId, err := p.withTenantWorkers(ctx)

if err != nil {
return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
}

teardown = append(teardown, *partitionTeardown)

wh := webhooks.New(sc, partitionId)

cleanup, err := wh.Start()
if err != nil {
Expand All @@ -364,6 +391,8 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
h.SetReady(true)
}

p.start()

<-ctx.Done()

if healthProbes {
Expand Down
6 changes: 3 additions & 3 deletions frontend/docs/pages/self-hosting/configuration-options.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ This document outlines the environment variables used to configure the server. T

## Services Configuration

| Variable | Description | Default Value |
| ----------------- | ------------------------ | -------------------------------------------------------------------------------------------------------------------------------- |
| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "jobscontroller", "workflowscontroller", "webhookscontroller", "heartbeater"]` |
| Variable | Description | Default Value |
| ----------------- | ------------------------ | ------------------------------------------------------------------------------------------------ |
| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "queue", "webhookscontroller", "heartbeater"]` |

## Encryption Configuration

Expand Down
Loading

0 comments on commit f2c6bc1

Please sign in to comment.