Skip to content

Commit

Permalink
chore: big scheduler refactoring to make things easier (wip)
Browse files Browse the repository at this point in the history
Also refactor how cleanup are handled for apps and targets.
  • Loading branch information
YuukanOO committed Oct 2, 2024
1 parent b0176e9 commit 9a242a9
Show file tree
Hide file tree
Showing 80 changed files with 1,902 additions and 1,499 deletions.
13 changes: 7 additions & 6 deletions cmd/serve/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package serve

import (
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/bus/embedded/get_jobs"
"github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-gonic/gin"
)
Expand All @@ -12,13 +13,13 @@ type listJobsFilters struct {

func (s *server) listJobsHandler() gin.HandlerFunc {
return http.Bind(s, func(ctx *gin.Context, request listJobsFilters) error {
var filters bus.GetJobsFilters
var filters get_jobs.Query

if request.Page != 0 {
filters.Page.Set(request.Page)
}

jobs, err := s.scheduledJobsStore.GetAllJobs(ctx.Request.Context(), filters)
jobs, err := bus.Send(s.bus, ctx.Request.Context(), filters)

if err != nil {
return err
Expand All @@ -30,11 +31,11 @@ func (s *server) listJobsHandler() gin.HandlerFunc {

func (s *server) deleteJobsHandler() gin.HandlerFunc {
return http.Send(s, func(ctx *gin.Context) error {
err := s.scheduledJobsStore.Delete(ctx.Request.Context(), ctx.Param("id"))
// err := s.scheduledJobsStore.Delete(ctx.Request.Context(), ctx.Param("id"))

if err != nil {
return err
}
// if err != nil {
// return err
// }

return http.NoContent(ctx)
})
Expand Down
14 changes: 9 additions & 5 deletions cmd/serve/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strings"
"time"

"github.com/YuukanOO/seelf/internal/auth/app/api_login"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
httputils "github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
Expand All @@ -25,8 +27,8 @@ var errUnauthorized = errors.New("unauthorized")
func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return func(ctx *gin.Context) {
// First, try to find a user id in the encrypted session cookie
sess := sessions.Default(ctx)
uid, ok := sess.Get(userSessionKey).(string)
userSession := sessions.Default(ctx)
uid, ok := userSession.Get(userSessionKey).(string)
failed := !ok || uid == ""

// If it failed and api access is not allowed, return early
Expand All @@ -50,15 +52,17 @@ func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return
}

id, err := s.usersReader.GetIDFromAPIKey(ctx.Request.Context(), domain.APIKey(authHeader[apiAuthPrefixLength:]))
id, err := bus.Send(s.bus, ctx.Request.Context(), api_login.Query{
Key: authHeader[apiAuthPrefixLength:],
})

if err != nil {
_ = ctx.AbortWithError(http.StatusUnauthorized, errUnauthorized)
return
}

// Attach the user id to the context passed down in every usecases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), id))
// Attach the user id to the context passed down in every use cases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), domain.UserID(id)))

ctx.Next()
}
Expand Down
21 changes: 8 additions & 13 deletions cmd/serve/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/YuukanOO/seelf/cmd/startup"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/log"
"github.com/gin-contrib/sessions"
Expand All @@ -39,25 +38,21 @@ type (
}

server struct {
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
usersReader domain.UsersReader
scheduledJobsStore bus.ScheduledJobsStore
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
}
)

func newHttpServer(options ServerOptions, root startup.ServerRoot) *server {
gin.SetMode(gin.ReleaseMode)

s := &server{
options: options,
router: gin.New(),
usersReader: root.UsersReader(),
scheduledJobsStore: root.ScheduledJobsStore(),
bus: root.Bus(),
logger: root.Logger(),
options: options,
router: gin.New(),
bus: root.Bus(),
logger: root.Logger(),
}

_ = s.router.SetTrustedProxies(nil)
Expand Down
53 changes: 20 additions & 33 deletions cmd/startup/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ import (
"github.com/YuukanOO/seelf/internal/deployment/app/cleanup_app"
"github.com/YuukanOO/seelf/internal/deployment/app/cleanup_target"
"github.com/YuukanOO/seelf/internal/deployment/app/configure_target"
"github.com/YuukanOO/seelf/internal/deployment/app/delete_app"
"github.com/YuukanOO/seelf/internal/deployment/app/delete_target"
"github.com/YuukanOO/seelf/internal/deployment/app/deploy"
"github.com/YuukanOO/seelf/internal/deployment/app/expose_seelf_container"
deploymentdomain "github.com/YuukanOO/seelf/internal/deployment/domain"
deploymentinfra "github.com/YuukanOO/seelf/internal/deployment/infra"
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/bus/memory"
"github.com/YuukanOO/seelf/pkg/bus/embedded"
bussqlite "github.com/YuukanOO/seelf/pkg/bus/sqlite"
"github.com/YuukanOO/seelf/pkg/log"
"github.com/YuukanOO/seelf/pkg/monad"
Expand All @@ -30,8 +28,6 @@ type (
Cleanup() error
Bus() bus.Dispatcher
Logger() log.Logger
UsersReader() domain.UsersReader
ScheduledJobsStore() bus.ScheduledJobsStore
}

ServerOptions interface {
Expand All @@ -47,12 +43,10 @@ type (
}

serverRoot struct {
bus bus.Bus
logger log.Logger
db *sqlite.Database
usersReader domain.UsersReader
schedulerStore bus.ScheduledJobsStore
scheduler bus.RunnableScheduler
bus bus.Bus
logger log.Logger
db *sqlite.Database
scheduler *embedded.Runner
}
)

Expand All @@ -63,10 +57,7 @@ func Server(options ServerOptions, logger log.Logger) (ServerRoot, error) {
logger: logger,
}

// embedded.NewBus()
// embedded.NewScheduler()

s.bus = memory.NewBus()
s.bus = embedded.NewBus()

db, err := sqlite.Open(options.ConnectionString(), s.logger, s.bus)

Expand All @@ -76,31 +67,29 @@ func Server(options ServerOptions, logger log.Logger) (ServerRoot, error) {

s.db = db

s.schedulerStore = bussqlite.NewScheduledJobsStore(s.db)
jobsStore, err := bussqlite.Setup(s.bus, s.db)

if err = s.schedulerStore.Setup(); err != nil {
if err != nil {
return nil, err
}

s.scheduler = bus.NewScheduler(s.schedulerStore, s.logger, s.bus, options.RunnersPollInterval(),
bus.WorkerGroup{
s.scheduler = embedded.NewRunner(jobsStore, s.logger, s.bus, options.RunnersPollInterval(),
embedded.WorkerGroup{
Size: options.RunnersDeploymentCount(),
Messages: []string{deploy.Command{}.Name_()},
Requests: []bus.AsyncRequest{deploy.Command{}},
},
bus.WorkerGroup{
embedded.WorkerGroup{
Size: options.RunnersCleanupCount(),
Messages: []string{
cleanup_app.Command{}.Name_(),
delete_app.Command{}.Name_(),
configure_target.Command{}.Name_(),
cleanup_target.Command{}.Name_(),
delete_target.Command{}.Name_(),
Requests: []bus.AsyncRequest{
cleanup_app.Command{},
cleanup_target.Command{},
configure_target.Command{},
},
},
)

// Setup auth infrastructure
if s.usersReader, err = authinfra.Setup(s.logger, s.db, s.bus); err != nil {
if err = authinfra.Setup(s.logger, s.db, s.bus); err != nil {
return nil, err
}

Expand All @@ -110,7 +99,7 @@ func Server(options ServerOptions, logger log.Logger) (ServerRoot, error) {
s.logger,
s.db,
s.bus,
s.scheduler,
jobsStore,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +142,5 @@ func (s *serverRoot) Cleanup() error {
return s.db.Close()
}

func (s *serverRoot) Bus() bus.Dispatcher { return s.bus }
func (s *serverRoot) Logger() log.Logger { return s.logger }
func (s *serverRoot) UsersReader() domain.UsersReader { return s.usersReader }
func (s *serverRoot) ScheduledJobsStore() bus.ScheduledJobsStore { return s.schedulerStore }
func (s *serverRoot) Bus() bus.Dispatcher { return s.bus }
func (s *serverRoot) Logger() log.Logger { return s.logger }
13 changes: 13 additions & 0 deletions internal/auth/app/api_login/api_login.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package api_login

import "github.com/YuukanOO/seelf/pkg/bus"

type Query struct {
bus.Query[string]

Key string
}

func (Query) Name_() string { return "auth.command.api_login" }

// Implemented directly by the gateway for now
1 change: 0 additions & 1 deletion internal/auth/domain/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type (

UsersReader interface {
GetAdminUser(context.Context) (User, error)
GetIDFromAPIKey(context.Context, APIKey) (UserID, error)
CheckEmailAvailability(context.Context, Email, ...UserID) (EmailRequirement, error)
GetByEmail(context.Context, Email) (User, error)
GetByID(context.Context, UserID) (User, error)
Expand Down
10 changes: 5 additions & 5 deletions internal/auth/infra/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/YuukanOO/seelf/internal/auth/app/login"
"github.com/YuukanOO/seelf/internal/auth/app/refresh_api_key"
"github.com/YuukanOO/seelf/internal/auth/app/update_user"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/internal/auth/infra/crypto"
authsqlite "github.com/YuukanOO/seelf/internal/auth/infra/sqlite"
)
Expand All @@ -19,9 +18,9 @@ func Setup(
logger log.Logger,
db *sqlite.Database,
b bus.Bus,
) (domain.UsersReader, error) {
) error {
usersStore := authsqlite.NewUsersStore(db)
authQueryHandler := authsqlite.NewGateway(db)
gateway := authsqlite.NewGateway(db)

passwordHasher := crypto.NewBCryptHasher()
keyGenerator := crypto.NewKeyGenerator()
Expand All @@ -30,7 +29,8 @@ func Setup(
bus.Register(b, create_first_account.Handler(usersStore, usersStore, passwordHasher, keyGenerator))
bus.Register(b, update_user.Handler(usersStore, usersStore, passwordHasher))
bus.Register(b, refresh_api_key.Handler(usersStore, usersStore, keyGenerator))
bus.Register(b, authQueryHandler.GetProfile)
bus.Register(b, gateway.GetIDFromAPIKey)
bus.Register(b, gateway.GetProfile)

return usersStore, db.Migrate(authsqlite.Migrations)
return db.Migrate(authsqlite.Migrations)
}
15 changes: 11 additions & 4 deletions internal/auth/infra/sqlite/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ package sqlite
import (
"context"

"github.com/YuukanOO/seelf/internal/auth/app/api_login"
"github.com/YuukanOO/seelf/internal/auth/app/get_profile"
"github.com/YuukanOO/seelf/pkg/storage"
"github.com/YuukanOO/seelf/pkg/storage/sqlite"
"github.com/YuukanOO/seelf/pkg/storage/sqlite/builder"
)

type gateway struct {
type Gateway struct {
db *sqlite.Database
}

func NewGateway(db *sqlite.Database) *gateway {
return &gateway{db}
func NewGateway(db *sqlite.Database) *Gateway {
return &Gateway{db}
}

func (s *gateway) GetProfile(ctx context.Context, q get_profile.Query) (get_profile.Profile, error) {
func (s *Gateway) GetProfile(ctx context.Context, q get_profile.Query) (get_profile.Profile, error) {
return builder.
Query[get_profile.Profile](`
SELECT
Expand All @@ -30,6 +31,12 @@ func (s *gateway) GetProfile(ctx context.Context, q get_profile.Query) (get_prof
One(s.db, ctx, profileMapper)
}

func (s *Gateway) GetIDFromAPIKey(ctx context.Context, c api_login.Query) (string, error) {
return builder.
Query[string]("SELECT id FROM users WHERE api_key = ?", c.Key).
Extract(s.db, ctx)
}

func profileMapper(row storage.Scanner) (p get_profile.Profile, err error) {
err = row.Scan(
&p.ID,
Expand Down
8 changes: 1 addition & 7 deletions internal/auth/infra/sqlite/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,8 @@ func (s *usersStore) GetByEmail(ctx context.Context, email domain.Email) (u doma
One(s.db, ctx, domain.UserFrom)
}

func (s *usersStore) GetIDFromAPIKey(ctx context.Context, key domain.APIKey) (domain.UserID, error) {
return builder.
Query[domain.UserID]("SELECT id FROM users WHERE api_key = ?", key).
Extract(s.db, ctx)
}

func (s *usersStore) Write(c context.Context, users ...*domain.User) error {
return sqlite.WriteAndDispatch(s.db, c, users, func(ctx context.Context, e event.Event) error {
return sqlite.WriteEvents(s.db, c, users, func(ctx context.Context, e event.Event) error {
switch evt := e.(type) {
case domain.UserRegistered:
return builder.
Expand Down
Loading

0 comments on commit 9a242a9

Please sign in to comment.