Skip to content

Commit

Permalink
cmd: add seb-api, simple web server
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Apr 8, 2024
1 parent e25a3e8 commit 5e95693
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 9 deletions.
127 changes: 127 additions & 0 deletions cmd/seb-api/api/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package api

import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/micvbang/simple-event-broker/internal/httphandlers"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/recordbatch"
"github.com/micvbang/simple-event-broker/internal/storage"
)

func Run() {
ctx := context.Background()
log := logger.NewWithLevel(ctx, logger.LevelDebug)

flags := parseFlags()

log.Debugf("flags: %v", flags)

blockingS3Storage, err := makeBlockingS3Storage(log, flags.sleepTime, flags.bucketName)
if err != nil {
log.Fatalf("making blocking s3 storage: %s", err)
}

mux := http.NewServeMux()
mux.HandleFunc("POST /add", httphandlers.AddRecord(log, blockingS3Storage))
mux.HandleFunc("/get", httphandlers.GetRecord(log, blockingS3Storage))

mux.HandleFunc("GET /path/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "got path\n")
})

addr := fmt.Sprintf("%s:%d", flags.httpListenAddress, flags.httpListenPort)
log.Infof("Listening on %s", addr)
err = http.ListenAndServe(addr, mux)
log.Fatalf("ListenAndServe returned: %s", err)
}

func makeBlockingS3Storage(log logger.Logger, sleepTime time.Duration, s3BucketName string) (storage.Storage, error) {
contextCreator := func() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), sleepTime)
go func() {
// We have to cancel the context. Just ensure that it's cancelled at
// some point in the future.
time.Sleep(sleepTime * 2)
cancel()
}()

return ctx
}

session, err := session.NewSession()
if err != nil {
return nil, fmt.Errorf("creating s3 session: %s", err)
}

s3TopicStorage := func(log logger.Logger, topicName string) (*storage.TopicStorage, error) {
return storage.NewS3TopicStorage(log.Name("s3 storage"), storage.S3StorageInput{
S3: s3.New(session),
BucketName: s3BucketName,
RootDir: "/tmp/recordbatch",
TopicName: topicName,
})
}

blockingBatcher := func(log logger.Logger, ts *storage.TopicStorage) storage.RecordBatcher {
return recordbatch.NewBlockingBatcher(log.Name("blocking batcher"), contextCreator, func(b recordbatch.RecordBatch) error {
t0 := time.Now()
err := ts.AddRecordBatch(b)
log.Debugf("persisting to s3: %v", time.Since(t0))
return err
})
}

return storage.New(log.Name("storage"), s3TopicStorage, blockingBatcher), nil
}

type flags struct {
bucketName string
sleepTime time.Duration
cacheDir string
httpListenAddress string
httpListenPort int
}

func parseFlags() flags {
fs := flag.NewFlagSet("seb-api", flag.ExitOnError)

f := flags{}

fs.StringVar(&f.bucketName, "b", "simple-commit-log-delete-me", "Bucket name")
fs.DurationVar(&f.sleepTime, "s", time.Second, "Amount of time to wait between receiving first message in batch and committing it")
fs.StringVar(&f.cacheDir, "c", path.Join(os.TempDir(), "seb"), "Local dir to use when caching record batches")
fs.StringVar(&f.httpListenAddress, "l", "127.0.0.1", "Address to listen for HTTP traffic")
fs.IntVar(&f.httpListenPort, "p", 8080, "Port to listen for HTTP traffic")

err := fs.Parse(os.Args[1:])
if err != nil {
fs.Usage()
os.Exit(1)
}

required := []struct {
value string
name string
}{
{name: "bucket name", value: f.bucketName},
}

for _, r := range required {
if len(r.value) == 0 {
fmt.Printf("ERROR: %s required\n", r.name)
fs.Usage()
os.Exit(1)
}
}

return f
}
9 changes: 9 additions & 0 deletions cmd/seb-api/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/micvbang/simple-event-broker/cmd/seb-api/api"
)

func main() {
api.Run()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/micvbang/simple-event-broker

go 1.18
go 1.22

require (
github.com/aws/aws-sdk-go v1.49.8 // indirect
Expand Down
39 changes: 39 additions & 0 deletions internal/httphandlers/addrecord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package httphandlers

import (
"fmt"
"io"
"net/http"

"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/storage"
)

func AddRecord(log logger.Logger, s storage.Storage) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
log.Debugf("hit %s", r.URL)

params, err := parseQueryParams(r, []string{topicNameKey})
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err.Error())
}

bs, err := io.ReadAll(r.Body)
if err != nil {
log.Errorf("failed to read body: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}

err = s.AddRecord(params[topicNameKey], bs)
if err != nil {
log.Errorf("failed to add: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}
}
}
46 changes: 46 additions & 0 deletions internal/httphandlers/getrecord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package httphandlers

import (
"errors"
"fmt"
"net/http"

"github.com/micvbang/go-helpy/uint64y"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/storage"
)

func GetRecord(log logger.Logger, s storage.Storage) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Debugf("hit %s", r.URL)

params, err := parseQueryParams(r, []string{recordIDKey, topicNameKey})
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err.Error())
}

recordID, err := uint64y.FromString(params[recordIDKey])
if err != nil {
log.Errorf("parsing record id key: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "url parameter '%s', must be a number: %s", recordIDKey, err)
w.Write([]byte(err.Error()))
return
}

record, err := s.GetRecord(params[topicNameKey], recordID)
if err != nil {
if errors.Is(err, storage.ErrOutOfBounds) {
log.Debugf("not found")
w.WriteHeader(http.StatusNotFound)
return
}

log.Errorf("reading record: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "failed to read record '%d': %s", recordID, err)
}
w.Write(record)
}
}
32 changes: 32 additions & 0 deletions internal/httphandlers/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package httphandlers

import (
"errors"
"fmt"
"net/http"
)

const (
topicNameKey = "topic-name"
recordIDKey = "record-id"
)

func parseQueryParams(r *http.Request, requiredParams []string) (map[string]string, error) {
err := r.ParseForm()
if err != nil {
return nil, fmt.Errorf("parsing form: %s", err)
}

errs := []error{}
outputs := make(map[string]string, len(requiredParams))
for _, param := range requiredParams {
value := r.Form.Get(param)
if len(value) == 0 {
errs = append(errs, fmt.Errorf("query parameter '%s' required", param))
}

outputs[param] = value
}

return outputs, errors.Join(errs...)
}
2 changes: 0 additions & 2 deletions internal/storage/disktopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
)

const recordBatchExtension = ".record_batch"

type DiskTopicStorage struct{}

func NewDiskTopicStorage(log logger.Logger, rootDir string, topic string) (*TopicStorage, error) {
Expand Down
17 changes: 11 additions & 6 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ type topicBatcher struct {
storage *TopicStorage
}

type Storage struct {
type Storage interface {
AddRecord(topicName string, record recordbatch.Record) error
GetRecord(topicName string, recordID uint64) (recordbatch.Record, error)
}

type storage struct {
log logger.Logger

autoCreateTopics bool
Expand All @@ -28,8 +33,8 @@ type Storage struct {
topicBatcher map[string]topicBatcher
}

func NewStorage(log logger.Logger, createTopicStorage func(log logger.Logger, topicName string) (*TopicStorage, error), createBatcher func(logger.Logger, *TopicStorage) RecordBatcher) *Storage {
return &Storage{
func New(log logger.Logger, createTopicStorage func(log logger.Logger, topicName string) (*TopicStorage, error), createBatcher func(logger.Logger, *TopicStorage) RecordBatcher) Storage {
return &storage{
autoCreateTopics: true,
createTopicStorage: createTopicStorage,
createBatcher: createBatcher,
Expand All @@ -39,7 +44,7 @@ func NewStorage(log logger.Logger, createTopicStorage func(log logger.Logger, to
}
}

func (s *Storage) AddRecord(topicName string, record recordbatch.Record) error {
func (s *storage) AddRecord(topicName string, record recordbatch.Record) error {
tb, err := s.getTopicBatcher(topicName)
if err != nil {
return err
Expand All @@ -52,7 +57,7 @@ func (s *Storage) AddRecord(topicName string, record recordbatch.Record) error {
return nil
}

func (s *Storage) GetRecord(topicName string, recordID uint64) (recordbatch.Record, error) {
func (s *storage) GetRecord(topicName string, recordID uint64) (recordbatch.Record, error) {
tb, err := s.getTopicBatcher(topicName)
if err != nil {
return nil, err
Expand All @@ -61,7 +66,7 @@ func (s *Storage) GetRecord(topicName string, recordID uint64) (recordbatch.Reco
return tb.storage.ReadRecord(recordID)
}

func (s *Storage) getTopicBatcher(topicName string) (topicBatcher, error) {
func (s *storage) getTopicBatcher(topicName string) (topicBatcher, error) {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
2 changes: 2 additions & 0 deletions internal/storage/topicstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func readRecordBatchHeader(backingStorage BackingStorage, topicPath string, reco
return rb.Header, nil
}

const recordBatchExtension = ".record_batch"

func listRecordBatchIDs(backingStorage BackingStorage, topicPath string) ([]uint64, error) {
files, err := backingStorage.ListFiles(topicPath, recordBatchExtension)
if err != nil {
Expand Down

0 comments on commit 5e95693

Please sign in to comment.