From a7c4d6ba3f145f935cfbd4a016f018fc3b3d439d Mon Sep 17 00:00:00 2001 From: Will Winder Date: Wed, 16 Aug 2023 12:56:46 -0400 Subject: [PATCH 1/5] Add health endpoint. Remove telemetry from default template. It is not yet available. Add tests and minor refactoring for testability. Address linter complaints. Revert some unnecessary refactoring. --- api/api.go | 57 ++++++++++ api/api_test.go | 103 ++++++++++++++++++ conduit/data/config.go | 6 + conduit/pipeline/pipeline.go | 14 +++ pkg/cli/cli.go | 15 +++ .../internal/initialize/conduit.yml.example | 15 +-- 6 files changed, 199 insertions(+), 11 deletions(-) create mode 100644 api/api.go create mode 100644 api/api_test.go diff --git a/api/api.go b/api/api.go new file mode 100644 index 00000000..9bba5700 --- /dev/null +++ b/api/api.go @@ -0,0 +1,57 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" + + "github.com/algorand/conduit/conduit/pipeline" +) + +// StatusProvider is a subset of the Pipeline interface required by the health handler. +type StatusProvider interface { + Status() (pipeline.Status, error) +} + +func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + status, err := p.Status() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, `{"error": "%s"}`, err) + return + } + w.WriteHeader(http.StatusOK) + data, _ := json.Marshal(status) + fmt.Fprint(w, string(data)) + } +} + +// StartServer starts a http server that exposes a health check endpoint. +// A callback is returned that can be used to gracefully shutdown the server. +func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) { + mux := http.NewServeMux() + mux.HandleFunc("/health", makeHealthHandler(p)) + + srv := &http.Server{ + Addr: address, + Handler: mux, + } + + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatalf("failed to start API server: %s", err) + } + }() + + shutdownCallback := func(ctx context.Context) { + if err := srv.Shutdown(ctx); err != nil { + logger.Fatalf("failed to shutdown API server: %s", err) + } + } + return shutdownCallback, nil +} diff --git a/api/api_test.go b/api/api_test.go new file mode 100644 index 00000000..92cfc714 --- /dev/null +++ b/api/api_test.go @@ -0,0 +1,103 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + + "github.com/algorand/conduit/conduit/pipeline" +) + +type mockStatusProvider struct { + s pipeline.Status + err error +} + +func (m mockStatusProvider) Status() (pipeline.Status, error) { + if m.err != nil { + return pipeline.Status{}, m.err + } + return m.s, nil +} + +func TestStartServer_BadAddress(t *testing.T) { + l, h := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{} + + shutdown, err := StartServer(l, sp, "bad address") + defer shutdown(context.Background()) + require.NoError(t, err) + time.Sleep(1 * time.Millisecond) + + require.Len(t, h.Entries, 1) + require.Equal(t, h.LastEntry().Level, logrus.FatalLevel) +} + +func TestStartServer_GracefulShutdown(t *testing.T) { + l, h := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{} + shutdown, err := StartServer(l, sp, "bad address") + defer shutdown(context.Background()) + require.NoError(t, err) + require.Len(t, h.Entries, 0) +} + +func TestStartServer_HealthCheck(t *testing.T) { + l, _ := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{ + s: pipeline.Status{ + Round: 999, + }, + } + + // Find an open port... + listener, err := net.Listen("tcp", ":0") + addr := listener.Addr().String() + listener.Close() + require.NoError(t, err) + + // Start server. + shutdown, err := StartServer(l, sp, addr) + defer shutdown(context.Background()) + require.NoError(t, err) + + // Make request. + resp, err := http.Get("http://" + addr + "/health") + require.NoError(t, err) + + // Make sure we got the right response. + require.Equal(t, http.StatusOK, resp.StatusCode) + var respStatus pipeline.Status + json.NewDecoder(resp.Body).Decode(&respStatus) + require.NoError(t, err) + require.Equal(t, sp.s, respStatus) +} + +func TestHealthHandlerError(t *testing.T) { + sp := mockStatusProvider{ + err: fmt.Errorf("some error"), + } + handler := makeHealthHandler(sp) + rec := httptest.NewRecorder() + handler(rec, nil) + + // validate response + resp := rec.Result() + require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Contains(t, rec.Body.String(), "some error") +} diff --git a/conduit/data/config.go b/conduit/data/config.go index e82b9158..9cd94a96 100644 --- a/conduit/data/config.go +++ b/conduit/data/config.go @@ -56,6 +56,11 @@ type Telemetry struct { Password string `yaml:"password"` } +// API defines parameters for the Conduit API server. +type API struct { + Address string `yaml:"addr"` +} + // Config stores configuration specific to the conduit pipeline type Config struct { // ConduitArgs are the program inputs. Should not be serialized for config. @@ -72,6 +77,7 @@ type Config struct { Processors []NameConfigPair `yaml:"processors"` Exporter NameConfigPair `yaml:"exporter"` Metrics Metrics `yaml:"metrics"` + API API `yaml:"api"` // RetryCount is the number of retries to perform for an error in the pipeline RetryCount uint64 `yaml:"retry-count"` // RetryDelay is a duration amount interpreted from a string diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index b77529d4..866fab61 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -9,6 +9,7 @@ import ( "path" "runtime/pprof" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -37,6 +38,12 @@ type Pipeline interface { Stop() Error() error Wait() + Status() (Status, error) +} + +// Status is a struct that contains the current pipeline status. +type Status struct { + Round uint64 `json:"round"` } type pipelineImpl struct { @@ -532,6 +539,13 @@ func (p *pipelineImpl) Wait() { p.wg.Wait() } +func (p *pipelineImpl) Status() (Status, error) { + rnd := atomic.LoadUint64(&p.pipelineMetadata.NextRound) + return Status{ + Round: rnd, + }, nil +} + // start a http server serving /metrics func (p *pipelineImpl) startMetricsServer() { http.Handle("/metrics", promhttp.Handler()) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 840af6f5..a442bf7a 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/algorand/conduit/api" "github.com/algorand/conduit/conduit/data" "github.com/algorand/conduit/conduit/loggers" "github.com/algorand/conduit/conduit/pipeline" @@ -101,6 +102,20 @@ func runConduitCmdWithConfig(args *data.Args) error { } pipeline.Start() defer pipeline.Stop() + + // Start server + if pCfg.API.Address != "" { + shutdown, err := api.StartServer(logger, pipeline, pCfg.API.Address) + if err != nil { + // Suppress log, it is about to be printed to stderr. + if pCfg.LogFile != "" { + logger.Error(err) + } + return fmt.Errorf("failed to start API server: %w", err) + } + defer shutdown(context.Background()) + } + pipeline.Wait() return pipeline.Error() } diff --git a/pkg/cli/internal/initialize/conduit.yml.example b/pkg/cli/internal/initialize/conduit.yml.example index 56709af6..ffe64324 100644 --- a/pkg/cli/internal/initialize/conduit.yml.example +++ b/pkg/cli/internal/initialize/conduit.yml.example @@ -17,6 +17,10 @@ retry-delay: "1s" # Whether or not to print the conduit banner on startup. hide-banner: false +# When the address is not empty information is available on '/health' +api: + addr: ":8981" + # When enabled prometheus metrics are available on '/metrics' metrics: mode: OFF @@ -33,14 +37,3 @@ processors: # An exporter is defined to do something with the data. exporter: %s - -# Enable telemetry for conduit -telemetry: - enabled: false - - # By default the following fields will be configured to send data to Algorand. - # To store your own telemetry events, they can be overridden. - # uri: "" - # index: "" - # username: "" - # password: "" From cd408aa97d258f2f99facc7688ebe2dcba769218 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Thu, 17 Aug 2023 15:21:48 -0400 Subject: [PATCH 2/5] Use a mutex instead of atomics. --- conduit/pipeline/pipeline.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 866fab61..697a4762 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -9,7 +9,6 @@ import ( "path" "runtime/pprof" "sync" - "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -64,6 +63,9 @@ type pipelineImpl struct { completeCallback []conduit.OnCompleteFunc pipelineMetadata state + + statusMu sync.Mutex + status Status } func (p *pipelineImpl) Error() error { @@ -375,6 +377,10 @@ func (p *pipelineImpl) Init() error { go p.startMetricsServer() } + p.statusMu.Lock() + defer p.statusMu.Unlock() + p.status.Round = p.pipelineMetadata.NextRound + return err } @@ -506,6 +512,9 @@ func (p *pipelineImpl) Start() { // Increment Round, update metadata p.pipelineMetadata.NextRound++ + p.statusMu.Lock() + p.status.Round = p.pipelineMetadata.NextRound + p.statusMu.Unlock() err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) if err != nil { p.logger.Errorf("%v", err) @@ -540,10 +549,10 @@ func (p *pipelineImpl) Wait() { } func (p *pipelineImpl) Status() (Status, error) { - rnd := atomic.LoadUint64(&p.pipelineMetadata.NextRound) - return Status{ - Round: rnd, - }, nil + p.statusMu.Lock() + ret := p.status + p.statusMu.Unlock() + return ret, nil } // start a http server serving /metrics From c3ee521e4a0899b069a9a2bb8d6a0d74151d7d1d Mon Sep 17 00:00:00 2001 From: Will Winder Date: Thu, 17 Aug 2023 15:55:07 -0400 Subject: [PATCH 3/5] Update api/api.go Co-authored-by: Zeph Grunschlag --- api/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/api.go b/api/api.go index 9bba5700..ad813cfe 100644 --- a/api/api.go +++ b/api/api.go @@ -31,7 +31,7 @@ func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Req } } -// StartServer starts a http server that exposes a health check endpoint. +// StartServer starts an http server that exposes a health check endpoint. // A callback is returned that can be used to gracefully shutdown the server. func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) { mux := http.NewServeMux() From d05a34fbeb123b613f4601680fe9f82a222fb90f Mon Sep 17 00:00:00 2001 From: Will Winder Date: Thu, 17 Aug 2023 17:05:59 -0400 Subject: [PATCH 4/5] Don't exit if the shutdown callback is called twice. --- api/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/api.go b/api/api.go index ad813cfe..09c02591 100644 --- a/api/api.go +++ b/api/api.go @@ -49,7 +49,7 @@ func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx }() shutdownCallback := func(ctx context.Context) { - if err := srv.Shutdown(ctx); err != nil { + if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { logger.Fatalf("failed to shutdown API server: %s", err) } } From bf2092a5fbc612a8734727709463882a64a80409 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Tue, 22 Aug 2023 12:37:02 -0400 Subject: [PATCH 5/5] Fix build --- pkg/cli/cli.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 5ee1212d..de9cb2f9 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -105,7 +105,7 @@ func runConduitCmdWithConfig(args *data.Args) error { // Start server if pCfg.API.Address != "" { - shutdown, err := api.StartServer(logger, pipeline, pCfg.API.Address) + shutdown, err := api.StartServer(logger, pline, pCfg.API.Address) if err != nil { // Suppress log, it is about to be printed to stderr. if pCfg.LogFile != "" {