Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: New API with health endpoint #139

Merged
merged 6 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/pipeline"
)

// StatusProvider is a subset of the Pipeline interface required by the health handler.
type StatusProvider interface {
Status() (pipeline.Status, error)
}

func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
status, err := p.Status()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, `{"error": "%s"}`, err)
return
}
w.WriteHeader(http.StatusOK)
data, _ := json.Marshal(status)
fmt.Fprint(w, string(data))
}
}

// StartServer starts a http server that exposes a health check endpoint.
winder marked this conversation as resolved.
Show resolved Hide resolved
// A callback is returned that can be used to gracefully shutdown the server.
func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) {
mux := http.NewServeMux()
mux.HandleFunc("/health", makeHealthHandler(p))

srv := &http.Server{
Addr: address,
Handler: mux,
}

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to start API server: %s", err)
}
}()

shutdownCallback := func(ctx context.Context) {
if err := srv.Shutdown(ctx); err != nil {
logger.Fatalf("failed to shutdown API server: %s", err)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}
}
return shutdownCallback, nil
}
103 changes: 103 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"

"github.com/algorand/conduit/conduit/pipeline"
)

type mockStatusProvider struct {
s pipeline.Status
err error
}

func (m mockStatusProvider) Status() (pipeline.Status, error) {
if m.err != nil {
return pipeline.Status{}, m.err
}
return m.s, nil
}

func TestStartServer_BadAddress(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}

shutdown, err := StartServer(l, sp, "bad address")
defer shutdown(context.Background())
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)

require.Len(t, h.Entries, 1)
require.Equal(t, h.LastEntry().Level, logrus.FatalLevel)
}

func TestStartServer_GracefulShutdown(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}
shutdown, err := StartServer(l, sp, "bad address")
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
defer shutdown(context.Background())
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.Len(t, h.Entries, 0)
}

func TestStartServer_HealthCheck(t *testing.T) {
l, _ := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{
s: pipeline.Status{
Round: 999,
},
}

// Find an open port...
listener, err := net.Listen("tcp", ":0")
addr := listener.Addr().String()
listener.Close()
require.NoError(t, err)

// Start server.
shutdown, err := StartServer(l, sp, addr)
defer shutdown(context.Background())
require.NoError(t, err)

// Make request.
resp, err := http.Get("http://" + addr + "/health")
require.NoError(t, err)

// Make sure we got the right response.
require.Equal(t, http.StatusOK, resp.StatusCode)
var respStatus pipeline.Status
json.NewDecoder(resp.Body).Decode(&respStatus)
require.NoError(t, err)
require.Equal(t, sp.s, respStatus)
}

func TestHealthHandlerError(t *testing.T) {
sp := mockStatusProvider{
err: fmt.Errorf("some error"),
}
handler := makeHealthHandler(sp)
rec := httptest.NewRecorder()
handler(rec, nil)

// validate response
resp := rec.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Contains(t, rec.Body.String(), "some error")
}
6 changes: 6 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Telemetry struct {
Password string `yaml:"password"`
}

// API defines parameters for the Conduit API server.
type API struct {
Address string `yaml:"addr"`
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -72,6 +77,7 @@ type Config struct {
Processors []NameConfigPair `yaml:"processors"`
Exporter NameConfigPair `yaml:"exporter"`
Metrics Metrics `yaml:"metrics"`
API API `yaml:"api"`
// RetryCount is the number of retries to perform for an error in the pipeline
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
Expand Down
14 changes: 14 additions & 0 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path"
"runtime/pprof"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -37,6 +38,12 @@ type Pipeline interface {
Stop()
Error() error
Wait()
Status() (Status, error)
}

// Status is a struct that contains the current pipeline status.
type Status struct {
Round uint64 `json:"round"`
}

type pipelineImpl struct {
Expand Down Expand Up @@ -532,6 +539,13 @@ func (p *pipelineImpl) Wait() {
p.wg.Wait()
}

func (p *pipelineImpl) Status() (Status, error) {
rnd := atomic.LoadUint64(&p.pipelineMetadata.NextRound)
algochoi marked this conversation as resolved.
Show resolved Hide resolved
return Status{
Round: rnd,
}, nil
}

// start a http server serving /metrics
func (p *pipelineImpl) startMetricsServer() {
http.Handle("/metrics", promhttp.Handler())
Expand Down
15 changes: 15 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/algorand/conduit/api"
"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/loggers"
"github.com/algorand/conduit/conduit/pipeline"
Expand Down Expand Up @@ -101,6 +102,20 @@ func runConduitCmdWithConfig(args *data.Args) error {
}
pipeline.Start()
defer pipeline.Stop()

// Start server
if pCfg.API.Address != "" {
shutdown, err := api.StartServer(logger, pipeline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("failed to start API server: %w", err)
}
defer shutdown(context.Background())
}

pipeline.Wait()
return pipeline.Error()
}
Expand Down
15 changes: 4 additions & 11 deletions pkg/cli/internal/initialize/conduit.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ retry-delay: "1s"
# Whether or not to print the conduit banner on startup.
hide-banner: false

# When the address is not empty information is available on '/health'
api:
addr: ":8981"

# When enabled prometheus metrics are available on '/metrics'
metrics:
mode: OFF
Expand All @@ -33,14 +37,3 @@ processors:
# An exporter is defined to do something with the data.
exporter:
%s

# Enable telemetry for conduit
telemetry:
enabled: false

# By default the following fields will be configured to send data to Algorand.
# To store your own telemetry events, they can be overridden.
# uri: ""
# index: ""
# username: ""
# password: ""