chore: remove scheduled_jobs.resource_id
Use SQLite JSON functions to extract the needed information.
YuukanOO committed Oct 7, 2024
1 parent a829b63 commit cc3394b
Showing 13 changed files with 79 additions and 43 deletions.
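
In short: the dedicated resource_id column goes away, and the queries and triggers below instead read identifiers straight out of the JSON command payload stored in message_data, using SQLite's JSON operators. A minimal sketch of that kind of extraction (illustrative query, not part of the change):

-- Illustrative only: pull a field out of the JSON payload with SQLite's ->> operator.
-- ->> returns the value as TEXT, or NULL when the key is absent.
SELECT id
      ,message_name
      ,message_data ->> '$.target_id' AS target_id
FROM scheduled_jobs
WHERE message_name = 'deployment.command.configure_target';

The ->> operator requires SQLite 3.38 or newer; older versions expose the same thing through json_extract.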
5 changes: 2 additions & 3 deletions internal/deployment/app/cleanup_app/cleanup_app.go
@@ -25,9 +25,8 @@ type Command struct {
To time.Time `json:"to"`
}

func (Command) Name_() string { return "deployment.command.cleanup_app" }
func (c Command) ResourceID() string { return c.AppID }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
func (Command) Name_() string { return "deployment.command.cleanup_app" }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }

func Handler(
reader domain.TargetsReader,
6 changes: 3 additions & 3 deletions internal/deployment/app/cleanup_app/cleanup_app_test.go
@@ -108,7 +108,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, nil)
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
assert.Nil(t, target.RequestDelete(false, "uid"))

handler, ctx := arrange(t, &provider,
@@ -146,7 +146,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed"))
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, errors.New("configuration failed")))
app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
fixture.WithEnvironmentConfig(
domain.NewEnvironmentConfig(target.ID()),
@@ -181,7 +181,7 @@ func Test_CleanupApp(t *testing.T) {
var provider mockProvider
user := authfixture.User()
target := fixture.Target(fixture.WithTargetCreatedBy(user.ID()))
target.Configured(target.CurrentVersion(), nil, nil)
assert.Nil(t, target.Configured(target.CurrentVersion(), nil, nil))
app := fixture.App(fixture.WithAppCreatedBy(user.ID()),
fixture.WithEnvironmentConfig(
domain.NewEnvironmentConfig(target.ID()),
7 changes: 3 additions & 4 deletions internal/deployment/app/cleanup_target/cleanup_target.go
@@ -14,12 +14,11 @@ import (
type Command struct {
bus.AsyncCommand

ID string `json:"id"`
ID string `json:"target_id"`
}

func (Command) Name_() string { return "deployment.command.cleanup_target" }
func (c Command) ResourceID() string { return c.ID }
func (c Command) Group() string { return c.ID }
func (Command) Name_() string { return "deployment.command.cleanup_target" }
func (c Command) Group() string { return c.ID }

func Handler(
reader domain.TargetsReader,
6 changes: 4 additions & 2 deletions internal/deployment/app/cleanup_target/on_job_dismissed.go
@@ -15,11 +15,13 @@ func OnJobDismissedHandler(
writer domain.TargetsWriter,
) bus.SignalHandler[embedded.JobDismissed] {
return func(ctx context.Context, evt embedded.JobDismissed) error {
if _, isCleanupJob := evt.Command.(Command); !isCleanupJob {
cmd, isCleanupJob := evt.Command.(Command)

if !isCleanupJob {
return nil
}

target, err := reader.GetByID(ctx, domain.TargetID(evt.Command.ResourceID()))
target, err := reader.GetByID(ctx, domain.TargetID(cmd.ID))

if err != nil {
// Target deleted, no need to go further
7 changes: 3 additions & 4 deletions internal/deployment/app/configure_target/configure_target.go
@@ -14,13 +14,12 @@ import (
type Command struct {
bus.AsyncCommand

ID string `json:"id"`
ID string `json:"target_id"`
Version time.Time `json:"version"`
}

func (Command) Name_() string { return "deployment.command.configure_target" }
func (c Command) ResourceID() string { return c.ID }
func (c Command) Group() string { return c.ID }
func (Command) Name_() string { return "deployment.command.configure_target" }
func (c Command) Group() string { return c.ID }

func Handler(
reader domain.TargetsReader,
6 changes: 2 additions & 4 deletions internal/deployment/app/deploy/deploy.go
@@ -3,7 +3,6 @@ package deploy
import (
"context"
"errors"
"strconv"

"github.com/YuukanOO/seelf/internal/deployment/domain"
"github.com/YuukanOO/seelf/pkg/apperr"
@@ -20,9 +19,8 @@ type Command struct {
TargetID string `json:"target_id"`
}

func (Command) Name_() string { return "deployment.command.deploy" }
func (c Command) ResourceID() string { return c.AppID + "-" + strconv.Itoa(c.DeploymentNumber) }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }
func (Command) Name_() string { return "deployment.command.deploy" }
func (c Command) Group() string { return bus.Group(c.AppID, c.Environment, c.TargetID) }

// Handle the deployment process.
// If an unexpected error occurs during this process, it uses the bus.PreserveOrder function
@@ -12,26 +12,48 @@ ALTER TABLE apps ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE registries ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE targets ADD version DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP;

ALTER TABLE apps ADD history TEXT NOT NULL DEFAULT '{}';
ALTER TABLE apps ADD history JSON NOT NULL DEFAULT '{}';
UPDATE apps SET history = '{"production": ["' || production_target || '"], "staging": ["'|| staging_target ||'"]}';

-- Since the group has changed for configure and cleanup, just update those jobs.
UPDATE scheduled_jobs
SET
[group] = resource_id
WHERE message_name IN ('deployment.command.configure_target', 'deployment.command.cleanup_target');
,message_data = '{"target_id": "' || resource_id || '"}'
WHERE message_name = 'deployment.command.cleanup_target';

UPDATE scheduled_jobs
SET
[group] = resource_id
,message_data = '{"target_id": "' || resource_id || '", "version": "' || (message_data ->> '$.version') || '"}'
WHERE message_name = 'deployment.command.configure_target';

-- Since those messages no longer exist.
DELETE FROM scheduled_jobs WHERE message_name IN ('deployment.command.delete_target', 'deployment.command.delete_app');

DROP TRIGGER IF EXISTS on_deployment_failed_cleanup_jobs;

-- When a deployment is marked as failed, remove all pending jobs for that resource.
-- This is to avoid running jobs that are no longer needed.
CREATE TRIGGER IF NOT EXISTS on_deployment_failed_cleanup_jobs AFTER UPDATE ON deployments
WHEN OLD.state_status != NEW.state_status AND NEW.state_status = 2 -- Only when the deployment goes to the failed state
BEGIN
DELETE FROM scheduled_jobs
WHERE
message_name = 'deployment.command.deploy'
AND message_data ->> '$.app_id' = NEW.app_id
AND message_data ->> '$.deployment_number' = NEW.deployment_number
AND retrieved = 0;
END;

-- When a target is configured, no need to keep old configure jobs since they are outdated.
CREATE TRIGGER IF NOT EXISTS on_target_configure_remove_outdated_jobs
BEFORE INSERT ON scheduled_jobs
WHEN NEW.message_name = 'deployment.command.configure_target'
BEGIN
DELETE FROM scheduled_jobs
WHERE
resource_id = NEW.resource_id
AND message_name = 'deployment.command.configure_target'
message_name = 'deployment.command.configure_target'
AND message_data ->> '$.target_id' = NEW.message_data ->> '$.target_id'
AND retrieved = 0;
END
END;
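
A rough way to sanity-check the backfill above (illustrative only, not part of the migration): after the UPDATEs, every remaining cleanup and configure job should expose its target identifier through the JSON payload rather than through the dropped resource_id column.

-- Illustrative check: expect zero rows once the payloads have been rewritten.
SELECT id, message_name
FROM scheduled_jobs
WHERE message_name IN ('deployment.command.cleanup_target', 'deployment.command.configure_target')
  AND message_data ->> '$.target_id' IS NULL;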
1 change: 0 additions & 1 deletion pkg/bus/embedded/get_jobs/get_jobs.go
@@ -11,7 +11,6 @@ import (
type (
Job struct {
ID string `json:"id"`
ResourceID string `json:"resource_id"`
Group string `json:"group"`
MessageName string `json:"message_name"`
MessageData string `json:"message_data"`
10 changes: 4 additions & 6 deletions pkg/bus/embedded/runner_test.go
@@ -126,17 +126,15 @@ type returnCommand struct {
err error
}

func (r returnCommand) Name_() string { return "returnCommand" }
func (r returnCommand) ResourceID() string { return "" }
func (r returnCommand) Group() string { return "" }
func (r returnCommand) Name_() string { return "returnCommand" }
func (r returnCommand) Group() string { return "" }

type unhandledCommand struct {
bus.AsyncCommand
}

func (u unhandledCommand) Name_() string { return "unhandledCommand" }
func (u unhandledCommand) ResourceID() string { return "" }
func (u unhandledCommand) Group() string { return "" }
func (u unhandledCommand) Name_() string { return "unhandledCommand" }
func (u unhandledCommand) Group() string { return "" }

var (
_ embedded.JobsStore = (*adapter)(nil)
3 changes: 1 addition & 2 deletions pkg/bus/message.go
@@ -65,8 +65,7 @@ type (
AsyncRequest interface {
MutateRequest
TypedRequest[AsyncResult]
ResourceID() string // ID of the main resource processed by the request
Group() string // Work group for this request, at most one job per group is processed at any given time
Group() string // Work group for this request, at most one job per group is processed at any given time
}

// Request to query the system.
5 changes: 2 additions & 3 deletions pkg/bus/message_test.go
@@ -181,9 +181,8 @@ type asyncCommand struct {
SomeValue int
}

func (asyncCommand) Name_() string { return "AsyncCommand" }
func (asyncCommand) ResourceID() string { return "" }
func (asyncCommand) Group() string { return "" }
func (asyncCommand) Name_() string { return "AsyncCommand" }
func (asyncCommand) Group() string { return "" }

type getQuery struct {
bus.Query[int]
3 changes: 0 additions & 3 deletions pkg/bus/sqlite/jobs.go
@@ -58,7 +58,6 @@ func (s *JobsStore) GetAllJobs(ctx context.Context, query get_jobs.Query) (stora
return builder.
Select[get_jobs.Job](`
id
,resource_id
,[group]
,message_name
,message_data
@@ -136,7 +135,6 @@ func (s *JobsStore) Queue(
return builder.
Insert("scheduled_jobs", builder.Values{
"id": id.New[string](),
"resource_id": msg.ResourceID(),
"[group]": msg.Group(),
"message_name": msg.Name_(),
"message_data": msgData,
@@ -226,7 +224,6 @@ func jobMapper(scanner storage.Scanner) (embedded.Job, error) {
func jobQueryMapper(scanner storage.Scanner) (j get_jobs.Job, err error) {
err = scanner.Scan(
&j.ID,
&j.ResourceID,
&j.Group,
&j.MessageName,
&j.MessageData,
31 changes: 28 additions & 3 deletions pkg/bus/sqlite/migrations/1727345059_nullable_policy.up.sql
@@ -1,7 +1,32 @@
ALTER TABLE scheduled_jobs DROP COLUMN policy;
CREATE TEMP TABLE tmp_scheduled_jobs AS
SELECT * FROM scheduled_jobs;

-- Make the policy column nullable since it's not used anymore.
DROP TABLE scheduled_jobs;

-- Make the policy and resource_id columns nullable since they are not used anymore.
-- Since migrations are executed in domain order (scheduler, auth, deployment) and I made
-- the mistake of having the deployment domain rely on the scheduled_jobs table, I must keep
-- it or else I would have to update that migration (which I think is worse).
ALTER TABLE scheduled_jobs ADD policy INTEGER NULL;
CREATE TABLE scheduled_jobs
(
id TEXT NOT NULL,
resource_id TEXT NULL,
[group] TEXT NOT NULL,
message_name TEXT NOT NULL,
message_data JSON NOT NULL,
queued_at DATETIME NOT NULL,
not_before DATETIME NOT NULL,
policy INTEGER NULL,
errcode TEXT NULL,
retrieved BOOLEAN NOT NULL DEFAULT false,

CONSTRAINT pk_scheduled_jobs PRIMARY KEY(id)
);

INSERT INTO scheduled_jobs
SELECT * FROM tmp_scheduled_jobs;

CREATE INDEX idx_scheduled_jobs_group ON scheduled_jobs([group]);
CREATE INDEX idx_scheduled_jobs_message_name ON scheduled_jobs(message_name);

DROP TABLE tmp_scheduled_jobs;
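
Since the store's Queue insert no longer provides a resource_id (see pkg/bus/sqlite/jobs.go above), rows written after this change leave that column NULL, hence the nullable declaration in the rebuilt table. A made-up row for illustration (values are hypothetical):

-- Illustrative only: a job queued after this change carries no resource_id,
-- the identifier lives in the JSON payload instead.
INSERT INTO scheduled_jobs (id, [group], message_name, message_data, queued_at, not_before)
VALUES ('some-job-id', 'some-target-id', 'deployment.command.configure_target',
        '{"target_id": "some-target-id", "version": "2024-10-07T00:00:00+00:00"}',
        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);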
