From ad2127f2816cae9fa0e2fae7640a4722f1c42520 Mon Sep 17 00:00:00 2001 From: YuukanOO Date: Thu, 9 Nov 2023 11:14:59 +0100 Subject: [PATCH] chore: add discriminated union management Makes it easier to persist and retrieve dynamic types such as deployment Source data and Job payloads. --- cmd/configuration.go | 2 +- cmd/serve/deployments.go | 2 +- .../src/components/deployment-card.svelte | 7 +- .../src/components/deployment-pill.svelte | 2 +- .../front/src/lib/resources/deployments.ts | 22 +++++-- .../apps/[id]/deployments/new/+page.svelte | 4 +- cmd/serve/server.go | 14 ++-- .../app/command/cleanup_app_test.go | 2 +- .../deployment/app/command/deploy_test.go | 11 ++-- .../command/fail_running_deployments_test.go | 4 +- .../deployment/app/command/promote_test.go | 2 +- .../deployment/app/command/redeploy_test.go | 2 +- internal/deployment/app/query/gateway.go | 12 ++-- internal/deployment/domain/deployment.go | 31 +++++---- internal/deployment/domain/deployment_test.go | 49 ++++++++------ internal/deployment/domain/meta.go | 33 ---------- internal/deployment/domain/meta_test.go | 31 --------- internal/deployment/domain/service.go | 11 +--- internal/deployment/domain/source.go | 30 ++++++--- .../infra/backend/docker/docker_test.go | 7 +- .../deployment/infra/source/archive/data.go | 21 ++++++ .../deployment/infra/source/archive/source.go | 38 +++++------ internal/deployment/infra/source/facade.go | 6 +- internal/deployment/infra/source/git/data.go | 52 +++++++++++++++ .../deployment/infra/source/git/source.go | 65 +++++++------------ internal/deployment/infra/source/raw/data.go | 21 ++++++ .../deployment/infra/source/raw/source.go | 29 ++++----- .../deployment/infra/sqlite/deployments.go | 40 ++++++------ internal/deployment/infra/sqlite/gateway.go | 40 ++++++++---- .../1699476081_rename_source_columns.up.sql | 2 + .../app/command/fail_running_jobs_test.go | 4 +- .../worker/app/command/process_next_test.go | 30 +++++++-- internal/worker/app/command/queue.go | 21 +++--- internal/worker/app/command/queue_test.go | 21 +++--- internal/worker/domain/job.go | 45 +++++++++---- internal/worker/domain/job_test.go | 30 +++++---- internal/worker/infra/jobs/cleanup/data.go | 19 ++++++ internal/worker/infra/jobs/cleanup/handler.go | 50 ++++++++------ internal/worker/infra/jobs/deploy/data.go | 50 ++++++++++++++ internal/worker/infra/jobs/deploy/handler.go | 57 ++++++++-------- internal/worker/infra/jobs/facade.go | 50 +++++++------- internal/worker/infra/memory/jobs.go | 6 +- internal/worker/infra/sqlite/jobs.go | 20 +++--- .../1696843357_add_dedupe_name.up.sql | 9 ++- pkg/http/helpers.go | 11 +++- pkg/storage/discriminated.go | 44 +++++++++++++ pkg/storage/discriminated_test.go | 58 +++++++++++++++++ pkg/storage/scanner.go | 22 +++++-- pkg/storage/sqlite/builder/builder.go | 4 +- pkg/types/is.go | 7 ++ pkg/types/is_test.go | 25 +++++++ 51 files changed, 760 insertions(+), 415 deletions(-) delete mode 100644 internal/deployment/domain/meta.go delete mode 100644 internal/deployment/domain/meta_test.go create mode 100644 internal/deployment/infra/source/archive/data.go create mode 100644 internal/deployment/infra/source/git/data.go create mode 100644 internal/deployment/infra/source/raw/data.go create mode 100644 internal/deployment/infra/sqlite/migrations/1699476081_rename_source_columns.up.sql create mode 100644 internal/worker/infra/jobs/cleanup/data.go create mode 100644 internal/worker/infra/jobs/deploy/data.go create mode 100644 pkg/storage/discriminated.go create mode 100644 pkg/storage/discriminated_test.go create mode 100644 pkg/types/is.go create mode 100644 pkg/types/is_test.go diff --git a/cmd/configuration.go b/cmd/configuration.go index f5b506d0..0ec99d40 100644 --- a/cmd/configuration.go +++ b/cmd/configuration.go @@ -25,7 +25,7 @@ var ( ) const ( - databaseFilename = "seelf.db?_foreign_keys=yes" + databaseFilename = "seelf.db?_foreign_keys=yes&_txlock=immediate" defaultConfigFilename = "conf.yml" defaultPort = 8080 defaultHost = "" diff --git a/cmd/serve/deployments.go b/cmd/serve/deployments.go index 8a6920e4..ef74f312 100644 --- a/cmd/serve/deployments.go +++ b/cmd/serve/deployments.go @@ -19,7 +19,7 @@ type queueDeploymentBody struct { Environment string `json:"environment" form:"environment"` Raw monad.Maybe[string] `json:"raw"` Archive *multipart.FileHeader `form:"archive"` - Git monad.Maybe[git.Payload] `json:"git"` + Git monad.Maybe[git.Request] `json:"git"` } func (s *server) queueDeploymentHandler() gin.HandlerFunc { diff --git a/cmd/serve/front/src/components/deployment-card.svelte b/cmd/serve/front/src/components/deployment-card.svelte index 6dd4a7fc..23eb5a90 100644 --- a/cmd/serve/front/src/components/deployment-card.svelte +++ b/cmd/serve/front/src/components/deployment-card.svelte @@ -50,13 +50,12 @@ : '-'} - {#if data.meta.kind === 'git'} - {@const [branch, commit] = data.meta.data.split('@', 2)} + {#if data.source.discriminator === 'git'} - {branch} + {data.source.data.branch} - {commit.substring(0, 10)} + {data.source.data.hash.substring(0, 10)} {/if} diff --git a/cmd/serve/front/src/components/deployment-pill.svelte b/cmd/serve/front/src/components/deployment-pill.svelte index 9e83ce18..a56060c9 100644 --- a/cmd/serve/front/src/components/deployment-pill.svelte +++ b/cmd/serve/front/src/components/deployment-pill.svelte @@ -18,7 +18,7 @@ gap={1} > = undefined; let branch = ''; diff --git a/cmd/serve/server.go b/cmd/serve/server.go index 5bebe3ab..34d62d9a 100644 --- a/cmd/serve/server.go +++ b/cmd/serve/server.go @@ -209,7 +209,7 @@ func (s *server) configureServices() error { ) s.docker = docker.New(s.options, s.logger) - handler := jobs.NewFacade(s.logger, + handler := jobs.NewFacade( deploy.New(s.logger, deplcmd.Deploy(deploymentsStore, deploymentsStore, sourceFacade, s.docker)), cleanup.New(s.logger, deplcmd.CleanupApp(deploymentsStore, appsStore, appsStore, s.docker)), ) @@ -221,7 +221,7 @@ func (s *server) configureServices() error { func(ctx context.Context, tags []string) error { return processNextJob(ctx, workercmd.ProcessNextCommand{Names: tags}) }, - async.Group(s.options.RunnersDeploymentCount(), deploy.JobName, cleanup.JobName), + async.Group(s.options.RunnersDeploymentCount(), deploy.Data{}.Discriminator(), cleanup.Data("").Discriminator()), ) s.usersReader = usersStore @@ -247,7 +247,7 @@ func (s *server) configureServices() error { s.failRunningDeployments = deplcmd.FailRunningDeployments(deploymentsStore, deploymentsStore) s.failRunningJobs = workercmd.FailRunningJobs(jobsStore, jobsStore) - s.queueJob = workercmd.Queue(jobsStore) + s.queueJob = workercmd.Queue(jobsStore, handler) return nil } @@ -361,9 +361,13 @@ func (s *server) configureRouter() { func (s *server) domainEventHandler(ctx context.Context, e event.Event) error { switch evt := e.(type) { case domain.DeploymentCreated: - s.queueJob(ctx, deploy.Queue(evt)) + s.queueJob(ctx, workercmd.QueueCommand{ + Payload: deploy.Request(evt), + }) case domain.AppCleanupRequested: - s.queueJob(ctx, cleanup.Queue(evt.ID)) + s.queueJob(ctx, workercmd.QueueCommand{ + Payload: cleanup.Request(evt), + }) } return nil diff --git a/internal/deployment/app/command/cleanup_app_test.go b/internal/deployment/app/command/cleanup_app_test.go index 9bd67700..5e3a5bfc 100644 --- a/internal/deployment/app/command/cleanup_app_test.go +++ b/internal/deployment/app/command/cleanup_app_test.go @@ -49,7 +49,7 @@ func Test_CleanupApp(t *testing.T) { t.Run("should fail if there are still pending or running deployments", func(t *testing.T) { app := domain.NewApp("my-app", "uid") - depl, _ := app.NewDeployment(1, domain.NewMeta("some", "data"), domain.Production, options{}, "uid") + depl, _ := app.NewDeployment(1, meta{}, domain.Production, options{}, "uid") app.RequestCleanup("uid") uc := cleanup(initialData{ diff --git a/internal/deployment/app/command/deploy_test.go b/internal/deployment/app/command/deploy_test.go index 776b5fc4..f9d2e180 100644 --- a/internal/deployment/app/command/deploy_test.go +++ b/internal/deployment/app/command/deploy_test.go @@ -107,8 +107,6 @@ func Test_Deploy(t *testing.T) { }) } -const kind domain.Kind = "dummy" - type dummySource struct { err error } @@ -117,8 +115,8 @@ func source(failedWithErr error) domain.Source { return &dummySource{failedWithErr} } -func (*dummySource) Prepare(domain.App, any) (domain.Meta, error) { - return domain.NewMeta(kind, ""), nil +func (*dummySource) Prepare(domain.App, any) (domain.SourceData, error) { + return meta{}, nil } func (t *dummySource) Fetch(context.Context, domain.Deployment) error { @@ -140,3 +138,8 @@ func (b *dummyBackend) Run(context.Context, domain.Deployment) (domain.Services, func (b *dummyBackend) Cleanup(context.Context, domain.App) error { return nil } + +type meta struct{} + +func (meta) Discriminator() string { return "test" } +func (m meta) NeedVCS() bool { return false } diff --git a/internal/deployment/app/command/fail_running_deployments_test.go b/internal/deployment/app/command/fail_running_deployments_test.go index 9060ddc1..851352dd 100644 --- a/internal/deployment/app/command/fail_running_deployments_test.go +++ b/internal/deployment/app/command/fail_running_deployments_test.go @@ -26,12 +26,12 @@ func Test_FailRunningDeployments(t *testing.T) { t.Run("should reset running deployments", func(t *testing.T) { errReset := errors.New("server_reset") - started, _ := app.NewDeployment(2, domain.Meta{}, domain.Production, opts, "some-uid") + started, _ := app.NewDeployment(2, meta{}, domain.Production, opts, "some-uid") err := started.HasStarted() testutil.IsNil(t, err) - succeeded, _ := app.NewDeployment(1, domain.Meta{}, domain.Production, opts, "some-uid") + succeeded, _ := app.NewDeployment(1, meta{}, domain.Production, opts, "some-uid") succeeded.HasStarted() err = succeeded.HasEnded(domain.Services{}, nil) diff --git a/internal/deployment/app/command/promote_test.go b/internal/deployment/app/command/promote_test.go index d4f5cf11..a386e007 100644 --- a/internal/deployment/app/command/promote_test.go +++ b/internal/deployment/app/command/promote_test.go @@ -43,7 +43,7 @@ func Test_Promote(t *testing.T) { }) t.Run("should correctly creates a new deployment based on the provided one", func(t *testing.T) { - dpl, _ := app.NewDeployment(1, domain.NewMeta("some", "data"), domain.Staging, opts, "some-uid") + dpl, _ := app.NewDeployment(1, meta{}, domain.Staging, opts, "some-uid") uc := promote(dpl) number, err := uc(ctx, command.PromoteCommand{ diff --git a/internal/deployment/app/command/redeploy_test.go b/internal/deployment/app/command/redeploy_test.go index 28ca7c7b..bbeba5af 100644 --- a/internal/deployment/app/command/redeploy_test.go +++ b/internal/deployment/app/command/redeploy_test.go @@ -43,7 +43,7 @@ func Test_Redeploy(t *testing.T) { }) t.Run("should correctly creates a new deployment based on the provided one", func(t *testing.T) { - dpl, _ := app.NewDeployment(1, domain.NewMeta("some", "data"), domain.Production, opts, "some-uid") + dpl, _ := app.NewDeployment(1, meta{}, domain.Production, opts, "some-uid") uc := redeploy(dpl) number, err := uc(ctx, command.RedeployCommand{ diff --git a/internal/deployment/app/query/gateway.go b/internal/deployment/app/query/gateway.go index 1545bbde..cdcc7c87 100644 --- a/internal/deployment/app/query/gateway.go +++ b/internal/deployment/app/query/gateway.go @@ -9,6 +9,8 @@ import ( "github.com/YuukanOO/seelf/pkg/storage" ) +var SourceDataTypes = storage.NewDiscriminatedMapper[SourceData]() + type ( // Access to the underlying storage adapter for read use cases Gateway interface { @@ -43,17 +45,19 @@ type ( AppID string `json:"app_id"` DeploymentNumber int `json:"deployment_number"` Environment string `json:"environment"` - Meta Meta `json:"meta"` + Source Source `json:"source"` State State `json:"state"` RequestedAt time.Time `json:"requested_at"` RequestedBy User `json:"requested_by"` } - Meta struct { - Kind string `json:"kind"` - Data monad.Maybe[string] `json:"data"` // Contain source data only when the information is not sensitive + Source struct { + Discriminator string `json:"discriminator"` + Data SourceData `json:"data"` } + SourceData storage.Discriminated + VCSConfig struct { Url string `json:"url"` Token monad.Maybe[query.SecretString] `json:"token"` diff --git a/internal/deployment/domain/deployment.go b/internal/deployment/domain/deployment.go index d0fdf01e..63dc7df9 100644 --- a/internal/deployment/domain/deployment.go +++ b/internal/deployment/domain/deployment.go @@ -42,10 +42,10 @@ type ( event.Emitter id DeploymentID - path string + path string // Relative path where deployment data will be stored config Config state State - source Meta + source SourceData requested shared.Action[domain.UserID] } @@ -69,7 +69,7 @@ type ( Path string Config Config State State - Source Meta + Source SourceData Requested shared.Action[domain.UserID] } @@ -83,7 +83,7 @@ type ( // entity to make sure a new deployment can be created for an app. func (a App) NewDeployment( deployNumber DeploymentNumber, - meta Meta, + meta SourceData, env Environment, tmpl DeploymentDirTemplate, requestedBy domain.UserID, @@ -92,7 +92,7 @@ func (a App) NewDeployment( return d, ErrAppCleanupRequested } - if meta.kind.IsVCS() && !a.vcs.HasValue() { + if meta.NeedVCS() && !a.vcs.HasValue() { return d, ErrVCSNotConfigured } @@ -148,8 +148,10 @@ func (a App) Promote( func DeploymentFrom(scanner storage.Scanner) (d Deployment, err error) { var ( - requestedAt time.Time - requestedBy domain.UserID + requestedAt time.Time + requestedBy domain.UserID + sourceMetaDiscriminator string + sourceMetaData string ) err = scanner.Scan( @@ -165,20 +167,25 @@ func DeploymentFrom(scanner storage.Scanner) (d Deployment, err error) { &d.state.services, &d.state.startedAt, &d.state.finishedAt, - &d.source.kind, - &d.source.data, + &sourceMetaDiscriminator, + &sourceMetaData, &requestedAt, &requestedBy, ) + if err != nil { + return d, err + } + + d.source, err = SourceDataTypes.From(sourceMetaDiscriminator, sourceMetaData) d.requested = shared.ActionFrom(requestedBy, requestedAt) return d, err } -func (d Deployment) ID() DeploymentID { return d.id } -func (d Deployment) Config() Config { return d.config } -func (d Deployment) Source() Meta { return d.source } +func (d Deployment) ID() DeploymentID { return d.id } +func (d Deployment) Config() Config { return d.config } +func (d Deployment) Source() SourceData { return d.source } // Retrieve the deployment path relative to the given directories. func (d Deployment) Path(relativeTo ...string) string { diff --git a/internal/deployment/domain/deployment_test.go b/internal/deployment/domain/deployment_test.go index a2bac612..14ed3510 100644 --- a/internal/deployment/domain/deployment_test.go +++ b/internal/deployment/domain/deployment_test.go @@ -14,17 +14,17 @@ import ( func Test_Deployment(t *testing.T) { var ( - uid auth.UserID = "uid" - tmpl domain.DeploymentDirTemplate = dirTemplate{} - number domain.DeploymentNumber = 1 - meta domain.Meta = domain.NewMeta("test-kind", "test-data") + uid auth.UserID = "uid" + tmpl domain.DeploymentDirTemplate = dirTemplate{} + number domain.DeploymentNumber = 1 + vcsMeta = meta{true} + nonVcsMeta = meta{false} ) - t.Run("should require a version control config to be defined on the app for vcs managed trigger", func(t *testing.T) { - gitMeta := domain.NewMeta(domain.KindGit, "master") + t.Run("should require a version control config to be defined on the app for vcs managed source", func(t *testing.T) { app := domain.NewApp("my-app", uid) - _, err := app.NewDeployment(number, gitMeta, domain.Production, tmpl, uid) + _, err := app.NewDeployment(number, vcsMeta, domain.Production, tmpl, uid) testutil.ErrorIs(t, domain.ErrVCSNotConfigured, err) }) @@ -33,19 +33,19 @@ func Test_Deployment(t *testing.T) { app := domain.NewApp("my-app", uid) app.RequestCleanup("uid") - _, err := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + _, err := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) testutil.ErrorIs(t, domain.ErrAppCleanupRequested, err) }) t.Run("should be created from a valid app", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, err := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, err := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) conf := dpl.Config() testutil.IsNil(t, err) testutil.Equals(t, domain.DeploymentIDFrom(app.ID(), number), dpl.ID()) - testutil.Equals(t, meta, dpl.Source()) + testutil.Equals(t, nonVcsMeta, dpl.Source().(meta)) testutil.Equals(t, "my-app", conf.AppName()) testutil.Equals(t, domain.Production, conf.Environment()) @@ -62,7 +62,7 @@ func Test_Deployment(t *testing.T) { t.Run("could returns its path relative to the given directories", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, err := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, err := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) testutil.IsNil(t, err) testutil.Equals(t, filepath.Join(string(dpl.ID().AppID()), "1", "production"), dpl.Path()) @@ -71,7 +71,7 @@ func Test_Deployment(t *testing.T) { t.Run("could returns its log path relative to the given directories", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, err := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, err := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) created := testutil.EventIs[domain.DeploymentCreated](t, &dpl, 0) @@ -84,7 +84,7 @@ func Test_Deployment(t *testing.T) { var err error app := domain.NewApp("my-app", uid) - dpl, err := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, err := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) testutil.IsNil(t, err) @@ -105,7 +105,7 @@ func Test_Deployment(t *testing.T) { ) app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) services, _ = services.Internal(dpl.Config(), "aservice", "an/image") dpl.HasStarted() @@ -124,7 +124,7 @@ func Test_Deployment(t *testing.T) { var err error app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) dpl.HasStarted() err = dpl.HasEnded(nil, nil) @@ -145,7 +145,7 @@ func Test_Deployment(t *testing.T) { ) app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) dpl.HasStarted() err = dpl.HasEnded(nil, reason) @@ -162,7 +162,7 @@ func Test_Deployment(t *testing.T) { t.Run("could be redeployed", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) redpl, err := app.Redeploy(dpl, 2, tmpl, "another-user") @@ -172,7 +172,7 @@ func Test_Deployment(t *testing.T) { }) t.Run("should err if trying to redeploy a deployment on the wrong app", func(t *testing.T) { - source, _ := domain.NewApp("an-app", uid).NewDeployment(1, meta, domain.Production, tmpl, uid) + source, _ := domain.NewApp("an-app", uid).NewDeployment(1, nonVcsMeta, domain.Production, tmpl, uid) _, err := domain.NewApp("my-app", uid).Redeploy(source, 2, tmpl, "uid") @@ -181,7 +181,7 @@ func Test_Deployment(t *testing.T) { t.Run("could not promote an already in production deployment", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Production, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Production, tmpl, uid) _, err := app.Promote(dpl, 2, tmpl, "another-user") @@ -189,7 +189,7 @@ func Test_Deployment(t *testing.T) { }) t.Run("should err if trying to promote a deployment on the wrong app", func(t *testing.T) { - source, _ := domain.NewApp("an-app", uid).NewDeployment(1, meta, domain.Staging, tmpl, uid) + source, _ := domain.NewApp("an-app", uid).NewDeployment(1, nonVcsMeta, domain.Staging, tmpl, uid) _, err := domain.NewApp("my-app", uid).Promote(source, 2, tmpl, "uid") @@ -198,7 +198,7 @@ func Test_Deployment(t *testing.T) { t.Run("could promote a staging deployment", func(t *testing.T) { app := domain.NewApp("my-app", uid) - dpl, _ := app.NewDeployment(number, meta, domain.Staging, tmpl, uid) + dpl, _ := app.NewDeployment(number, nonVcsMeta, domain.Staging, tmpl, uid) promoted, err := app.Promote(dpl, 2, tmpl, "another-user") @@ -213,3 +213,10 @@ type dirTemplate struct{} func (dirTemplate) Execute(d domain.DeploymentTemplateData) string { return filepath.Join(strconv.Itoa(int(d.Number)), string(d.Environment)) } + +type meta struct { + isVCS bool +} + +func (meta) Discriminator() string { return "test" } +func (m meta) NeedVCS() bool { return m.isVCS } diff --git a/internal/deployment/domain/meta.go b/internal/deployment/domain/meta.go deleted file mode 100644 index 643af2d8..00000000 --- a/internal/deployment/domain/meta.go +++ /dev/null @@ -1,33 +0,0 @@ -package domain - -// Contains stuff related to how the deployment has been triggered. -// The inner data depends on the Trigger which has been requested, this is -// why I use primitive types here. -type Meta struct { - kind Kind - data string -} - -// Builds a new deployment meta from a kind and any additional data. -// You should never call it outside a Source implementation since -// it will be picked by a job to actually initiate the deployment. -func NewMeta(kind Kind, data string) Meta { - return Meta{kind, data} -} - -func (m Meta) Kind() Kind { return m.kind } -func (m Meta) Data() string { return m.data } - -// Specific type to represents a trigger kind to add helper methods on it. -type Kind string - -// Represents a trigger which will fetch source code from Git. It is defined here -// because it has special meanings (see below). -const KindGit Kind = "git" - -// Gets wether this kind of trigger is a version controlled one. -// For now it only checks for git but in the future, maybe there will be other -// VCS to deal with. -func (k Kind) IsVCS() bool { - return k == KindGit -} diff --git a/internal/deployment/domain/meta_test.go b/internal/deployment/domain/meta_test.go deleted file mode 100644 index abcd3b09..00000000 --- a/internal/deployment/domain/meta_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package domain_test - -import ( - "testing" - - "github.com/YuukanOO/seelf/internal/deployment/domain" - "github.com/YuukanOO/seelf/pkg/testutil" -) - -func Test_Meta(t *testing.T) { - t.Run("could be created", func(t *testing.T) { - var ( - kind domain.Kind = "meta kind" - payload = "meta payload" - ) - - meta := domain.NewMeta(kind, payload) - - testutil.Equals(t, kind, meta.Kind()) - testutil.Equals(t, payload, meta.Data()) - }) -} - -func Test_Kind(t *testing.T) { - t.Run("could check wether it represents a VCS managed source", func(t *testing.T) { - var rawKind domain.Kind = "raw" - - testutil.IsFalse(t, rawKind.IsVCS()) - testutil.IsTrue(t, domain.KindGit.IsVCS()) - }) -} diff --git a/internal/deployment/domain/service.go b/internal/deployment/domain/service.go index ae5539d6..818b3fd6 100644 --- a/internal/deployment/domain/service.go +++ b/internal/deployment/domain/service.go @@ -78,15 +78,8 @@ func (s Services) hasExposedServices() bool { return false } -func (s Services) Value() (driver.Value, error) { - r, err := json.Marshal(s) - - return string(r), err -} - -func (s *Services) Scan(value any) error { - return storage.ScanJSON(value, s) -} +func (s Services) Value() (driver.Value, error) { return storage.ValueJSON(s) } +func (s *Services) Scan(value any) error { return storage.ScanJSON(value, s) } // Type needed to marshal an unexposed Service data. type marshalledService struct { diff --git a/internal/deployment/domain/source.go b/internal/deployment/domain/source.go index f97dcada..9bb76b3a 100644 --- a/internal/deployment/domain/source.go +++ b/internal/deployment/domain/source.go @@ -2,18 +2,30 @@ package domain import ( "context" + "errors" - "github.com/YuukanOO/seelf/pkg/apperr" + "github.com/YuukanOO/seelf/pkg/storage" ) var ( - ErrNoValidSourceFound = apperr.New("no_valid_source_found") - ErrInvalidSourcePayload = apperr.New("invalid_source_payload") - ErrSourceFetchFailed = apperr.New("source_fetch_failed") + ErrNoValidSourceFound = errors.New("no_valid_source_found") + ErrInvalidSourcePayload = errors.New("invalid_source_payload") + ErrSourceFetchFailed = errors.New("source_fetch_failed") + + SourceDataTypes = storage.NewDiscriminatedMapper[SourceData]() ) -// Represents a source which has initiated a deployment. -type Source interface { - Prepare(App, any) (Meta, error) // Prepare the given payload for the given application, doing any needed validation - Fetch(context.Context, Deployment) error // Retrieve deployment data before passing in to a backend -} +type ( + // Contains stuff related to how the deployment has been triggered. + // The inner data depends on the Source which has been requested. + SourceData interface { + storage.Discriminated + NeedVCS() bool + } + + // Represents a source which has initiated a deployment. + Source interface { + Prepare(App, any) (SourceData, error) // Prepare the given payload for the given application, doing any needed validation + Fetch(context.Context, Deployment) error // Retrieve deployment data before passing in to a backend + } +) diff --git a/internal/deployment/infra/backend/docker/docker_test.go b/internal/deployment/infra/backend/docker/docker_test.go index f762754b..615ce346 100644 --- a/internal/deployment/infra/backend/docker/docker_test.go +++ b/internal/deployment/infra/backend/docker/docker_test.go @@ -110,7 +110,7 @@ func Test_Run(t *testing.T) { t.Run("should err if no compose file was found for a deployment", func(t *testing.T) { opts := options{domain: "http://docker.localhost"} app := domain.NewApp("my-app", "uid") - depl, _ := app.NewDeployment(1, domain.NewMeta("test", ""), domain.Production, opts, "uid") + depl, _ := app.NewDeployment(1, meta{}, domain.Production, opts, "uid") dockerBackend, _, _ := backend(opts) _, err := dockerBackend.Run(context.Background(), depl) @@ -375,3 +375,8 @@ func (o options) AcmeEmail() string { return o.email } func (o options) Execute(d domain.DeploymentTemplateData) string { return filepath.Join(strconv.Itoa(int(d.Number)), string(d.Environment)) } + +type meta struct{} + +func (meta) Discriminator() string { return "test" } +func (meta) NeedVCS() bool { return false } diff --git a/internal/deployment/infra/source/archive/data.go b/internal/deployment/infra/source/archive/data.go new file mode 100644 index 00000000..860568b8 --- /dev/null +++ b/internal/deployment/infra/source/archive/data.go @@ -0,0 +1,21 @@ +package archive + +import ( + "github.com/YuukanOO/seelf/internal/deployment/app/query" + "github.com/YuukanOO/seelf/internal/deployment/domain" +) + +type Data string + +func (p Data) Discriminator() string { return "archive" } +func (p Data) NeedVCS() bool { return false } + +func init() { + domain.SourceDataTypes.Register(Data(""), func(value string) (domain.SourceData, error) { + return Data(value), nil + }) + + query.SourceDataTypes.Register(Data(""), func(value string) (query.SourceData, error) { + return Data(value), nil + }) +} diff --git a/internal/deployment/infra/source/archive/source.go b/internal/deployment/infra/source/archive/source.go index dcc1b674..16bdf9b2 100644 --- a/internal/deployment/infra/source/archive/source.go +++ b/internal/deployment/infra/source/archive/source.go @@ -14,12 +14,10 @@ import ( "github.com/YuukanOO/seelf/internal/deployment/infra" "github.com/YuukanOO/seelf/internal/deployment/infra/source" "github.com/YuukanOO/seelf/pkg/ostools" + "github.com/YuukanOO/seelf/pkg/types" ) -const ( - kind domain.Kind = "archive" - tmpPattern string = "seelf-archive-" -) +const tmpPattern string = "seelf-archive-" var ( ErrOpenArchiveFailed = errors.New("open_archive_failed") @@ -46,22 +44,20 @@ func New(options Options) source.Source { } } -func (*service) CanPrepare(payload any) bool { - _, ok := payload.(*multipart.FileHeader) - return ok -} +func (*service) CanPrepare(payload any) bool { return types.Is[*multipart.FileHeader](payload) } +func (*service) CanFetch(meta domain.SourceData) bool { return types.Is[Data](meta) } -func (t *service) Prepare(app domain.App, payload any) (domain.Meta, error) { +func (t *service) Prepare(app domain.App, payload any) (domain.SourceData, error) { file, ok := payload.(*multipart.FileHeader) if !ok { - return domain.Meta{}, domain.ErrInvalidSourcePayload + return nil, domain.ErrInvalidSourcePayload } tmpfile, err := os.CreateTemp("", tmpPattern) if err != nil { - return domain.Meta{}, err + return nil, err } defer tmpfile.Close() @@ -69,20 +65,16 @@ func (t *service) Prepare(app domain.App, payload any) (domain.Meta, error) { archive, err := file.Open() if err != nil { - return domain.Meta{}, err + return nil, err } defer archive.Close() if _, err := io.Copy(tmpfile, archive); err != nil { - return domain.Meta{}, err + return nil, err } - return domain.NewMeta(kind, tmpfile.Name()), nil -} - -func (*service) CanFetch(meta domain.Meta) bool { - return meta.Kind() == kind + return Data(tmpfile.Name()), nil } func (t *service) Fetch(ctx context.Context, depl domain.Deployment) error { @@ -103,12 +95,16 @@ func (t *service) Fetch(ctx context.Context, depl domain.Deployment) error { return domain.ErrSourceFetchFailed } - archivePath := depl.Source().Data() + data, ok := depl.Source().(Data) + + if !ok { + return domain.ErrInvalidSourcePayload + } - logger.Stepf("extracting archive %s into %s", archivePath, buildDir) + logger.Stepf("extracting archive %s into %s", data, buildDir) // Open the archive file stored in a temporary location - archive, err := os.Open(archivePath) + archive, err := os.Open(string(data)) if err != nil { logger.Error(err) diff --git a/internal/deployment/infra/source/facade.go b/internal/deployment/infra/source/facade.go index 11a85405..563860dc 100644 --- a/internal/deployment/infra/source/facade.go +++ b/internal/deployment/infra/source/facade.go @@ -10,7 +10,7 @@ type ( Source interface { domain.Source CanPrepare(any) bool - CanFetch(domain.Meta) bool + CanFetch(domain.SourceData) bool } facade struct { @@ -23,14 +23,14 @@ func NewFacade(sources ...Source) domain.Source { return &facade{sources} } -func (r *facade) Prepare(app domain.App, payload any) (domain.Meta, error) { +func (r *facade) Prepare(app domain.App, payload any) (domain.SourceData, error) { for _, src := range r.sources { if src.CanPrepare(payload) { return src.Prepare(app, payload) } } - return domain.Meta{}, domain.ErrNoValidSourceFound + return nil, domain.ErrNoValidSourceFound } func (r *facade) Fetch(ctx context.Context, depl domain.Deployment) error { diff --git a/internal/deployment/infra/source/git/data.go b/internal/deployment/infra/source/git/data.go new file mode 100644 index 00000000..ab15c53f --- /dev/null +++ b/internal/deployment/infra/source/git/data.go @@ -0,0 +1,52 @@ +package git + +import ( + "database/sql/driver" + "encoding/json" + "strings" + + "github.com/YuukanOO/seelf/internal/deployment/app/query" + "github.com/YuukanOO/seelf/internal/deployment/domain" + "github.com/YuukanOO/seelf/pkg/storage" +) + +type Data struct { + Branch string `json:"branch"` + Hash string `json:"hash"` +} + +func (p Data) Discriminator() string { return "git" } +func (p Data) NeedVCS() bool { return true } + +func (p Data) Value() (driver.Value, error) { return storage.ValueJSON(p) } + +func init() { + domain.SourceDataTypes.Register(Data{}, func(value string) (domain.SourceData, error) { + return tryParseGitData(value) + }) + + // Here the registered discriminated type is the same since there are no unexposed fields and + // it also handle the retrocompatibility with the old payload format. + query.SourceDataTypes.Register(Data{}, func(value string) (query.SourceData, error) { + return tryParseGitData(value) + }) +} + +// Try to parse the given value as a git data payload. If the value is not a valid +// json string, it will fallback to the old format. +func tryParseGitData(value string) (Data, error) { + var p Data + + if !json.Valid([]byte(value)) { + lastSeparatorIdx := strings.LastIndex(value, "@") + branch, hash := value[:lastSeparatorIdx], value[lastSeparatorIdx+1:] + + p.Branch = branch + p.Hash = hash + return p, nil + } + + err := storage.ScanJSON(value, &p) + + return p, err +} diff --git a/internal/deployment/infra/source/git/source.go b/internal/deployment/infra/source/git/source.go index b4658c10..bfdd351a 100644 --- a/internal/deployment/infra/source/git/source.go +++ b/internal/deployment/infra/source/git/source.go @@ -3,9 +3,7 @@ package git import ( "context" "errors" - "fmt" "os" - sstrings "strings" "github.com/YuukanOO/seelf/internal/deployment/domain" "github.com/YuukanOO/seelf/internal/deployment/infra" @@ -15,6 +13,7 @@ import ( "github.com/YuukanOO/seelf/pkg/apperr" "github.com/YuukanOO/seelf/pkg/monad" "github.com/YuukanOO/seelf/pkg/ostools" + "github.com/YuukanOO/seelf/pkg/types" "github.com/YuukanOO/seelf/pkg/validation" "github.com/YuukanOO/seelf/pkg/validation/strings" "github.com/go-git/go-git/v5" @@ -31,10 +30,7 @@ var ( ErrGitCheckoutFailed = errors.New("git_checkout_failed") ) -const ( - basicAuthUser = "seelf" - separator = "@" -) +const basicAuthUser = "seelf" type ( Options interface { @@ -42,7 +38,8 @@ type ( LogsDir() string } - Payload struct { + // Public request to trigger a git deployment + Request struct { Branch string `json:"branch"` Hash monad.Maybe[string] `json:"hash"` } @@ -61,49 +58,37 @@ func New(options Options, reader domain.AppsReader) source.Source { } } -func (*service) CanPrepare(payload any) bool { - _, ok := payload.(Payload) - return ok -} +func (*service) CanPrepare(payload any) bool { return types.Is[Request](payload) } +func (*service) CanFetch(meta domain.SourceData) bool { return types.Is[Data](meta) } -func (s *service) Prepare(app domain.App, payload any) (domain.Meta, error) { - p, ok := payload.(Payload) +func (s *service) Prepare(app domain.App, payload any) (domain.SourceData, error) { + req, ok := payload.(Request) if !ok { - return domain.Meta{}, domain.ErrInvalidSourcePayload + return nil, domain.ErrInvalidSourcePayload } if err := validation.Check(validation.Of{ - "branch": validation.Is(p.Branch, strings.Required), - "hash": validation.Maybe(p.Hash, func(hash string) error { + "branch": validation.Is(req.Branch, strings.Required), + "hash": validation.Maybe(req.Hash, func(hash string) error { return validation.Is(hash, strings.Required) }), }); err != nil { - return domain.Meta{}, err + return nil, err } if !app.VCS().HasValue() { - return domain.Meta{}, domain.ErrVCSNotConfigured + return nil, domain.ErrVCSNotConfigured } // Retrieve the latest commit to make sure the branch exists - latestCommit, err := getLatestBranchCommit(app.VCS().MustGet(), p.Branch) + latestCommit, err := getLatestBranchCommit(app.VCS().MustGet(), req.Branch) if err != nil { - return domain.Meta{}, validation.WrapIfAppErr(err, "branch") + return nil, validation.WrapIfAppErr(err, "branch") } - if !p.Hash.HasValue() { - p.Hash = p.Hash.WithValue(latestCommit) - } - - metaPayload := fmt.Sprintf("%s%s%s", p.Branch, separator, p.Hash.MustGet()) - - return domain.NewMeta(domain.KindGit, metaPayload), nil -} - -func (*service) CanFetch(meta domain.Meta) bool { - return meta.Kind() == domain.KindGit + return Data{req.Branch, req.Hash.Get(latestCommit)}, nil } func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { @@ -118,7 +103,7 @@ func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { logger := infra.NewStepLogger(logfile) // Retrieve git url and token from the app - app, err := s.reader.GetByID(context.Background(), depl.ID().AppID()) + app, err := s.reader.GetByID(ctx, depl.ID().AppID()) if err != nil { logger.Error(err) @@ -139,18 +124,18 @@ func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { config := app.VCS().MustGet() - // Retrieve the branch and hash for deployment payload - // We use the LastIndex because an @ is a vaid character in a branch name - data := depl.Source().Data() - lastSeparatorIdx := sstrings.LastIndex(data, separator) - branch, hash := data[:lastSeparatorIdx], data[lastSeparatorIdx+1:] + data, ok := depl.Source().(Data) + + if !ok { + return domain.ErrInvalidSourcePayload + } - logger.Stepf("cloning branch %s at %s from %s using token: %t", branch, hash, config.Url(), config.Token().HasValue()) + logger.Stepf("cloning branch %s at %s from %s using token: %t", data.Branch, data.Hash, config.Url(), config.Token().HasValue()) r, err := git.PlainCloneContext(ctx, buildDir, false, &git.CloneOptions{ Auth: getAuthMethod(config), SingleBranch: true, - ReferenceName: plumbing.NewBranchReferenceName(branch), + ReferenceName: plumbing.NewBranchReferenceName(data.Branch), URL: config.Url().String(), Progress: logfile, }) @@ -161,7 +146,7 @@ func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { } // Resolve short hash names if needed - rev, err := r.ResolveRevision(plumbing.Revision(hash)) + rev, err := r.ResolveRevision(plumbing.Revision(data.Hash)) if err != nil { logger.Error(err) diff --git a/internal/deployment/infra/source/raw/data.go b/internal/deployment/infra/source/raw/data.go new file mode 100644 index 00000000..d6980310 --- /dev/null +++ b/internal/deployment/infra/source/raw/data.go @@ -0,0 +1,21 @@ +package raw + +import ( + "github.com/YuukanOO/seelf/internal/deployment/app/query" + "github.com/YuukanOO/seelf/internal/deployment/domain" +) + +type Data string + +func (p Data) Discriminator() string { return "raw" } +func (p Data) NeedVCS() bool { return false } + +func init() { + domain.SourceDataTypes.Register(Data(""), func(value string) (domain.SourceData, error) { + return Data(value), nil + }) + + query.SourceDataTypes.Register(Data(""), func(value string) (query.SourceData, error) { + return Data(value), nil + }) +} diff --git a/internal/deployment/infra/source/raw/source.go b/internal/deployment/infra/source/raw/source.go index 02a6148b..a6663efa 100644 --- a/internal/deployment/infra/source/raw/source.go +++ b/internal/deployment/infra/source/raw/source.go @@ -10,12 +10,11 @@ import ( "github.com/YuukanOO/seelf/internal/deployment/infra" "github.com/YuukanOO/seelf/internal/deployment/infra/source" "github.com/YuukanOO/seelf/pkg/ostools" + "github.com/YuukanOO/seelf/pkg/types" "github.com/YuukanOO/seelf/pkg/validation" "github.com/YuukanOO/seelf/pkg/validation/strings" ) -const kind domain.Kind = "raw" - var ErrWriteComposeFailed = errors.New("write_compose_failed") type ( @@ -35,29 +34,23 @@ func New(options Options) source.Source { } } -func (*service) CanPrepare(payload any) bool { - _, ok := payload.(string) - return ok -} +func (*service) CanPrepare(payload any) bool { return types.Is[string](payload) } +func (*service) CanFetch(meta domain.SourceData) bool { return types.Is[Data](meta) } -func (s *service) Prepare(app domain.App, payload any) (domain.Meta, error) { +func (s *service) Prepare(app domain.App, payload any) (domain.SourceData, error) { rawServiceFileContent, ok := payload.(string) if !ok { - return domain.Meta{}, domain.ErrInvalidSourcePayload + return nil, domain.ErrInvalidSourcePayload } if err := validation.Check(validation.Of{ "content": validation.Is(rawServiceFileContent, strings.Required), }); err != nil { - return domain.Meta{}, err + return nil, err } - return domain.NewMeta(kind, rawServiceFileContent), nil -} - -func (*service) CanFetch(meta domain.Meta) bool { - return meta.Kind() == kind + return Data(rawServiceFileContent), nil } func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { @@ -80,9 +73,15 @@ func (s *service) Fetch(ctx context.Context, depl domain.Deployment) error { filename := filepath.Join(buildDir, "compose.yml") + data, ok := depl.Source().(Data) + + if !ok { + return domain.ErrInvalidSourcePayload + } + logger.Stepf("writing service file to %s", filename) - if err := ostools.WriteFile(filename, []byte(depl.Source().Data())); err != nil { + if err := ostools.WriteFile(filename, []byte(data)); err != nil { logger.Error(err) return ErrWriteComposeFailed } diff --git a/internal/deployment/infra/sqlite/deployments.go b/internal/deployment/infra/sqlite/deployments.go index 74898a24..49bbba14 100644 --- a/internal/deployment/infra/sqlite/deployments.go +++ b/internal/deployment/infra/sqlite/deployments.go @@ -40,8 +40,8 @@ func (s *deploymentsStore) GetByID(ctx context.Context, id domain.DeploymentID) ,state_services ,state_started_at ,state_finished_at - ,source_kind - ,source_data + ,source_discriminator + ,source ,requested_at ,requested_by FROM deployments @@ -79,8 +79,8 @@ func (s *deploymentsStore) GetRunningDeployments(ctx context.Context) ([]domain. ,state_services ,state_started_at ,state_finished_at - ,source_kind - ,source_data + ,source_discriminator + ,source ,requested_at ,requested_by FROM deployments @@ -104,22 +104,22 @@ func (s *deploymentsStore) Write(c context.Context, deployments ...*domain.Deplo case domain.DeploymentCreated: return builder. Insert("deployments", builder.Values{ - "app_id": evt.ID.AppID(), - "deployment_number": evt.ID.DeploymentNumber(), - "path": evt.Path, - "config_appname": evt.Config.AppName(), - "config_environment": evt.Config.Environment(), - "config_env": evt.Config.Env(), - "state_status": evt.State.Status(), - "state_logfile": evt.State.LogFile(), - "state_errcode": evt.State.ErrCode(), - "state_services": evt.State.Services(), - "state_started_at": evt.State.StartedAt(), - "state_finished_at": evt.State.FinishedAt(), - "source_kind": evt.Source.Kind(), - "source_data": evt.Source.Data(), - "requested_at": evt.Requested.At(), - "requested_by": evt.Requested.By(), + "app_id": evt.ID.AppID(), + "deployment_number": evt.ID.DeploymentNumber(), + "path": evt.Path, + "config_appname": evt.Config.AppName(), + "config_environment": evt.Config.Environment(), + "config_env": evt.Config.Env(), + "state_status": evt.State.Status(), + "state_logfile": evt.State.LogFile(), + "state_errcode": evt.State.ErrCode(), + "state_services": evt.State.Services(), + "state_started_at": evt.State.StartedAt(), + "state_finished_at": evt.State.FinishedAt(), + "source_discriminator": evt.Source.Discriminator(), + "source": evt.Source, + "requested_at": evt.Requested.At(), + "requested_by": evt.Requested.By(), }). Exec(s, ctx) case domain.DeploymentStateChanged: diff --git a/internal/deployment/infra/sqlite/gateway.go b/internal/deployment/infra/sqlite/gateway.go index 199fcc3c..b3145000 100644 --- a/internal/deployment/infra/sqlite/gateway.go +++ b/internal/deployment/infra/sqlite/gateway.go @@ -59,8 +59,8 @@ func (s *gateway) GetAllDeploymentsByApp(ctx context.Context, appid string, filt deployments.app_id ,deployments.deployment_number ,deployments.config_environment - ,deployments.source_kind - ,deployments.source_data + ,deployments.source_discriminator + ,deployments.source ,deployments.state_status ,deployments.state_errcode ,deployments.state_services @@ -85,8 +85,8 @@ func (s *gateway) GetDeploymentByID(ctx context.Context, appid string, deploymen deployments.app_id ,deployments.deployment_number ,deployments.config_environment - ,deployments.source_kind - ,deployments.source_data + ,deployments.source_discriminator + ,deployments.source ,deployments.state_status ,deployments.state_errcode ,deployments.state_services @@ -123,8 +123,8 @@ func newAppWithLastDeploymentsByEnvDataloader[T any]( ,deployments.app_id ,deployments.deployment_number ,deployments.config_environment - ,deployments.source_kind - ,deployments.source_data + ,deployments.source_discriminator + ,deployments.source ,deployments.state_status ,deployments.state_errcode ,deployments.state_services @@ -209,13 +209,17 @@ func appDetailDataMapper(s storage.Scanner) (a query.AppDetail, err error) { } func lastDeploymentMapper(s storage.Scanner) (d query.Deployment, err error) { - var maxRequestedAt string + var ( + maxRequestedAt string + sourceData string + ) + err = s.Scan( &d.AppID, &d.DeploymentNumber, &d.Environment, - &d.Meta.Kind, - &d.Meta.Data, + &d.Source.Discriminator, + &sourceData, &d.State.Status, &d.State.ErrCode, &d.State.Services, @@ -227,16 +231,24 @@ func lastDeploymentMapper(s storage.Scanner) (d query.Deployment, err error) { &maxRequestedAt, // Needed because go-sqlite3 lib could not extract max(requested_at) into a time.Time... I may switch to another lib in the future ) + if err != nil { + return d, err + } + + d.Source.Data, err = query.SourceDataTypes.From(d.Source.Discriminator, sourceData) + return d, err } func deploymentMapper(scanner storage.Scanner) (d query.Deployment, err error) { + var sourceData string + err = scanner.Scan( &d.AppID, &d.DeploymentNumber, &d.Environment, - &d.Meta.Kind, - &d.Meta.Data, + &d.Source.Discriminator, + &sourceData, &d.State.Status, &d.State.ErrCode, &d.State.Services, @@ -247,5 +259,11 @@ func deploymentMapper(scanner storage.Scanner) (d query.Deployment, err error) { &d.RequestedBy.Email, ) + if err != nil { + return d, err + } + + d.Source.Data, err = query.SourceDataTypes.From(d.Source.Discriminator, sourceData) + return d, err } diff --git a/internal/deployment/infra/sqlite/migrations/1699476081_rename_source_columns.up.sql b/internal/deployment/infra/sqlite/migrations/1699476081_rename_source_columns.up.sql new file mode 100644 index 00000000..dbaf59ec --- /dev/null +++ b/internal/deployment/infra/sqlite/migrations/1699476081_rename_source_columns.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE deployments RENAME COLUMN source_kind TO source_discriminator; +ALTER TABLE deployments RENAME COLUMN source_data TO source; \ No newline at end of file diff --git a/internal/worker/app/command/fail_running_jobs_test.go b/internal/worker/app/command/fail_running_jobs_test.go index b550926f..9de6aa00 100644 --- a/internal/worker/app/command/fail_running_jobs_test.go +++ b/internal/worker/app/command/fail_running_jobs_test.go @@ -21,8 +21,8 @@ func Test_FailRunningJobs(t *testing.T) { t.Run("should reset running jobs", func(t *testing.T) { reason := errors.New("server_reset") ctx := context.Background() - job1 := domain.NewJob("1", "", monad.None[string]()) - job2 := domain.NewJob("2", "", monad.None[string]()) + job1 := domain.NewJob(payload{}, monad.None[string]()) + job2 := domain.NewJob(payload{}, monad.None[string]()) fail, store := sut(job1, job2) diff --git a/internal/worker/app/command/process_next_test.go b/internal/worker/app/command/process_next_test.go index d75cb83f..d817252e 100644 --- a/internal/worker/app/command/process_next_test.go +++ b/internal/worker/app/command/process_next_test.go @@ -29,21 +29,22 @@ func Test_ProcessNext(t *testing.T) { }) t.Run("should process the next job", func(t *testing.T) { - uc, worker := processNext(domain.NewJob("name", "payload", monad.Value("dedupe"))) + var data payload + + uc, worker := processNext(domain.NewJob(data, monad.Value("dedupe"))) err := uc(context.Background(), command.ProcessNextCommand{ - Names: []string{"name"}, + Names: []string{data.Discriminator()}, }) testutil.IsNil(t, err) testutil.IsTrue(t, worker.processedJob.HasValue()) - testutil.Equals(t, "name", worker.processedJob.MustGet().Name()) - testutil.Equals(t, "payload", worker.processedJob.MustGet().Payload()) + testutil.Equals(t, data, worker.processedJob.MustGet().Data().(payload)) worker.processedJob = worker.processedJob.None() err = uc(context.Background(), command.ProcessNextCommand{ - Names: []string{"name"}, + Names: []string{data.Discriminator()}, }) testutil.IsNil(t, err) @@ -53,10 +54,29 @@ func Test_ProcessNext(t *testing.T) { type dummyHandler struct { processedJob monad.Maybe[domain.Job] + err error } func (w *dummyHandler) Process(ctx context.Context, job domain.Job) error { + if w.err != nil { + return w.err + } + w.processedJob = w.processedJob.WithValue(job) return nil } + +func (w *dummyHandler) Prepare(data any) (domain.JobData, monad.Maybe[string], error) { + if w.err != nil { + return nil, monad.None[string](), w.err + } + + return payload{data}, monad.None[string](), nil +} + +type payload struct { + data any +} + +func (p payload) Discriminator() string { return "test" } diff --git a/internal/worker/app/command/queue.go b/internal/worker/app/command/queue.go index e9113596..4375ce7d 100644 --- a/internal/worker/app/command/queue.go +++ b/internal/worker/app/command/queue.go @@ -4,27 +4,24 @@ import ( "context" "github.com/YuukanOO/seelf/internal/worker/domain" - "github.com/YuukanOO/seelf/pkg/monad" - "github.com/YuukanOO/seelf/pkg/validation" - "github.com/YuukanOO/seelf/pkg/validation/strings" ) type QueueCommand struct { - Name string `json:"name"` - Payload string `json:"payload"` - DedupeName monad.Maybe[string] `json:"dedupe_name"` + Payload any `json:"payload"` } -func Queue(writer domain.JobsWriter) func(context.Context, QueueCommand) error { +func Queue( + writer domain.JobsWriter, + handler domain.Handler, +) func(context.Context, QueueCommand) error { return func(ctx context.Context, cmd QueueCommand) error { - if err := validation.Check(validation.Of{ - "name": validation.Is(cmd.Name, strings.Required), - "dedupe_name": validation.Maybe(cmd.DedupeName, strings.Required), - }); err != nil { + data, dedupe, err := handler.Prepare(cmd.Payload) + + if err != nil { return err } - job := domain.NewJob(cmd.Name, cmd.Payload, cmd.DedupeName) + job := domain.NewJob(data, dedupe) return writer.Write(ctx, &job) } diff --git a/internal/worker/app/command/queue_test.go b/internal/worker/app/command/queue_test.go index 445e32ea..1396da23 100644 --- a/internal/worker/app/command/queue_test.go +++ b/internal/worker/app/command/queue_test.go @@ -2,31 +2,32 @@ package command_test import ( "context" + "errors" "testing" "github.com/YuukanOO/seelf/internal/worker/app/command" "github.com/YuukanOO/seelf/internal/worker/infra/memory" "github.com/YuukanOO/seelf/pkg/testutil" - "github.com/YuukanOO/seelf/pkg/validation" ) func Test_Queue(t *testing.T) { - queue := func() func(context.Context, command.QueueCommand) error { - store := memory.NewJobsStore() - return command.Queue(store) + queue := func(prepareErr error) func(context.Context, command.QueueCommand) error { + return command.Queue(memory.NewJobsStore(), &dummyHandler{err: prepareErr}) } - t.Run("should require valid inputs", func(t *testing.T) { - uc := queue() - err := uc(context.Background(), command.QueueCommand{}) + t.Run("should returns an error if no handler can prepare the payload", func(t *testing.T) { + prepareErr := errors.New("prepare_error") + uc := queue(prepareErr) + err := uc(context.Background(), command.QueueCommand{ + Payload: "payload", + }) - testutil.ErrorIs(t, validation.ErrValidationFailed, err) + testutil.ErrorIs(t, prepareErr, err) }) t.Run("should queue a job", func(t *testing.T) { - uc := queue() + uc := queue(nil) err := uc(context.Background(), command.QueueCommand{ - Name: "name", Payload: "payload", }) diff --git a/internal/worker/domain/job.go b/internal/worker/domain/job.go index 95f98a62..4a054e0f 100644 --- a/internal/worker/domain/job.go +++ b/internal/worker/domain/job.go @@ -2,6 +2,7 @@ package domain import ( "context" + "errors" "time" "github.com/YuukanOO/seelf/pkg/event" @@ -12,20 +13,28 @@ import ( const retryDelay = 15 * time.Second +var ( + ErrNoValidHandlerFound = errors.New("no_valid_handler_found") + ErrInvalidPayload = errors.New("invalid_payload") + + JobDataTypes = storage.NewDiscriminatedMapper[JobData]() +) + type ( // VALUE OBJECTS JobID string + JobData storage.Discriminated + // ENTITY Job struct { event.Emitter id JobID - name string dedupeName string // Unique token to avoid multiple workers to process the same job (for example, multiple deployments for the same app and environment) - payload string + data JobData errcode monad.Maybe[string] queuedAt time.Time } @@ -43,6 +52,7 @@ type ( // Represents an object which can handle a specific job. Handler interface { + Prepare(any) (JobData, monad.Maybe[string], error) // Try to prepare a job payload and returns the JobData needed to process it and an eventual dedupe name to use Process(context.Context, Job) error } @@ -50,9 +60,8 @@ type ( JobQueued struct { ID JobID - Name string DedupeName string - Payload string + Data JobData QueuedAt time.Time } @@ -71,13 +80,12 @@ type ( // A dedupe name can be provided to avoid multiple workers to process the same kind of job // at the same time, such as a deployment for the same app and environment. // If no dedupe name is given, the job id will be used instead. -func NewJob(name, payload string, dedupeName monad.Maybe[string]) (j Job) { +func NewJob(data JobData, dedupeName monad.Maybe[string]) (j Job) { jobId := id.New[JobID]() j.apply(JobQueued{ ID: jobId, - Name: name, DedupeName: dedupeName.Get(string(jobId)), - Payload: payload, + Data: data, QueuedAt: time.Now().UTC(), }) @@ -86,15 +94,26 @@ func NewJob(name, payload string, dedupeName monad.Maybe[string]) (j Job) { // Recreates a job from a storage scanner func JobFrom(scanner storage.Scanner) (j Job, err error) { + var ( + dataDiscriminator string + dataPayload string + ) + err = scanner.Scan( &j.id, - &j.name, &j.dedupeName, - &j.payload, + &dataDiscriminator, + &dataPayload, &j.queuedAt, &j.errcode, ) + if err != nil { + return j, err + } + + j.data, err = JobDataTypes.From(dataDiscriminator, dataPayload) + return j, err } @@ -114,17 +133,15 @@ func (j *Job) Done() { }) } -func (j Job) ID() JobID { return j.id } -func (j Job) Name() string { return j.name } -func (j Job) Payload() string { return j.payload } +func (j Job) ID() JobID { return j.id } +func (j Job) Data() JobData { return j.data } func (j *Job) apply(e event.Event) { switch evt := e.(type) { case JobQueued: j.id = evt.ID - j.name = evt.Name j.dedupeName = evt.DedupeName - j.payload = evt.Payload + j.data = evt.Data j.queuedAt = evt.QueuedAt case JobFailed: j.errcode = j.errcode.WithValue(evt.ErrCode) diff --git a/internal/worker/domain/job_test.go b/internal/worker/domain/job_test.go index 828431f0..e14d2026 100644 --- a/internal/worker/domain/job_test.go +++ b/internal/worker/domain/job_test.go @@ -12,45 +12,43 @@ import ( func Test_Job(t *testing.T) { t.Run("can be created", func(t *testing.T) { - name := "jobname" - payload := "somepayload" + var data = payload{} - job := domain.NewJob(name, payload, monad.None[string]()) + job := domain.NewJob(data, monad.None[string]()) testutil.NotEquals(t, "", job.ID()) - testutil.Equals(t, name, job.Name()) - testutil.Equals(t, payload, job.Payload()) + testutil.Equals(t, data, job.Data().(payload)) testutil.HasNEvents(t, &job, 1) evt := testutil.EventIs[domain.JobQueued](t, &job, 0) testutil.Equals(t, job.ID(), evt.ID) - testutil.Equals(t, job.Name(), evt.Name) - testutil.Equals(t, job.Payload(), evt.Payload) + testutil.Equals(t, job.Data(), evt.Data) testutil.IsFalse(t, evt.QueuedAt.IsZero()) }) t.Run("can be created with a dedupe name", func(t *testing.T) { - name := "jobname" - payload := "somepayload" + var data = payload{} - job := domain.NewJob(name, payload, monad.None[string]()) + job := domain.NewJob(data, monad.None[string]()) evt := testutil.EventIs[domain.JobQueued](t, &job, 0) testutil.Equals(t, string(evt.ID), evt.DedupeName) dedupeName := "app-environment" - job = domain.NewJob(name, payload, monad.Value(dedupeName)) + job = domain.NewJob(data, monad.Value(dedupeName)) evt = testutil.EventIs[domain.JobQueued](t, &job, 0) testutil.Equals(t, dedupeName, evt.DedupeName) }) t.Run("can be marked as failed", func(t *testing.T) { + var data = payload{} + err := errors.New("some error") - job := domain.NewJob("jobname", "somepayload", monad.None[string]()) + job := domain.NewJob(data, monad.None[string]()) job.Failed(err) @@ -65,7 +63,9 @@ func Test_Job(t *testing.T) { }) t.Run("can be marked as done", func(t *testing.T) { - job := domain.NewJob("jobname", "somepayload", monad.None[string]()) + var data = payload{} + + job := domain.NewJob(data, monad.None[string]()) job.Done() @@ -74,3 +74,7 @@ func Test_Job(t *testing.T) { testutil.Equals(t, job.ID(), evt.ID) }) } + +type payload struct{} + +func (p payload) Discriminator() string { return "test" } diff --git a/internal/worker/infra/jobs/cleanup/data.go b/internal/worker/infra/jobs/cleanup/data.go new file mode 100644 index 00000000..0695757d --- /dev/null +++ b/internal/worker/infra/jobs/cleanup/data.go @@ -0,0 +1,19 @@ +package cleanup + +import ( + "database/sql/driver" + + deployment "github.com/YuukanOO/seelf/internal/deployment/domain" + "github.com/YuukanOO/seelf/internal/worker/domain" +) + +type Data deployment.AppID + +func (d Data) Discriminator() string { return "deployment.cleanup-app" } +func (d Data) Value() (driver.Value, error) { return string(d), nil } + +func init() { + domain.JobDataTypes.Register(Data(""), func(value string) (domain.JobData, error) { + return Data(value), nil + }) +} diff --git a/internal/worker/infra/jobs/cleanup/handler.go b/internal/worker/infra/jobs/cleanup/handler.go index 3c569c4e..da7efb90 100644 --- a/internal/worker/infra/jobs/cleanup/handler.go +++ b/internal/worker/infra/jobs/cleanup/handler.go @@ -3,49 +3,57 @@ package cleanup import ( "context" - deplcmd "github.com/YuukanOO/seelf/internal/deployment/app/command" + "github.com/YuukanOO/seelf/internal/deployment/app/command" depldomain "github.com/YuukanOO/seelf/internal/deployment/domain" - "github.com/YuukanOO/seelf/internal/worker/app/command" "github.com/YuukanOO/seelf/internal/worker/domain" "github.com/YuukanOO/seelf/internal/worker/infra/jobs" "github.com/YuukanOO/seelf/pkg/log" + "github.com/YuukanOO/seelf/pkg/monad" + "github.com/YuukanOO/seelf/pkg/types" ) -const JobName = "deployment:cleanup-app" +type ( + Request depldomain.AppCleanupRequested -// Creates a new deployment job for the given deployment id. -func Queue(id depldomain.AppID) command.QueueCommand { - return command.QueueCommand{ - Name: JobName, - Payload: string(id), + handler struct { + logger log.Logger + cleanup func(context.Context, command.CleanupAppCommand) error } -} - -type handler struct { - logger log.Logger - cleanup func(context.Context, deplcmd.CleanupAppCommand) error -} +) -func New(logger log.Logger, cleanup func(context.Context, deplcmd.CleanupAppCommand) error) jobs.Handler { +func New(logger log.Logger, cleanup func(context.Context, command.CleanupAppCommand) error) jobs.Handler { return &handler{ logger: logger, cleanup: cleanup, } } -func (*handler) JobName() string { - return JobName +func (*handler) CanPrepare(data any) bool { return types.Is[Request](data) } +func (*handler) CanProcess(data domain.JobData) bool { return types.Is[Data](data) } + +func (h *handler) Prepare(payload any) (domain.JobData, monad.Maybe[string], error) { + req, ok := payload.(Request) + + if !ok { + return nil, monad.None[string](), domain.ErrInvalidPayload + } + + return Data(req.ID), monad.None[string](), nil } func (h *handler) Process(ctx context.Context, job domain.Job) error { - appid := job.Payload() + data, ok := job.Data().(Data) + + if !ok { + return domain.ErrInvalidPayload + } - if err := h.cleanup(ctx, deplcmd.CleanupAppCommand{ - ID: appid, + if err := h.cleanup(ctx, command.CleanupAppCommand{ + ID: string(data), }); err != nil { h.logger.Errorw("cleanup job has failed", "error", err, - "appid", appid, + "appid", data, ) return err } diff --git a/internal/worker/infra/jobs/deploy/data.go b/internal/worker/infra/jobs/deploy/data.go new file mode 100644 index 00000000..c318d50d --- /dev/null +++ b/internal/worker/infra/jobs/deploy/data.go @@ -0,0 +1,50 @@ +package deploy + +import ( + "database/sql/driver" + "encoding/json" + "strconv" + "strings" + + deployment "github.com/YuukanOO/seelf/internal/deployment/domain" + "github.com/YuukanOO/seelf/internal/worker/domain" + "github.com/YuukanOO/seelf/pkg/storage" +) + +type Data struct { + AppID deployment.AppID `json:"app_id"` + DeploymentNumber deployment.DeploymentNumber `json:"deployment_number"` +} + +func (d Data) Discriminator() string { return "deployment.deploy" } +func (d Data) Value() (driver.Value, error) { return storage.ValueJSON(d) } + +func init() { + domain.JobDataTypes.Register(Data{}, func(value string) (domain.JobData, error) { + return tryParseDeployJobData(value) + }) +} + +func tryParseDeployJobData(value string) (domain.JobData, error) { + var data Data + + // Handle old payload for compatibility. + if !json.Valid([]byte(value)) { + separatorIdx := strings.Index(value, ":") + appid, numberStr := value[:separatorIdx], value[separatorIdx+1:] + number, err := strconv.Atoi(numberStr) + + if err != nil { + return nil, err + } + + data.AppID = deployment.AppID(appid) + data.DeploymentNumber = deployment.DeploymentNumber(number) + + return data, nil + } + + err := storage.ScanJSON(value, &data) + + return data, err +} diff --git a/internal/worker/infra/jobs/deploy/handler.go b/internal/worker/infra/jobs/deploy/handler.go index 32a46eb6..793c5f29 100644 --- a/internal/worker/infra/jobs/deploy/handler.go +++ b/internal/worker/infra/jobs/deploy/handler.go @@ -3,35 +3,24 @@ package deploy import ( "context" "fmt" - "strconv" - "strings" deplcmd "github.com/YuukanOO/seelf/internal/deployment/app/command" depldomain "github.com/YuukanOO/seelf/internal/deployment/domain" - "github.com/YuukanOO/seelf/internal/worker/app/command" "github.com/YuukanOO/seelf/internal/worker/domain" "github.com/YuukanOO/seelf/internal/worker/infra/jobs" "github.com/YuukanOO/seelf/pkg/log" "github.com/YuukanOO/seelf/pkg/monad" + "github.com/YuukanOO/seelf/pkg/types" ) -const JobName = "deployment:deploy" +type ( + Request depldomain.DeploymentCreated -// Creates a new deployment job for the given deployment id. -func Queue(evt depldomain.DeploymentCreated) command.QueueCommand { - id := evt.ID - - return command.QueueCommand{ - Name: JobName, - Payload: fmt.Sprintf("%s:%d", id.AppID(), id.DeploymentNumber()), - DedupeName: monad.Value(fmt.Sprintf("%s:%s", JobName, evt.Config.ProjectName())), + handler struct { + logger log.Logger + deploy func(context.Context, deplcmd.DeployCommand) error } -} - -type handler struct { - logger log.Logger - deploy func(context.Context, deplcmd.DeployCommand) error -} +) func New(logger log.Logger, deploy func(context.Context, deplcmd.DeployCommand) error) jobs.Handler { return &handler{ @@ -40,25 +29,39 @@ func New(logger log.Logger, deploy func(context.Context, deplcmd.DeployCommand) } } -func (*handler) JobName() string { - return JobName +func (*handler) CanPrepare(data any) bool { return types.Is[Request](data) } +func (*handler) CanProcess(data domain.JobData) bool { return types.Is[Data](data) } + +func (h *handler) Prepare(payload any) (domain.JobData, monad.Maybe[string], error) { + evt, ok := payload.(Request) + + if !ok { + return nil, monad.None[string](), domain.ErrInvalidPayload + } + + data := Data{evt.ID.AppID(), evt.ID.DeploymentNumber()} + dedupeName := monad.Value(fmt.Sprintf("%s.%s", data.Discriminator(), evt.Config.ProjectName())) + + return data, dedupeName, nil } func (h *handler) Process(ctx context.Context, job domain.Job) error { - parts := strings.Split(job.Payload(), ":") - appid := parts[0] - number, _ := strconv.Atoi(parts[1]) + data, ok := job.Data().(Data) + + if !ok { + return domain.ErrInvalidPayload + } // Here the error is not given back to the worker because if it fails, the information // is already in the associated Deployment. The only exception is for sql errors. if err := h.deploy(ctx, deplcmd.DeployCommand{ - AppID: appid, - DeploymentNumber: number, + AppID: string(data.AppID), + DeploymentNumber: int(data.DeploymentNumber), }); err != nil { h.logger.Errorw("deploy job has failed", "error", err, - "appid", appid, - "deployment", number, + "appid", data.AppID, + "deployment", data.DeploymentNumber, ) } diff --git a/internal/worker/infra/jobs/facade.go b/internal/worker/infra/jobs/facade.go index 29f2775c..8c2f2ff4 100644 --- a/internal/worker/infra/jobs/facade.go +++ b/internal/worker/infra/jobs/facade.go @@ -5,46 +5,37 @@ import ( "fmt" "github.com/YuukanOO/seelf/internal/worker/domain" - "github.com/YuukanOO/seelf/pkg/log" + "github.com/YuukanOO/seelf/pkg/monad" ) type ( - facade struct { - logger log.Logger - handlers map[string]Handler - } - Handler interface { domain.Handler - JobName() string // Retrieve the name of the job handled by this handler. + CanPrepare(any) bool + CanProcess(domain.JobData) bool } -) -func NewFacade(logger log.Logger, handlers ...Handler) domain.Handler { - handlersMap := make(map[string]Handler, len(handlers)) + facade struct { + handlers []Handler + } +) - for _, handler := range handlers { - name := handler.JobName() +func NewFacade(handlers ...Handler) domain.Handler { + return &facade{handlers} +} - // Should never happened, but let's make it clear - if _, exists := handlersMap[name]; exists { - panic("duplicate job handler for " + name) +func (w *facade) Prepare(payload any) (domain.JobData, monad.Maybe[string], error) { + for _, handler := range w.handlers { + if handler.CanPrepare(payload) { + return handler.Prepare(payload) } - - handlersMap[name] = handler } - return &facade{logger, handlersMap} + return nil, monad.None[string](), domain.ErrNoValidHandlerFound } func (w *facade) Process(ctx context.Context, job domain.Job) (err error) { - handler, found := w.handlers[job.Name()] - - if !found { - w.logger.Errorw("could not find a job handler for", - "name", job.Name()) - return nil - } + data := job.Data() defer func() { if r := recover(); r != nil { @@ -52,6 +43,11 @@ func (w *facade) Process(ctx context.Context, job domain.Job) (err error) { } }() - err = handler.Process(ctx, job) - return + for _, handler := range w.handlers { + if handler.CanProcess(data) { + return handler.Process(ctx, job) + } + } + + return domain.ErrNoValidHandlerFound } diff --git a/internal/worker/infra/memory/jobs.go b/internal/worker/infra/memory/jobs.go index ce0d12ca..5b2df163 100644 --- a/internal/worker/infra/memory/jobs.go +++ b/internal/worker/infra/memory/jobs.go @@ -35,10 +35,10 @@ func NewJobsStore(existingJobs ...domain.Job) JobsStore { return s } -func (s *jobsStore) GetNextPendingJob(ctx context.Context, names []string) (domain.Job, error) { +func (s *jobsStore) GetNextPendingJob(ctx context.Context, jobTypes []string) (domain.Job, error) { for _, job := range s.jobs { - for _, name := range names { - if job.value.Name() == name { + for _, jobType := range jobTypes { + if job.value.Data().Discriminator() == jobType { return *job.value, nil } } diff --git a/internal/worker/infra/sqlite/jobs.go b/internal/worker/infra/sqlite/jobs.go index 5bd59752..beeb621b 100644 --- a/internal/worker/infra/sqlite/jobs.go +++ b/internal/worker/infra/sqlite/jobs.go @@ -24,7 +24,7 @@ func NewJobsStore(db sqlite.Database) JobsStore { return &jobsStore{db} } -func (s *jobsStore) GetNextPendingJob(ctx context.Context, names []string) (domain.Job, error) { +func (s *jobsStore) GetNextPendingJob(ctx context.Context, jobType []string) (domain.Job, error) { // This query will lock the database to make sure we can't retrieved the same job twice. return builder. Query[domain.Job](` @@ -36,10 +36,10 @@ WHERE id IN ( retrieved = 0 AND queued_at <= DATETIME('now') AND dedupe_name NOT IN (SELECT DISTINCT dedupe_name FROM jobs WHERE retrieved = 1)`). - S(builder.Array("AND name IN", names)). + S(builder.Array("AND data_discriminator IN", jobType)). F(`ORDER BY queued_at LIMIT 1 ) -RETURNING id, name, dedupe_name, payload, queued_at, errcode`). +RETURNING id, dedupe_name, data_discriminator, data, queued_at, errcode`). One(s, ctx, domain.JobFrom) } @@ -48,9 +48,9 @@ func (s *jobsStore) GetRunningJobs(ctx context.Context) ([]domain.Job, error) { Query[domain.Job](` SELECT id - ,name ,dedupe_name - ,payload + ,data_discriminator + ,data ,queued_at ,errcode FROM jobs @@ -64,11 +64,11 @@ func (s *jobsStore) Write(c context.Context, jobs ...*domain.Job) error { case domain.JobQueued: return builder. Insert("jobs", builder.Values{ - "id": evt.ID, - "name": evt.Name, - "dedupe_name": evt.DedupeName, - "payload": evt.Payload, - "queued_at": evt.QueuedAt, + "id": evt.ID, + "dedupe_name": evt.DedupeName, + "data_discriminator": evt.Data.Discriminator(), + "data": evt.Data, + "queued_at": evt.QueuedAt, }). Exec(s, ctx) case domain.JobFailed: diff --git a/internal/worker/infra/sqlite/migrations/1696843357_add_dedupe_name.up.sql b/internal/worker/infra/sqlite/migrations/1696843357_add_dedupe_name.up.sql index bf24e51c..0fc79b2a 100644 --- a/internal/worker/infra/sqlite/migrations/1696843357_add_dedupe_name.up.sql +++ b/internal/worker/infra/sqlite/migrations/1696843357_add_dedupe_name.up.sql @@ -1,3 +1,10 @@ +-- Now use period to namespace job names +UPDATE jobs +SET name = replace(name, ':', '.'); + ALTER TABLE jobs ADD dedupe_name TEXT NOT NULL DEFAULT ''; ALTER TABLE jobs ADD retrieved BOOLEAN NOT NULL DEFAULT false; -CREATE INDEX idx_jobs_dedupe_name ON jobs(dedupe_name); \ No newline at end of file +ALTER TABLE jobs RENAME COLUMN name TO data_discriminator; +ALTER TABLE jobs RENAME COLUMN payload TO data; +CREATE INDEX idx_jobs_dedupe_name ON jobs(dedupe_name); +CREATE INDEX idx_jobs_data_discriminator ON jobs(data_discriminator); diff --git a/pkg/http/helpers.go b/pkg/http/helpers.go index ecfc7f21..2f2a7ac5 100644 --- a/pkg/http/helpers.go +++ b/pkg/http/helpers.go @@ -11,6 +11,8 @@ import ( "github.com/gin-gonic/gin" ) +var ErrUnexpected = apperr.New("unexpected_error") + // Tiny interface to represents needed contrat in order to use helpers provided by this package. type Server interface { IsSecure() bool @@ -78,22 +80,25 @@ func Ok[TOut any](ctx *gin.Context, data TOut) error { // Handle the given non-nil error and sets the status code based on error type. func handleError(s Server, ctx *gin.Context, err error) { - var status int + var ( + status int = http.StatusInternalServerError + data any = ErrUnexpected + ) // Translates the error type to the appropriate HTTP status code if _, isAppErr := apperr.As[apperr.Error](err); isAppErr { status = http.StatusBadRequest // Default to HTTP 400 + data = err if errors.Is(err, apperr.ErrNotFound) { status = http.StatusNotFound // But if it's a not found, that's an HTTP 404 } } else { s.Logger().Errorw(err.Error(), "error", err) - status = http.StatusInternalServerError } ctx.Error(err) - ctx.AbortWithStatusJSON(status, err) + ctx.AbortWithStatusJSON(status, data) } func addCommonResponseHeaders(ctx *gin.Context) { diff --git a/pkg/storage/discriminated.go b/pkg/storage/discriminated.go new file mode 100644 index 00000000..c54a3bc2 --- /dev/null +++ b/pkg/storage/discriminated.go @@ -0,0 +1,44 @@ +package storage + +type ( + // Represents an extensible type known through a discriminator value when implementing + // the Scanner interface as no sense since we don't know which type is used in our entity. + Discriminated interface { + Discriminator() string + } + + // Function used to map from a raw value to a discriminated type. + DiscriminatedMapperFunc[T Discriminated] func(string) (T, error) + + // Mapper struct to be able to rehydrate discriminated types. + DiscriminatedMapper[T Discriminated] map[string]DiscriminatedMapperFunc[T] +) + +// Builds a new mapper configuration to hold a discriminated type. +func NewDiscriminatedMapper[T Discriminated]() DiscriminatedMapper[T] { + return make(DiscriminatedMapper[T]) +} + +// Register a new concrete type available to the mapper. +func (m DiscriminatedMapper[T]) Register(concreteType T, mapper DiscriminatedMapperFunc[T]) { + discriminator := concreteType.Discriminator() + + // Check for duplicate registrations, should panic because it's a dev error. + if _, found := m[discriminator]; found { + panic("duplicate concrete type registered for " + discriminator) + } + + m[discriminator] = mapper +} + +// Rehydrate a discriminated type from a raw value. +func (m DiscriminatedMapper[T]) From(discriminator, value string) (T, error) { + mapper, found := m[discriminator] + + if !found { + var t T + return t, ErrCouldNotUnmarshalGivenType + } + + return mapper(value) +} diff --git a/pkg/storage/discriminated_test.go b/pkg/storage/discriminated_test.go new file mode 100644 index 00000000..6d248000 --- /dev/null +++ b/pkg/storage/discriminated_test.go @@ -0,0 +1,58 @@ +package storage_test + +import ( + "testing" + + "github.com/YuukanOO/seelf/pkg/storage" + "github.com/YuukanOO/seelf/pkg/testutil" +) + +type ( + discriminatedType storage.Discriminated + + type1 struct { + data string + } + + type2 struct { + data string + } +) + +func (t type1) Discriminator() string { return "type1" } +func (t type2) Discriminator() string { return "type2" } + +var mapper = storage.NewDiscriminatedMapper[discriminatedType]() + +func Test_Discriminated(t *testing.T) { + mapper.Register(type1{}, func(data string) (discriminatedType, error) { return type1{data}, nil }) + mapper.Register(type2{}, func(data string) (discriminatedType, error) { return type2{data}, nil }) + + t.Run("should panic if a type is already registered with the same discriminator", func(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic, got none") + } + }() + + mapper.Register(type1{}, func(data string) (discriminatedType, error) { return type1{data}, nil }) + }) + + t.Run("should error if the discriminator is not known", func(t *testing.T) { + _, err := mapper.From("unknown", "") + + testutil.ErrorIs(t, err, storage.ErrCouldNotUnmarshalGivenType) + }) + + t.Run("should return the correct type", func(t *testing.T) { + t1, err := mapper.From("type1", "data1") + + testutil.IsNil(t, err) + testutil.Equals(t, type1{"data1"}, t1.(type1)) + + t2, err := mapper.From("type2", "data2") + + testutil.IsNil(t, err) + testutil.Equals(t, type2{"data2"}, t2.(type2)) + }) +} diff --git a/pkg/storage/scanner.go b/pkg/storage/scanner.go index a5e1a169..58f22419 100644 --- a/pkg/storage/scanner.go +++ b/pkg/storage/scanner.go @@ -1,6 +1,7 @@ package storage import ( + "database/sql/driver" "encoding/json" "errors" ) @@ -14,13 +15,14 @@ type ( // // Since domain store will always constructs an aggregate as a whole, it makes the process // relatively easy to keep under control. - // - // The things to keep in mind is the order used when scanning which should always be the - // same. - // - // IMPORTANT: it will fails if the type of a monad.Value is not a primitive type or - // does not implements the sql.Scanner interface. Scanner interface { + // Scan current row into the destination fields. + // The things to keep in mind is the order used when scanning which should always be the + // same as the order of fields returned by the underlying implementation (for a database, + // the order of SELECT columns). + // + // IMPORTANT: it will fails if the type of a monad.Value is not a primitive type or + // does not implements the sql.Scanner interface. Scan(...any) error } @@ -38,3 +40,11 @@ func ScanJSON[T any](value any, target *T) error { return json.Unmarshal([]byte(str), target) } + +// Ease the valueing of a json serialized field by calling json.Marshal and returning +// a string as accepted as a valid driver.Value. +func ValueJSON[T any](v T) (driver.Value, error) { + b, err := json.Marshal(v) + + return string(b), err +} diff --git a/pkg/storage/sqlite/builder/builder.go b/pkg/storage/sqlite/builder/builder.go index eb13bda7..4d1a3144 100644 --- a/pkg/storage/sqlite/builder/builder.go +++ b/pkg/storage/sqlite/builder/builder.go @@ -162,6 +162,8 @@ func (q *queryBuilder[T]) All( return nil, err } + defer rows.Close() + results := make([]T, 0) // Instantiates needed stuff for data loaders @@ -172,8 +174,6 @@ func (q *queryBuilder[T]) All( mappings[i] = make(KeysMapping) } - defer rows.Close() - for rows.Next() { row, err := mapper(rows) diff --git a/pkg/types/is.go b/pkg/types/is.go new file mode 100644 index 00000000..5ee04657 --- /dev/null +++ b/pkg/types/is.go @@ -0,0 +1,7 @@ +package types + +// Checks if the given obj is of type T. +func Is[T any](obj any) bool { + _, ok := obj.(T) + return ok +} diff --git a/pkg/types/is_test.go b/pkg/types/is_test.go new file mode 100644 index 00000000..620cf0c6 --- /dev/null +++ b/pkg/types/is_test.go @@ -0,0 +1,25 @@ +package types_test + +import ( + "testing" + + "github.com/YuukanOO/seelf/pkg/testutil" + "github.com/YuukanOO/seelf/pkg/types" +) + +type ( + type1 struct{} + type2 struct{} +) + +func Test_Is(t *testing.T) { + t.Run("should be able to return if an object is of a given type", func(t *testing.T) { + var ( + t1 any = type1{} + t2 any = type2{} + ) + + testutil.IsTrue(t, types.Is[type1](t1)) + testutil.IsFalse(t, types.Is[type1](t2)) + }) +}