Skip to content

Commit

Permalink
chore: remove scheduled_jobs.resource_id
Browse files Browse the repository at this point in the history
Use SQLite JSON functions to extract needed informations.
  • Loading branch information
YuukanOO committed Oct 7, 2024
1 parent a829b63 commit 82097d6
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 82 deletions.
1 change: 0 additions & 1 deletion cmd/serve/front/src/lib/resources/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { Paginated } from '$lib/pagination';

export type Job = {
id: string;
resource_id: string;
group: string;
message_name: string;
message_data: string;
Expand Down
1 change: 0 additions & 1 deletion cmd/serve/front/src/routes/(main)/jobs/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
{:else if value === 'resource'}
<!-- @ts-ignore -->
<div>{translateMessageName(item.message_name)}</div>
<div class="meta">{item.resource_id}</div>
{/if}
</svelte:fragment>

Expand Down
5 changes: 2 additions & 3 deletions internal/deployment/app/cleanup_app/cleanup_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ type Command struct {
To time.Time `json:"to"`
}

func (Command) Name_() string { return "deployment.command.cleanup_app" }
func (c Command) ResourceID() string { return c.AppID }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
func (Command) Name_() string { return "deployment.command.cleanup_app" }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }

func Handler(
reader domain.TargetsReader,
Expand Down
6 changes: 3 additions & 3 deletions internal/deployment/app/cleanup_app/cleanup_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, nil)
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
assert.Nil(t, target.RequestDelete(false, "uid"))

handler, ctx := arrange(t, &provider,
Expand Down Expand Up @@ -146,7 +146,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed"))
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed")))
app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
fixture.WithEnvironmentConfig(
domain.NewEnvironmentConfig(target.ID()),
Expand Down Expand Up @@ -181,7 +181,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, nil)
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
fixture.WithEnvironmentConfig(
domain.NewEnvironmentConfig(target.ID()),
Expand Down
33 changes: 16 additions & 17 deletions internal/deployment/app/cleanup_app/on_app_cleanup_requested.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@ func OnAppCleanupRequestedHandler(scheduler bus.Scheduler) bus.SignalHandler[dom
return func(ctx context.Context, evt domain.AppCleanupRequested) error {
now := time.Now().UTC()

if err := scheduler.Queue(ctx, Command{
AppID: string(evt.ID),
Environment: string(domain.Production),
TargetID: string(evt.ProductionConfig.Target()),
From: evt.ProductionConfig.Version(),
To: now,
}); err != nil {
return err
}

return scheduler.Queue(ctx, Command{
AppID: string(evt.ID),
Environment: string(domain.Staging),
TargetID: string(evt.StagingConfig.Target()),
From: evt.StagingConfig.Version(),
To: now,
})
return scheduler.Queue(ctx,
Command{
AppID: string(evt.ID),
Environment: string(domain.Production),
TargetID: string(evt.ProductionConfig.Target()),
From: evt.ProductionConfig.Version(),
To: now,
},
Command{
AppID: string(evt.ID),
Environment: string(domain.Staging),
TargetID: string(evt.StagingConfig.Target()),
From: evt.StagingConfig.Version(),
To: now,
},
)
}
}
7 changes: 3 additions & 4 deletions internal/deployment/app/cleanup_target/cleanup_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ import (
type Command struct {
bus.AsyncCommand

ID string `json:"id"`
ID string `json:"target_id"`
}

func (Command) Name_() string { return "deployment.command.cleanup_target" }
func (c Command) ResourceID() string { return c.ID }
func (c Command) Group() string { return c.ID }
func (Command) Name_() string { return "deployment.command.cleanup_target" }
func (c Command) Group() string { return c.ID }

func Handler(
reader domain.TargetsReader,
Expand Down
6 changes: 4 additions & 2 deletions internal/deployment/app/cleanup_target/on_job_dismissed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ func OnJobDismissedHandler(
writer domain.TargetsWriter,
) bus.SignalHandler[embedded.JobDismissed] {
return func(ctx context.Context, evt embedded.JobDismissed) error {
if _, isCleanupJob := evt.Command.(Command); !isCleanupJob {
cmd, isCleanupJob := evt.Command.(Command)

if !isCleanupJob {
return nil
}

target, err := reader.GetByID(ctx, domain.TargetID(evt.Command.ResourceID()))
target, err := reader.GetByID(ctx, domain.TargetID(cmd.ID))

if err != nil {
// Target deleted, no need to go further
Expand Down
7 changes: 3 additions & 4 deletions internal/deployment/app/configure_target/configure_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ import (
type Command struct {
bus.AsyncCommand

ID string `json:"id"`
ID string `json:"target_id"`
Version time.Time `json:"version"`
}

func (Command) Name_() string { return "deployment.command.configure_target" }
func (c Command) ResourceID() string { return c.ID }
func (c Command) Group() string { return c.ID }
func (Command) Name_() string { return "deployment.command.configure_target" }
func (c Command) Group() string { return c.ID }

func Handler(
reader domain.TargetsReader,
Expand Down
6 changes: 2 additions & 4 deletions internal/deployment/app/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package deploy
import (
"context"
"errors"
"strconv"

"github.com/YuukanOO/seelf/internal/deployment/domain"
"github.com/YuukanOO/seelf/pkg/apperr"
Expand All @@ -20,9 +19,8 @@ type Command struct {
TargetID string `json:"target_id"`
}

func (Command) Name_() string { return "deployment.command.deploy" }
func (c Command) ResourceID() string { return c.AppID + "-" + strconv.Itoa(c.DeploymentNumber) }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
func (Command) Name_() string { return "deployment.command.deploy" }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }

// Handle the deployment process.
// If an unexpected error occurs during this process, it uses the bus.PreserveOrder function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,48 @@ ALTER TABLE apps ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE registries ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE targets ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;

ALTER TABLE apps ADD history TEXT NOT NULL DEFAULT '{}';
ALTER TABLE apps ADD history JSON NOT NULL DEFAULT '{}';
UPDATE apps SET history = '{"production": ["' || production_target || '"], "staging": ["'|| staging_target ||'"]}';

-- Since the group has changed for configure and cleanup, just update those jobs.
UPDATE scheduled_jobs
SET
[group] = resource_id
WHERE message_name IN ('deployment.command.configure_target', 'deployment.command.cleanup_target');
,message_data = '{"target_id": "' || resource_id || '"}'
WHERE message_name = 'deployment.command.cleanup_target';

UPDATE scheduled_jobs
SET
[group] = resource_id
,message_data = '{"target_id": "' || resource_id || '", "version": "' || (message_data ->> '$.version') || '"}'
WHERE message_name = 'deployment.command.configure_target';

-- Since those messages no longer exists.
DELETE FROM scheduled_jobs WHERE message_name IN ('deployment.command.delete_target', 'deployment.command.delete_app');

DROP TRIGGER IF EXISTS on_deployment_failed_cleanup_jobs;

-- When a deployment is marked as failed, remove all pending jobs for that resource.
-- This is to avoid running jobs that are no longer needed.
CREATE TRIGGER IF NOT EXISTS on_deployment_failed_cleanup_jobs AFTER UPDATE ON deployments
WHEN OLD.state_status != NEW.state_status AND NEW.state_status = 2 -- Only when the deployment goes to the failed state
BEGIN
DELETE FROM scheduled_jobs
WHERE
message_name = 'deployment.command.deploy'
AND message_data ->> '$.app_id' = NEW.app_id
AND message_data ->> '$.deployment_number' = NEW.deployment_number
AND retrieved = 0;
END;

-- When a target is configured, no need to keep old configure jobs since they are outdated.
CREATE TRIGGER IF NOT EXISTS on_target_configure_remove_outdated_jobs
BEFORE INSERT ON scheduled_jobs
WHEN NEW.message_name = 'deployment.command.configure_target'
BEGIN
DELETE FROM scheduled_jobs
WHERE
resource_id = NEW.resource_id
AND message_name = 'deployment.command.configure_target'
message_name = 'deployment.command.configure_target'
AND message_data ->> '$.target_id' = NEW.message_data ->> '$.target_id'
AND retrieved = 0;
END
END;
1 change: 0 additions & 1 deletion pkg/bus/embedded/get_jobs/get_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type (
Job struct {
ID string `json:"id"`
ResourceID string `json:"resource_id"`
Group string `json:"group"`
MessageName string `json:"message_name"`
MessageData string `json:"message_data"`
Expand Down
22 changes: 12 additions & 10 deletions pkg/bus/embedded/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,15 @@ type returnCommand struct {
err error
}

func (r returnCommand) Name_() string { return "returnCommand" }
func (r returnCommand) ResourceID() string { return "" }
func (r returnCommand) Group() string { return "" }
func (r returnCommand) Name_() string { return "returnCommand" }
func (r returnCommand) Group() string { return "" }

type unhandledCommand struct {
bus.AsyncCommand
}

func (u unhandledCommand) Name_() string { return "unhandledCommand" }
func (u unhandledCommand) ResourceID() string { return "" }
func (u unhandledCommand) Group() string { return "" }
func (u unhandledCommand) Name_() string { return "unhandledCommand" }
func (u unhandledCommand) Group() string { return "" }

var (
_ embedded.JobsStore = (*adapter)(nil)
Expand All @@ -145,12 +143,12 @@ var (

type job struct {
id string
msg bus.AsyncRequest
cmd bus.AsyncRequest
err error
}

func (j *job) ID() string { return j.id }
func (j *job) Command() bus.AsyncRequest { return j.msg }
func (j *job) Command() bus.AsyncRequest { return j.cmd }

type adapter struct {
wg sync.WaitGroup
Expand All @@ -160,9 +158,13 @@ type adapter struct {
done []*job
}

func (a *adapter) Queue(ctx context.Context, msg bus.AsyncRequest) error {
func (a *adapter) Queue(ctx context.Context, requests ...bus.AsyncRequest) error {
a.wg.Add(1)
a.jobs = append(a.jobs, &job{id: id.New[string](), msg: msg})

for _, req := range requests {
a.jobs = append(a.jobs, &job{id: id.New[string](), cmd: req})
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/bus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ type (
AsyncRequest interface {
MutateRequest
TypedRequest[AsyncResult]
ResourceID() string // ID of the main resource processed by the request
Group() string // Work group for this request, at most one job per group is processed at any given time
Group() string // Work group for this request, at most one job per group is processed at any given time
}

// Request to query the system.
Expand Down
5 changes: 2 additions & 3 deletions pkg/bus/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ type asyncCommand struct {
SomeValue int
}

func (asyncCommand) Name_() string { return "AsyncCommand" }
func (asyncCommand) ResourceID() string { return "" }
func (asyncCommand) Group() string { return "" }
func (asyncCommand) Name_() string { return "AsyncCommand" }
func (asyncCommand) Group() string { return "" }

type getQuery struct {
bus.Query[int]
Expand Down
2 changes: 1 addition & 1 deletion pkg/bus/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import "context"
// Enable scheduled dispatching of a message.
type Scheduler interface {
// Queue a request to be dispatched asynchronously at a later time.
Queue(context.Context, AsyncRequest) error
Queue(context.Context, ...AsyncRequest) error
}
40 changes: 22 additions & 18 deletions pkg/bus/sqlite/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func (s *JobsStore) GetAllJobs(ctx context.Context, query get_jobs.Query) (stora
return builder.
Select[get_jobs.Job](`
id
,resource_id
,[group]
,message_name
,message_data
Expand Down Expand Up @@ -124,27 +123,33 @@ func (s *JobsStore) DismissJob(ctx context.Context, cmd dismiss_job.Command) (bu

func (s *JobsStore) Queue(
ctx context.Context,
msg bus.AsyncRequest,
requests ...bus.AsyncRequest,
) error {
now := time.Now().UTC()
msgData, err := storage.ValueJSON(msg)

if err != nil {
return err
for _, req := range requests {
cmdData, err := storage.ValueJSON(req)

if err != nil {
return err
}

if err = builder.
Insert("scheduled_jobs", builder.Values{
"id": id.New[string](),
"[group]": req.Group(),
"message_name": req.Name_(),
"message_data": cmdData,
"queued_at": now,
"not_before": now,
"retrieved": false,
}).
Exec(s.db, ctx); err != nil {
return err
}
}

return builder.
Insert("scheduled_jobs", builder.Values{
"id": id.New[string](),
"resource_id": msg.ResourceID(),
"[group]": msg.Group(),
"message_name": msg.Name_(),
"message_data": msgData,
"queued_at": now,
"not_before": now,
"retrieved": false,
}).
Exec(s.db, ctx)
return nil
}

func (s *JobsStore) GetNextPendingJobs(ctx context.Context) ([]embedded.Job, error) {
Expand Down Expand Up @@ -226,7 +231,6 @@ func jobMapper(scanner storage.Scanner) (embedded.Job, error) {
func jobQueryMapper(scanner storage.Scanner) (j get_jobs.Job, err error) {
err = scanner.Scan(
&j.ID,
&j.ResourceID,
&j.Group,
&j.MessageName,
&j.MessageData,
Expand Down
Loading

0 comments on commit 82097d6

Please sign in to comment.