Skip to content

Commit

Permalink
WIP: cmd: add seb-api dummy
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Apr 8, 2024
1 parent 3266965 commit 6c1675d
Showing 1 changed file with 195 additions and 0 deletions.
195 changes: 195 additions & 0 deletions cmd/seb-api/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
"net/http"
"os"
"path"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/micvbang/go-helpy/uint64y"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/recordbatch"
"github.com/micvbang/simple-event-broker/internal/storage"
)

const (
topicNameKey = "topic-name"
recordIDKey = "record-id"
)

func main() {
ctx := context.Background()
log := logger.NewWithLevel(ctx, logger.LevelDebug)

flags := parseFlags()

log.Debugf("flags: %v", flags)

mux := http.NewServeMux()

contextCreator := func() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), flags.sleepTime)
go func() {
time.Sleep(flags.sleepTime * 2)
cancel()
}()

return ctx
}

session, err := session.NewSession()
if err != nil {
log.Fatalf("creating s3 session: %s", err)
}

s3Storage := func(log logger.Logger, topicName string) (*storage.TopicStorage, error) {
return storage.NewS3TopicStorage(log.Name("s3 storage"), storage.S3StorageInput{
S3: s3.New(session),
BucketName: flags.bucketName,
RootDir: "/tmp/recordbatch",
TopicName: topicName,
})
}

blockingBatcher := func(log logger.Logger, ts *storage.TopicStorage) storage.RecordBatcher {
return recordbatch.NewBlockingBatcher(log.Name("blocking batcher"), contextCreator, func(b recordbatch.RecordBatch) error {
t0 := time.Now()
err := ts.AddRecordBatch(b)
log.Debugf("persisting to s3: %v", time.Since(t0))
return err
})
}

blockingS3Storage := storage.NewStorage(log.Name("storage"), s3Storage, blockingBatcher)

mux.Handle("/add", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
log.Debugf("hit %s", r.URL)

err := r.ParseForm()
if err != nil {
log.Errorf("parsing form: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "failed to parse form: %s", err)
return
}

topicName := r.Form.Get(topicNameKey)
if len(topicName) == 0 {
log.Errorf("no topic name")
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "query param '%s' required", topicNameKey)
return
}

bs, err := io.ReadAll(r.Body)
if err != nil {
log.Errorf("failed to read body: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}

err = blockingS3Storage.AddRecord(topicName, bs)
if err != nil {
log.Errorf("failed to add: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}
}))

mux.Handle("/get", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Debugf("hit %s", r.URL)

err := r.ParseForm()
if err != nil {
log.Errorf("parsing form: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "failed to parse form: %s", err)
return
}

recordID, err := uint64y.FromString(r.Form.Get(recordIDKey))
if err != nil {
log.Errorf("parsing record id key: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "url parameter '%s' required, must be a number: %s", recordIDKey, err)
w.Write([]byte(err.Error()))
return
}

topicName := r.Form.Get(topicNameKey)
if len(topicName) == 0 {
log.Errorf("no topic name")
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "query param '%s' required", topicNameKey)
return
}

record, err := blockingS3Storage.GetRecord(topicName, recordID)
if err != nil {
if errors.Is(err, storage.ErrOutOfBounds) {
log.Debugf("not found")
w.WriteHeader(http.StatusNotFound)
return
}

log.Errorf("reading record: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "failed to read record '%d': %s", recordID, err)
}
w.Write(record)
}))

addr := "127.0.0.1:8080"
log.Infof("Listening on %s", addr)
err = http.ListenAndServe(addr, mux)
log.Fatalf("ListenAndServe returned: %s", err)
}

type flags struct {
bucketName string
sleepTime time.Duration
cacheDir string
}

func parseFlags() flags {
fs := flag.NewFlagSet("seb-api", flag.ExitOnError)

f := flags{}

fs.StringVar(&f.bucketName, "b", "simple-commit-log-delete-me", "Bucket name")
fs.DurationVar(&f.sleepTime, "s", time.Second, "Amount of time to wait between receiving first message in batch and committing it")
fs.StringVar(&f.cacheDir, "c", path.Join(os.TempDir(), "seb"), "Local dir to use when caching record batches")

err := fs.Parse(os.Args[1:])
if err != nil {
fs.Usage()
os.Exit(1)
}

required := []struct {
value string
name string
}{
{name: "bucket name", value: f.bucketName},
}

for _, r := range required {
if len(r.value) == 0 {
fmt.Printf("ERROR: %s required\n", r.name)
fs.Usage()
os.Exit(1)
}
}

return f
}

0 comments on commit 6c1675d

Please sign in to comment.