Skip to content

Commit

Permalink
cmd: add seb
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Jul 1, 2024
1 parent 23aec2f commit e2cf380
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 0 deletions.
28 changes: 28 additions & 0 deletions cmd/seb/app/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package app

import (
"fmt"
"os"

"github.com/spf13/cobra"
)

var rootCmd = &cobra.Command{
Use: "seb",
Short: "Seb",
Long: `Seb is a Simple Event Broker that has the goals of keeping your data safe, being cheap to run and easy to manage`,
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
}

func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

func init() {
rootCmd.AddCommand(serveCmd)
}
137 changes: 137 additions & 0 deletions cmd/seb/app/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package app

import (
"context"
"fmt"
"net/http"
"os"
"path"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/micvbang/go-helpy/sizey"
"github.com/micvbang/simple-event-broker/internal/httphandlers"
"github.com/micvbang/simple-event-broker/internal/infrastructure/httphelpers"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/sebbroker"
"github.com/micvbang/simple-event-broker/internal/sebcache"
"github.com/spf13/cobra"
)

var serveFlags ServeFlags

func init() {
fs := serveCmd.Flags()

fs.IntVar(&serveFlags.logLevel, "log-level", int(logger.LevelInfo), "Log level, info=4, debug=5")

// http
fs.StringVar(&serveFlags.httpListenAddress, "http-address", "127.0.0.1", "Address to listen for HTTP traffic")
fs.IntVar(&serveFlags.httpListenPort, "http-port", 51313, "Port to listen for HTTP traffic")
fs.StringVar(&serveFlags.httpAPIKey, "http-api-key", "api-key", "API key for authorizing HTTP requests (this is not safe and needs to be changed)")

// http debug
fs.BoolVar(&serveFlags.httpEnableDebug, "http-debug-enable", false, "Whether to enable DEBUG endpoints")
fs.StringVar(&serveFlags.httpDebugListenAddress, "http-debug-address", "127.0.0.1", "Address to expose DEBUG endpoints. You very likely want this to remain localhost!")
fs.IntVar(&serveFlags.httpDebugListenPort, "http-debug-port", 5000, "Port to serve DEBUG endpoints on")

// s3
fs.StringVar(&serveFlags.s3BucketName, "s3-bucket", "", "Bucket name")

// caching
fs.StringVar(&serveFlags.cacheDir, "cache-dir", path.Join(os.TempDir(), "seb-cache"), "Local dir to use when caching record batches")
fs.Int64Var(&serveFlags.cacheMaxBytes, "cache-size", 1*sizey.GB, "Maximum number of bytes to keep in the cache (soft limit)")
fs.DurationVar(&serveFlags.cacheEvictionInterval, "cache-eviction-interval", 5*time.Minute, "Amount of time between enforcing maximum cache size")

// batching
fs.DurationVar(&serveFlags.recordBatchBlockTime, "batch-wait-time", time.Second, "Amount of time to wait between receiving first record in batch and committing it")
fs.IntVar(&serveFlags.recordBatchSoftMaxBytes, "batch-bytes-max", 10*sizey.MB, "Soft maximum for the number of bytes to include in each record batch")

// required flags
serveCmd.MarkFlagRequired("s3-bucket")
}

var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start HTTP server",
Long: "Start Seb's HTTP server",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

flags := serveFlags
log := logger.NewWithLevel(ctx, logger.LogLevel(flags.logLevel))
log.Debugf("flags: %+v", flags)

cache, err := sebcache.NewDiskCache(log, flags.cacheDir)
if err != nil {
log.Fatalf("creating disk cache: %w", err)
}

go sebcache.EvictionLoop(ctx, log.Name("cache eviction"), cache, flags.cacheMaxBytes, flags.cacheEvictionInterval)

blockingS3Broker, err := makeBlockingS3Broker(log, cache, flags.recordBatchSoftMaxBytes, flags.recordBatchBlockTime, flags.s3BucketName)
if err != nil {
log.Fatalf("making blocking s3 broker: %s", err)
}

mux := http.NewServeMux()
httphandlers.RegisterRoutes(log, mux, blockingS3Broker, flags.httpAPIKey)

errs := make(chan error, 8)

go func() {
addr := fmt.Sprintf("%s:%d", flags.httpListenAddress, flags.httpListenPort)
log.Infof("Listening on %s", addr)
errs <- http.ListenAndServe(addr, mux)
}()

if flags.httpEnableDebug {
go func() {
logPprof := log.Name("pprof")
errs <- httphelpers.ListenAndServePprof(logPprof, flags.httpDebugListenAddress, flags.httpDebugListenPort)
}()
}

err = <-errs
log.Errorf("main returned: %s", err)
return err
},
}

func makeBlockingS3Broker(log logger.Logger, cache *sebcache.Cache, bytesSoftMax int, blockTime time.Duration, s3BucketName string) (*sebbroker.Broker, error) {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, fmt.Errorf("creating s3 session: %s", err)
}

s3TopicFactory := sebbroker.NewS3TopicFactory(cfg, s3BucketName, cache)
blockingBatcherFactory := sebbroker.NewBlockingBatcherFactory(blockTime, bytesSoftMax)

broker := sebbroker.New(
log.Name("storage"),
s3TopicFactory,
sebbroker.WithBatcherFactory(blockingBatcherFactory),
)
return broker, nil
}

type ServeFlags struct {
logLevel int

s3BucketName string

httpListenAddress string
httpListenPort int
httpAPIKey string

httpEnableDebug bool
httpDebugListenAddress string
httpDebugListenPort int

cacheDir string
cacheMaxBytes int64
cacheEvictionInterval time.Duration

recordBatchBlockTime time.Duration
recordBatchSoftMaxBytes int
}
9 changes: 9 additions & 0 deletions cmd/seb/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/micvbang/simple-event-broker/cmd/seb/app"
)

func main() {
app.Execute()
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/klauspost/compress v1.17.8
github.com/micvbang/go-helpy v0.1.20
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.8.4
)

Expand All @@ -29,7 +30,9 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,25 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 h1:et3Ta53gotFR4ERLXXHIHl/Uuk1q
github.com/aws/aws-sdk-go-v2/service/sts v1.28.7/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw=
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/micvbang/go-helpy v0.1.20 h1:VcagiZCSEYFw/MuDojNwfNlPIjQooh0xl7AIIN0p9OU=
github.com/micvbang/go-helpy v0.1.20/go.mod h1:9JyNGzneXfG1D3KFGfYXZ4woZa9SgqY3sM0NFOfAMYM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down

0 comments on commit e2cf380

Please sign in to comment.