diff --git a/cmd/serve/front/src/lib/resources/jobs.ts b/cmd/serve/front/src/lib/resources/jobs.ts
index 32f0b0c..12d385d 100644
--- a/cmd/serve/front/src/lib/resources/jobs.ts
+++ b/cmd/serve/front/src/lib/resources/jobs.ts
@@ -4,7 +4,6 @@ import type { Paginated } from '$lib/pagination';
 
 export type Job = {
 	id: string;
-	resource_id: string;
 	group: string;
 	message_name: string;
 	message_data: string;
diff --git a/cmd/serve/front/src/routes/(main)/jobs/+page.svelte b/cmd/serve/front/src/routes/(main)/jobs/+page.svelte
index 9ef633a..0d5b2e1 100644
--- a/cmd/serve/front/src/routes/(main)/jobs/+page.svelte
+++ b/cmd/serve/front/src/routes/(main)/jobs/+page.svelte
@@ -48,7 +48,6 @@
 				{:else if value === 'resource'}
 					{translateMessageName(item.message_name)}
-					{item.resource_id}
 				{/if}
diff --git a/internal/deployment/app/cleanup_app/cleanup_app.go b/internal/deployment/app/cleanup_app/cleanup_app.go
index 99248ac..1807a14 100644
--- a/internal/deployment/app/cleanup_app/cleanup_app.go
+++ b/internal/deployment/app/cleanup_app/cleanup_app.go
@@ -25,9 +25,8 @@ type Command struct {
 	To          time.Time `json:"to"`
 }
 
-func (Command) Name_() string        { return "deployment.command.cleanup_app" }
-func (c Command) ResourceID() string { return c.AppID }
-func (c Command) Group() string      { return bus.Group(c.AppID, c.Environment, c.TargetID) }
+func (Command) Name_() string   { return "deployment.command.cleanup_app" }
+func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
 
 func Handler(
 	reader domain.TargetsReader,
diff --git a/internal/deployment/app/cleanup_app/cleanup_app_test.go b/internal/deployment/app/cleanup_app/cleanup_app_test.go
index 150faf6..4a3e431 100644
--- a/internal/deployment/app/cleanup_app/cleanup_app_test.go
+++ b/internal/deployment/app/cleanup_app/cleanup_app_test.go
@@ -108,7 +108,7 @@ func Test_CleanupApp(t *testing.T) {
 		var provider mockProvider
 		user := authfixture.User()
 		target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
-		target.Configured(target.CurrentVersion(), nil, nil)
+		assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
 		assert.Nil(t, target.RequestDelete(false, "uid"))
 
 		handler, ctx := arrange(t, &provider,
@@ -146,7 +146,7 @@ func Test_CleanupApp(t *testing.T) {
 		var provider mockProvider
 		user := authfixture.User()
 		target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
-		target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed"))
+		assert.Nil(t, target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed")))
 		app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
 			fixture.WithEnvironmentConfig(
 				domain.NewEnvironmentConfig(target.ID()),
@@ -181,7 +181,7 @@ func Test_CleanupApp(t *testing.T) {
 		var provider mockProvider
 		user := authfixture.User()
 		target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
-		target.Configured(target.CurrentVersion(), nil, nil)
+		assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
 		app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
 			fixture.WithEnvironmentConfig(
 				domain.NewEnvironmentConfig(target.ID()),
diff --git a/internal/deployment/app/cleanup_app/on_app_cleanup_requested.go b/internal/deployment/app/cleanup_app/on_app_cleanup_requested.go
index 726a091..69bc6d5 100644
--- a/internal/deployment/app/cleanup_app/on_app_cleanup_requested.go
+++ b/internal/deployment/app/cleanup_app/on_app_cleanup_requested.go
@@ -13,22 +13,21 @@ func OnAppCleanupRequestedHandler(scheduler bus.Scheduler) bus.SignalHandler[dom
 	return func(ctx context.Context, evt domain.AppCleanupRequested) error {
 		now := time.Now().UTC()
 
-		if err := scheduler.Queue(ctx, Command{
-			AppID:       string(evt.ID),
-			Environment: string(domain.Production),
-			TargetID:    string(evt.ProductionConfig.Target()),
-			From:        evt.ProductionConfig.Version(),
-			To:          now,
-		}); err != nil {
-			return err
-		}
-
-		return scheduler.Queue(ctx, Command{
-			AppID:       string(evt.ID),
-			Environment: string(domain.Staging),
-			TargetID:    string(evt.StagingConfig.Target()),
-			From:        evt.StagingConfig.Version(),
-			To:          now,
-		})
+		return scheduler.Queue(ctx,
+			Command{
+				AppID:       string(evt.ID),
+				Environment: string(domain.Production),
+				TargetID:    string(evt.ProductionConfig.Target()),
+				From:        evt.ProductionConfig.Version(),
+				To:          now,
+			},
+			Command{
+				AppID:       string(evt.ID),
+				Environment: string(domain.Staging),
+				TargetID:    string(evt.StagingConfig.Target()),
+				From:        evt.StagingConfig.Version(),
+				To:          now,
+			},
+		)
 	}
 }
diff --git a/internal/deployment/app/cleanup_target/cleanup_target.go b/internal/deployment/app/cleanup_target/cleanup_target.go
index 9a5f8fb..d6b8e70 100644
--- a/internal/deployment/app/cleanup_target/cleanup_target.go
+++ b/internal/deployment/app/cleanup_target/cleanup_target.go
@@ -14,12 +14,11 @@ import (
 type Command struct {
 	bus.AsyncCommand
 
-	ID string `json:"id"`
+	ID string `json:"target_id"`
 }
 
-func (Command) Name_() string        { return "deployment.command.cleanup_target" }
-func (c Command) ResourceID() string { return c.ID }
-func (c Command) Group() string      { return c.ID }
+func (Command) Name_() string   { return "deployment.command.cleanup_target" }
+func (c Command) Group() string { return c.ID }
 
 func Handler(
 	reader domain.TargetsReader,
diff --git a/internal/deployment/app/cleanup_target/on_job_dismissed.go b/internal/deployment/app/cleanup_target/on_job_dismissed.go
index 01eb9f0..f87eec9 100644
--- a/internal/deployment/app/cleanup_target/on_job_dismissed.go
+++ b/internal/deployment/app/cleanup_target/on_job_dismissed.go
@@ -15,11 +15,13 @@ func OnJobDismissedHandler(
 	writer domain.TargetsWriter,
 ) bus.SignalHandler[embedded.JobDismissed] {
 	return func(ctx context.Context, evt embedded.JobDismissed) error {
-		if _, isCleanupJob := evt.Command.(Command); !isCleanupJob {
+		cmd, isCleanupJob := evt.Command.(Command)
+
+		if !isCleanupJob {
 			return nil
 		}
 
-		target, err := reader.GetByID(ctx, domain.TargetID(evt.Command.ResourceID()))
+		target, err := reader.GetByID(ctx, domain.TargetID(cmd.ID))
 
 		if err != nil {
 			// Target deleted, no need to go further
diff --git a/internal/deployment/app/configure_target/configure_target.go b/internal/deployment/app/configure_target/configure_target.go
index 7ee5e81..569a1b0 100644
--- a/internal/deployment/app/configure_target/configure_target.go
+++ b/internal/deployment/app/configure_target/configure_target.go
@@ -14,13 +14,12 @@ import (
 type Command struct {
 	bus.AsyncCommand
 
-	ID      string    `json:"id"`
+	ID      string    `json:"target_id"`
 	Version time.Time `json:"version"`
 }
 
-func (Command) Name_() string        { return "deployment.command.configure_target" }
-func (c Command) ResourceID() string { return c.ID }
-func (c Command) Group() string      { return c.ID }
+func (Command) Name_() string   { return "deployment.command.configure_target" }
+func (c Command) Group() string { return c.ID }
 
 func Handler(
 	reader domain.TargetsReader,
diff --git a/internal/deployment/app/deploy/deploy.go b/internal/deployment/app/deploy/deploy.go
index 6fade1e..450a15d 100644
--- a/internal/deployment/app/deploy/deploy.go
+++ b/internal/deployment/app/deploy/deploy.go
@@ -3,7 +3,6 @@ package deploy
 import (
 	"context"
 	"errors"
-	"strconv"
 
 	"github.com/YuukanOO/seelf/internal/deployment/domain"
 	"github.com/YuukanOO/seelf/pkg/apperr"
@@ -20,9 +19,8 @@ type Command struct {
 	TargetID         string `json:"target_id"`
 }
 
-func (Command) Name_() string        { return "deployment.command.deploy" }
-func (c Command) ResourceID() string { return c.AppID + "-" + strconv.Itoa(c.DeploymentNumber) }
-func (c Command) Group() string      { return bus.Group(c.AppID, c.Environment, c.TargetID) }
+func (Command) Name_() string   { return "deployment.command.deploy" }
+func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
 
 // Handle the deployment process.
 // If an unexpected error occurs during this process, it uses the bus.PreserveOrder function
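With ResourceID() removed from the commands above, the identifying data now travels only inside the serialized payload, and the migration that follows rewrites message_data so its JSON keys line up with the new struct tags (target_id instead of id). Below is a minimal sketch of the resulting payload shape, using a stand-in struct that mirrors only the tags shown in the diff (the real command also embeds bus.AsyncCommand, and the values are made up):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Stand-in mirroring the configure_target Command fields from the diff above;
// only the JSON tags matter for this illustration.
type configureTarget struct {
	ID      string    `json:"target_id"`
	Version time.Time `json:"version"`
}

func main() {
	payload, err := json.Marshal(configureTarget{
		ID:      "some-target-id", // hypothetical identifier
		Version: time.Date(2024, 10, 8, 12, 0, 0, 0, time.UTC),
	})
	if err != nil {
		panic(err)
	}

	// Prints {"target_id":"some-target-id","version":"2024-10-08T12:00:00Z"},
	// the shape the SQL below reads back with message_data ->> '$.target_id'.
	fmt.Println(string(payload))
}
```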
diff --git a/internal/deployment/infra/sqlite/migrations/1728345059_add_app_cleanup_needed_on.up.sql b/internal/deployment/infra/sqlite/migrations/1728345059_add_app_cleanup_needed_on.up.sql
index 4ff1094..dc317d9 100644
--- a/internal/deployment/infra/sqlite/migrations/1728345059_add_app_cleanup_needed_on.up.sql
+++ b/internal/deployment/infra/sqlite/migrations/1728345059_add_app_cleanup_needed_on.up.sql
@@ -12,18 +12,40 @@ ALTER TABLE apps ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
 ALTER TABLE registries ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
 ALTER TABLE targets ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
 
-ALTER TABLE apps ADD history TEXT NOT NULL DEFAULT '{}';
+ALTER TABLE apps ADD history JSON NOT NULL DEFAULT '{}';
 
 UPDATE apps SET history = '{"production": ["' || production_target || '"], "staging": ["'|| staging_target ||'"]}';
 
 -- Since the group has changed for configure and cleanup, just update those jobs.
 UPDATE scheduled_jobs
 SET
 	[group] = resource_id
-WHERE message_name IN ('deployment.command.configure_target', 'deployment.command.cleanup_target');
+	,message_data = '{"target_id": "' || resource_id || '"}'
+WHERE message_name = 'deployment.command.cleanup_target';
+
+UPDATE scheduled_jobs
+SET
+	[group] = resource_id
+	,message_data = '{"target_id": "' || resource_id || '", "version": "' || (message_data ->> '$.version') || '"}'
+WHERE message_name = 'deployment.command.configure_target';
 
 -- Since those messages no longer exists.
 DELETE FROM scheduled_jobs
 WHERE message_name IN ('deployment.command.delete_target', 'deployment.command.delete_app');
+DROP TRIGGER IF EXISTS on_deployment_failed_cleanup_jobs;
+
+-- When a deployment is marked as failed, remove all pending jobs for that resource.
+-- This is to avoid running jobs that are no longer needed.
+CREATE TRIGGER IF NOT EXISTS on_deployment_failed_cleanup_jobs AFTER UPDATE ON deployments
+	WHEN OLD.state_status != NEW.state_status AND NEW.state_status = 2 -- Only when the deployment goes to the failed state
+BEGIN
+	DELETE FROM scheduled_jobs
+	WHERE
+		message_name = 'deployment.command.deploy'
+		AND message_data ->> '$.app_id' = NEW.app_id
+		AND message_data ->> '$.deployment_number' = NEW.deployment_number
+		AND retrieved = 0;
+END;
+
 -- When a target is configured, no need to keep old configure jobs since they are outdated.
 CREATE TRIGGER IF NOT EXISTS on_target_configure_remove_outdated_jobs BEFORE INSERT ON scheduled_jobs
@@ -31,7 +53,7 @@ WHEN NEW.message_name = 'deployment.command.configure_target'
 BEGIN
 	DELETE FROM scheduled_jobs
 	WHERE
-		resource_id = NEW.resource_id
-		AND message_name = 'deployment.command.configure_target'
+		message_name = 'deployment.command.configure_target'
+		AND message_data ->> '$.target_id' = NEW.message_data ->> '$.target_id'
 		AND retrieved = 0;
-END
\ No newline at end of file
+END;
\ No newline at end of file
diff --git a/pkg/bus/embedded/get_jobs/get_jobs.go b/pkg/bus/embedded/get_jobs/get_jobs.go
index 28a8c2b..cdb2612 100644
--- a/pkg/bus/embedded/get_jobs/get_jobs.go
+++ b/pkg/bus/embedded/get_jobs/get_jobs.go
@@ -11,7 +11,6 @@ import (
 type (
 	Job struct {
 		ID          string `json:"id"`
-		ResourceID  string `json:"resource_id"`
 		Group       string `json:"group"`
 		MessageName string `json:"message_name"`
 		MessageData string `json:"message_data"`
diff --git a/pkg/bus/embedded/runner_test.go b/pkg/bus/embedded/runner_test.go
index 6156eff..7cec068 100644
--- a/pkg/bus/embedded/runner_test.go
+++ b/pkg/bus/embedded/runner_test.go
@@ -126,17 +126,15 @@ type returnCommand struct {
 	err error
 }
 
-func (r returnCommand) Name_() string      { return "returnCommand" }
-func (r returnCommand) ResourceID() string { return "" }
-func (r returnCommand) Group() string      { return "" }
+func (r returnCommand) Name_() string { return "returnCommand" }
+func (r returnCommand) Group() string { return "" }
 
 type unhandledCommand struct {
 	bus.AsyncCommand
 }
 
-func (u unhandledCommand) Name_() string      { return "unhandledCommand" }
-func (u unhandledCommand) ResourceID() string { return "" }
-func (u unhandledCommand) Group() string      { return "" }
+func (u unhandledCommand) Name_() string { return "unhandledCommand" }
+func (u unhandledCommand) Group() string { return "" }
 
 var (
 	_ embedded.JobsStore = (*adapter)(nil)
@@ -145,12 +143,12 @@ var (
 
 type job struct {
 	id  string
-	msg bus.AsyncRequest
+	cmd bus.AsyncRequest
 	err error
 }
 
 func (j *job) ID() string                { return j.id }
-func (j *job) Command() bus.AsyncRequest { return j.msg }
+func (j *job) Command() bus.AsyncRequest { return j.cmd }
 
 type adapter struct {
 	wg   sync.WaitGroup
@@ -160,9 +158,13 @@ type adapter struct {
 	done []*job
 }
 
-func (a *adapter) Queue(ctx context.Context, msg bus.AsyncRequest) error {
+func (a *adapter) Queue(ctx context.Context, requests ...bus.AsyncRequest) error {
 	a.wg.Add(1)
-	a.jobs = append(a.jobs, &job{id: id.New[string](), msg: msg})
+
+	for _, req := range requests {
+		a.jobs = append(a.jobs, &job{id: id.New[string](), cmd: req})
+	}
+
 	return nil
 }
 
diff --git a/pkg/bus/message.go b/pkg/bus/message.go
index 411d9ee..acd76c6 100644
--- a/pkg/bus/message.go
+++ b/pkg/bus/message.go
@@ -65,8 +65,7 @@
 	AsyncRequest interface {
 		MutateRequest
 		TypedRequest[AsyncResult]
-		ResourceID() string // ID of the main resource processed by the request
-		Group() string      // Work group for this request, at most one job per group is processed at any given time
+		Group() string // Work group for this request, at most one job per group is processed at any given time
 	}
 
 	// Request to query the system.
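The Group() contract documented above ("at most one job per group is processed at any given time") is what the commands now build from their app, environment and target identifiers. The snippet below is not seelf's runner, just a self-contained sketch of what per-group serialization means: jobs sharing a group run one after another while distinct groups proceed concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct {
	group string
	name  string
}

// dispatch starts one worker per group so that jobs sharing a group are
// processed strictly in order, while different groups make progress in parallel.
func dispatch(jobs []job) {
	var wg sync.WaitGroup
	queues := make(map[string]chan job)

	for _, j := range jobs {
		q, ok := queues[j.group]
		if !ok {
			q = make(chan job, len(jobs))
			queues[j.group] = q
			wg.Add(1)
			go func(q chan job) {
				defer wg.Done()
				for j := range q {
					fmt.Printf("group %q -> %s\n", j.group, j.name)
				}
			}(q)
		}
		q <- j
	}

	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}

func main() {
	// Group values are illustrative; the real ones come from Group() implementations.
	dispatch([]job{
		{group: "app|production|target", name: "deployment.command.deploy"},
		{group: "app|production|target", name: "deployment.command.cleanup_app"},
		{group: "some-target-id", name: "deployment.command.configure_target"},
	})
}
```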
diff --git a/pkg/bus/message_test.go b/pkg/bus/message_test.go
index d851288..b997248 100644
--- a/pkg/bus/message_test.go
+++ b/pkg/bus/message_test.go
@@ -181,9 +181,8 @@ type asyncCommand struct {
 	SomeValue int
 }
 
-func (asyncCommand) Name_() string      { return "AsyncCommand" }
-func (asyncCommand) ResourceID() string { return "" }
-func (asyncCommand) Group() string      { return "" }
+func (asyncCommand) Name_() string { return "AsyncCommand" }
+func (asyncCommand) Group() string { return "" }
 
 type getQuery struct {
 	bus.Query[int]
diff --git a/pkg/bus/scheduler.go b/pkg/bus/scheduler.go
index 4a1e60c..889785a 100644
--- a/pkg/bus/scheduler.go
+++ b/pkg/bus/scheduler.go
@@ -5,5 +5,5 @@ import "context"
 // Enable scheduled dispatching of a message.
 type Scheduler interface {
 	// Queue a request to be dispatched asynchronously at a later time.
-	Queue(context.Context, AsyncRequest) error
+	Queue(context.Context, ...AsyncRequest) error
 }
diff --git a/pkg/bus/sqlite/jobs.go b/pkg/bus/sqlite/jobs.go
index c528b2b..07d9211 100644
--- a/pkg/bus/sqlite/jobs.go
+++ b/pkg/bus/sqlite/jobs.go
@@ -58,7 +58,6 @@ func (s *JobsStore) GetAllJobs(ctx context.Context, query get_jobs.Query) (stora
 	return builder.
 		Select[get_jobs.Job](`
 			id
-			,resource_id
 			,[group]
 			,message_name
 			,message_data
@@ -124,27 +123,33 @@ func (s *JobsStore) DismissJob(ctx context.Context, cmd dismiss_job.Command) (bu
 
 func (s *JobsStore) Queue(
 	ctx context.Context,
-	msg bus.AsyncRequest,
+	requests ...bus.AsyncRequest,
 ) error {
 	now := time.Now().UTC()
 
-	msgData, err := storage.ValueJSON(msg)
-	if err != nil {
-		return err
+	for _, req := range requests {
+		cmdData, err := storage.ValueJSON(req)
+
+		if err != nil {
+			return err
+		}
+
+		if err = builder.
+			Insert("scheduled_jobs", builder.Values{
+				"id":           id.New[string](),
+				"[group]":      req.Group(),
+				"message_name": req.Name_(),
+				"message_data": cmdData,
+				"queued_at":    now,
+				"not_before":   now,
+				"retrieved":    false,
+			}).
+			Exec(s.db, ctx); err != nil {
+			return err
+		}
 	}
 
-	return builder.
-		Insert("scheduled_jobs", builder.Values{
-			"id":           id.New[string](),
-			"resource_id":  msg.ResourceID(),
-			"[group]":      msg.Group(),
-			"message_name": msg.Name_(),
-			"message_data": msgData,
-			"queued_at":    now,
-			"not_before":   now,
-			"retrieved":    false,
-		}).
-		Exec(s.db, ctx)
+	return nil
 }
 
 func (s *JobsStore) GetNextPendingJobs(ctx context.Context) ([]embedded.Job, error) {
@@ -226,7 +231,6 @@ func jobMapper(scanner storage.Scanner) (embedded.Job, error) {
 func jobQueryMapper(scanner storage.Scanner) (j get_jobs.Job, err error) {
 	err = scanner.Scan(
 		&j.ID,
-		&j.ResourceID,
 		&j.Group,
 		&j.MessageName,
 		&j.MessageData,
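Because Queue is now variadic, existing single-request call sites keep compiling unchanged, and handlers can hand over several commands in one call or expand a slice, as OnAppCleanupRequestedHandler does above. The in-memory types below are simplified stand-ins for illustration, not the real bus package:

```go
package main

import (
	"context"
	"fmt"
)

// asyncRequest keeps only the parts of the contract relevant to scheduling here.
type asyncRequest interface {
	Name_() string
	Group() string
}

type scheduler interface {
	Queue(ctx context.Context, requests ...asyncRequest) error
}

// memoryScheduler just records what was queued.
type memoryScheduler struct{ queued []asyncRequest }

func (m *memoryScheduler) Queue(_ context.Context, requests ...asyncRequest) error {
	m.queued = append(m.queued, requests...)
	return nil
}

type cleanupApp struct{ appID, env, target string }

func (cleanupApp) Name_() string   { return "deployment.command.cleanup_app" }
func (c cleanupApp) Group() string { return c.appID + "|" + c.env + "|" + c.target } // made-up group format

func main() {
	mem := &memoryScheduler{}
	var s scheduler = mem

	// Single request: the call shape used before the change still works.
	_ = s.Queue(context.Background(), cleanupApp{"app", "production", "t1"})

	// Several requests in one call, passed here as an expanded slice.
	batch := []asyncRequest{
		cleanupApp{"app", "production", "t1"},
		cleanupApp{"app", "staging", "t1"},
	}
	_ = s.Queue(context.Background(), batch...)

	fmt.Println(len(mem.queued)) // 3
}
```

The sqlite JobsStore above simply loops over the requests, inserts one row per command, and stops at the first error it encounters.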
diff --git a/pkg/bus/sqlite/migrations/1727345059_nullable_policy.up.sql b/pkg/bus/sqlite/migrations/1727345059_nullable_policy.up.sql
index 77f94a2..d441124 100644
--- a/pkg/bus/sqlite/migrations/1727345059_nullable_policy.up.sql
+++ b/pkg/bus/sqlite/migrations/1727345059_nullable_policy.up.sql
@@ -1,7 +1,32 @@
-ALTER TABLE scheduled_jobs DROP COLUMN policy;
+CREATE TEMP TABLE tmp_scheduled_jobs AS
+SELECT * FROM scheduled_jobs;
 
--- Make the policy column nullable since it's not used anymore.
+DROP TABLE scheduled_jobs;
+
+-- Make the policy and resource_id columns nullable since it's not used anymore.
 -- Since migrations are executed by domain orders (scheduler, auth, deployment) and
 -- I have failed by making deployment rely on the scheduled_jobs table, I must keep
 -- it or else I have to update the migration (which I think is worse).
-ALTER TABLE scheduled_jobs ADD policy INTEGER NULL;
+CREATE TABLE scheduled_jobs
+(
+    id TEXT NOT NULL,
+    resource_id TEXT NULL,
+    [group] TEXT NOT NULL,
+    message_name TEXT NOT NULL,
+    message_data JSON NOT NULL,
+    queued_at DATETIME NOT NULL,
+    not_before DATETIME NOT NULL,
+    policy INTEGER NULL,
+    errcode TEXT NULL,
+    retrieved BOOLEAN NOT NULL DEFAULT false,
+
+    CONSTRAINT pk_scheduled_jobs PRIMARY KEY(id)
+);
+
+INSERT INTO scheduled_jobs
+SELECT * FROM tmp_scheduled_jobs;
+
+CREATE INDEX idx_scheduled_jobs_group ON scheduled_jobs([group]);
+CREATE INDEX idx_scheduled_jobs_message_name ON scheduled_jobs(message_name);
+
+DROP TABLE tmp_scheduled_jobs;
\ No newline at end of file
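SQLite's ALTER TABLE cannot relax a NOT NULL constraint in place, which is why the migration above copies scheduled_jobs into a temporary table, drops and recreates it with nullable policy and resource_id columns, restores the rows, and creates indexes on [group] and message_name afterwards (indexes on the old table do not survive a DROP TABLE). A minimal sketch of the same rebuild pattern against an in-memory database follows; the go-sqlite3 driver and the trimmed-down schema are assumptions made only for the illustration:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver, any SQLite driver works
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same copy / drop / recreate / restore cycle as the migration, on a tiny schema.
	stmts := []string{
		`CREATE TABLE scheduled_jobs (id TEXT NOT NULL PRIMARY KEY, policy INTEGER NOT NULL)`,
		`INSERT INTO scheduled_jobs VALUES ('job-1', 1)`,
		`CREATE TEMP TABLE tmp_scheduled_jobs AS SELECT * FROM scheduled_jobs`,
		`DROP TABLE scheduled_jobs`,
		`CREATE TABLE scheduled_jobs (id TEXT NOT NULL PRIMARY KEY, policy INTEGER NULL)`,
		`INSERT INTO scheduled_jobs SELECT * FROM tmp_scheduled_jobs`,
		`DROP TABLE tmp_scheduled_jobs`,
		`INSERT INTO scheduled_jobs VALUES ('job-2', NULL)`, // rejected before, accepted now
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	var count int
	if err := db.QueryRow(`SELECT COUNT(*) FROM scheduled_jobs WHERE policy IS NULL`).Scan(&count); err != nil {
		log.Fatal(err)
	}
	fmt.Println(count) // 1
}
```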