diff --git a/cmd/seb-api/main.go b/cmd/seb-api/main.go new file mode 100644 index 0000000..b7ae20f --- /dev/null +++ b/cmd/seb-api/main.go @@ -0,0 +1,195 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "path" + "time" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/micvbang/go-helpy/uint64y" + "github.com/micvbang/simple-event-broker/internal/infrastructure/logger" + "github.com/micvbang/simple-event-broker/internal/recordbatch" + "github.com/micvbang/simple-event-broker/internal/storage" +) + +const ( + topicNameKey = "topic-name" + recordIDKey = "record-id" +) + +func main() { + ctx := context.Background() + log := logger.NewWithLevel(ctx, logger.LevelDebug) + + flags := parseFlags() + + log.Debugf("flags: %v", flags) + + mux := http.NewServeMux() + + contextCreator := func() context.Context { + ctx, cancel := context.WithTimeout(context.Background(), flags.sleepTime) + go func() { + time.Sleep(flags.sleepTime * 2) + cancel() + }() + + return ctx + } + + session, err := session.NewSession() + if err != nil { + log.Fatalf("creating s3 session: %s", err) + } + + s3Storage := func(log logger.Logger, topicName string) (*storage.TopicStorage, error) { + return storage.NewS3TopicStorage(log.Name("s3 storage"), storage.S3StorageInput{ + S3: s3.New(session), + BucketName: flags.bucketName, + RootDir: "/tmp/recordbatch", + TopicName: topicName, + }) + } + + blockingBatcher := func(log logger.Logger, ts *storage.TopicStorage) storage.RecordBatcher { + return recordbatch.NewBlockingBatcher(log.Name("blocking batcher"), contextCreator, func(b recordbatch.RecordBatch) error { + t0 := time.Now() + err := ts.AddRecordBatch(b) + log.Debugf("persisting to s3: %v", time.Since(t0)) + return err + }) + } + + blockingS3Storage := storage.NewStorage(log.Name("storage"), s3Storage, blockingBatcher) + + mux.Handle("/add", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + log.Debugf("hit %s", r.URL) + + err := r.ParseForm() + if err != nil { + log.Errorf("parsing form: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "failed to parse form: %s", err) + return + } + + topicName := r.Form.Get(topicNameKey) + if len(topicName) == 0 { + log.Errorf("no topic name") + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "query param '%s' required", topicNameKey) + return + } + + bs, err := io.ReadAll(r.Body) + if err != nil { + log.Errorf("failed to read body: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, err.Error()) + return + } + + err = blockingS3Storage.AddRecord(topicName, bs) + if err != nil { + log.Errorf("failed to add: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, err.Error()) + return + } + })) + + mux.Handle("/get", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.Debugf("hit %s", r.URL) + + err := r.ParseForm() + if err != nil { + log.Errorf("parsing form: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "failed to parse form: %s", err) + return + } + + recordID, err := uint64y.FromString(r.Form.Get(recordIDKey)) + if err != nil { + log.Errorf("parsing record id key: %s", err.Error()) + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "url parameter '%s' required, must be a number: %s", recordIDKey, err) + w.Write([]byte(err.Error())) + return + } + + topicName := r.Form.Get(topicNameKey) + if len(topicName) == 0 { + log.Errorf("no topic name") + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "query param '%s' required", topicNameKey) + return + } + + record, err := blockingS3Storage.GetRecord(topicName, recordID) + if err != nil { + if errors.Is(err, storage.ErrOutOfBounds) { + log.Debugf("not found") + w.WriteHeader(http.StatusNotFound) + return + } + + log.Errorf("reading record: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "failed to read record '%d': %s", recordID, err) + } + w.Write(record) + })) + + addr := "127.0.0.1:8080" + log.Infof("Listening on %s", addr) + err = http.ListenAndServe(addr, mux) + log.Fatalf("ListenAndServe returned: %s", err) +} + +type flags struct { + bucketName string + sleepTime time.Duration + cacheDir string +} + +func parseFlags() flags { + fs := flag.NewFlagSet("seb-api", flag.ExitOnError) + + f := flags{} + + fs.StringVar(&f.bucketName, "b", "simple-commit-log-delete-me", "Bucket name") + fs.DurationVar(&f.sleepTime, "s", time.Second, "Amount of time to wait between receiving first message in batch and committing it") + fs.StringVar(&f.cacheDir, "c", path.Join(os.TempDir(), "seb"), "Local dir to use when caching record batches") + + err := fs.Parse(os.Args[1:]) + if err != nil { + fs.Usage() + os.Exit(1) + } + + required := []struct { + value string + name string + }{ + {name: "bucket name", value: f.bucketName}, + } + + for _, r := range required { + if len(r.value) == 0 { + fmt.Printf("ERROR: %s required\n", r.name) + fs.Usage() + os.Exit(1) + } + } + + return f +}