Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: New API with health endpoint #139

Merged
merged 6 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/pipeline"
)

// StatusProvider is a subset of the Pipeline interface required by the health handler.
type StatusProvider interface {
Status() (pipeline.Status, error)
}

func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
status, err := p.Status()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, `{"error": "%s"}`, err)
return
}
w.WriteHeader(http.StatusOK)
data, _ := json.Marshal(status)
fmt.Fprint(w, string(data))
}
}

// StartServer starts an http server that exposes a health check endpoint.
// A callback is returned that can be used to gracefully shutdown the server.
func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) {
mux := http.NewServeMux()
mux.HandleFunc("/health", makeHealthHandler(p))

srv := &http.Server{
Addr: address,
Handler: mux,
}

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to start API server: %s", err)
}
}()

shutdownCallback := func(ctx context.Context) {
if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to shutdown API server: %s", err)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 54 in api/api.go

View check run for this annotation

Codecov / codecov/patch

api/api.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}
return shutdownCallback, nil
}
103 changes: 103 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"

"github.com/algorand/conduit/conduit/pipeline"
)

type mockStatusProvider struct {
s pipeline.Status
err error
}

func (m mockStatusProvider) Status() (pipeline.Status, error) {
if m.err != nil {
return pipeline.Status{}, m.err
}
return m.s, nil
}

func TestStartServer_BadAddress(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}

shutdown, err := StartServer(l, sp, "bad address")
defer shutdown(context.Background())
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)

require.Len(t, h.Entries, 1)
require.Equal(t, h.LastEntry().Level, logrus.FatalLevel)
}

func TestStartServer_GracefulShutdown(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}
shutdown, err := StartServer(l, sp, "bad address")
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
defer shutdown(context.Background())
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.Len(t, h.Entries, 0)
}

func TestStartServer_HealthCheck(t *testing.T) {
l, _ := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{
s: pipeline.Status{
Round: 999,
},
}

// Find an open port...
listener, err := net.Listen("tcp", ":0")
addr := listener.Addr().String()
listener.Close()
require.NoError(t, err)

// Start server.
shutdown, err := StartServer(l, sp, addr)
defer shutdown(context.Background())
require.NoError(t, err)

// Make request.
resp, err := http.Get("http://" + addr + "/health")
require.NoError(t, err)

// Make sure we got the right response.
require.Equal(t, http.StatusOK, resp.StatusCode)
var respStatus pipeline.Status
json.NewDecoder(resp.Body).Decode(&respStatus)
require.NoError(t, err)
require.Equal(t, sp.s, respStatus)
}

func TestHealthHandlerError(t *testing.T) {
sp := mockStatusProvider{
err: fmt.Errorf("some error"),
}
handler := makeHealthHandler(sp)
rec := httptest.NewRecorder()
handler(rec, nil)

// validate response
resp := rec.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Contains(t, rec.Body.String(), "some error")
}
6 changes: 6 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
Password string `yaml:"password"`
}

// API defines parameters for the Conduit API server.
type API struct {
Address string `yaml:"addr"`
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -72,6 +77,7 @@
Processors []NameConfigPair `yaml:"processors"`
Exporter NameConfigPair `yaml:"exporter"`
Metrics Metrics `yaml:"metrics"`
API API `yaml:"api"`
// RetryCount is the number of retries to perform for an error in the pipeline
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
Expand All @@ -88,8 +94,8 @@

// If it is a negative time, it is an error
if cfg.RetryDelay < 0 {
return fmt.Errorf("Args.Valid(): invalid retry delay - time duration was negative (%s)", cfg.RetryDelay.String())
}

Check warning on line 98 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L97-L98

Added lines #L97 - L98 were not covered by tests

return nil
}
Expand All @@ -101,8 +107,8 @@
}

if !isDir(args.ConduitDataDir) {
return nil, fmt.Errorf("MakePipelineConfig(): invalid data dir '%s'", args.ConduitDataDir)
}

Check warning on line 111 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L110-L111

Added lines #L110 - L111 were not covered by tests

// Search for pipeline configuration in data directory
autoloadParamConfigPath, err := getConfigFromDataDir(args.ConduitDataDir, DefaultConfigBaseName, []string{"yml", "yaml"})
Expand All @@ -112,8 +118,8 @@

file, err := os.Open(autoloadParamConfigPath)
if err != nil {
return nil, fmt.Errorf("MakePipelineConfig(): reading config error: %w", err)
}

Check warning on line 122 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L121-L122

Added lines #L121 - L122 were not covered by tests

pCfgDecoder := yaml.NewDecoder(file)
// Make sure we are strict about only unmarshalling known fields
Expand All @@ -137,8 +143,8 @@
}

if err := pCfg.Valid(); err != nil {
return nil, fmt.Errorf("MakePipelineConfig(): config file (%s) had mal-formed schema: %w", autoloadParamConfigPath, err)
}

Check warning on line 147 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L146-L147

Added lines #L146 - L147 were not covered by tests

return &pCfg, nil
}
Expand Down Expand Up @@ -174,9 +180,9 @@
}

if count > 1 {
return "", fmt.Errorf("config filename (%s) in data directory (%s) matched more than one filetype: %v",
configFilename, dataDirectory, configFileTypes)
}

Check warning on line 185 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L183-L185

Added lines #L183 - L185 were not covered by tests

// if count == 0 then the fullpath will be set to "" and error will be nil
// if count == 1 then it fullpath will be correct
Expand Down
23 changes: 23 additions & 0 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
Stop()
Error() error
Wait()
Status() (Status, error)
}

// Status is a struct that contains the current pipeline status.
type Status struct {
Round uint64 `json:"round"`
}

type pipelineImpl struct {
Expand All @@ -57,6 +63,9 @@
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state

statusMu sync.Mutex
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
status Status
}

func (p *pipelineImpl) Error() error {
Expand Down Expand Up @@ -108,8 +117,8 @@
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
}

Check warning on line 121 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L120-L121

Added lines #L120 - L121 were not covered by tests

lgr := log.New()
lgr.SetOutput(p.logger.Out)
Expand All @@ -122,7 +131,7 @@
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)

Check warning on line 134 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L134

Added line #L134 was not covered by tests
}
}

Expand Down Expand Up @@ -173,8 +182,8 @@
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
if err != nil {
return 0, err
}

Check warning on line 186 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L185-L186

Added lines #L185 - L186 were not covered by tests
rnd, err := part.RoundRequest(config)
if err != nil {
return 0, err
Expand Down Expand Up @@ -204,16 +213,16 @@
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

Check warning on line 217 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L216-L217

Added lines #L216 - L217 were not covered by tests
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

Check warning on line 225 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L224-L225

Added lines #L224 - L225 were not covered by tests

return telemetryClient, nil
}
Expand All @@ -232,12 +241,12 @@
var err error
profFile, err := os.Create(p.cfg.CPUProfile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to create profile: %w", err)

Check warning on line 244 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L244

Added line #L244 was not covered by tests
}
p.profFile = profFile
err = pprof.StartCPUProfile(profFile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to start pprof: %w", err)

Check warning on line 249 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L249

Added line #L249 was not covered by tests
}
}

Expand Down Expand Up @@ -286,7 +295,7 @@
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)

Check warning on line 298 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L298

Added line #L298 was not covered by tests
} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
Expand All @@ -304,16 +313,16 @@
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}

Check warning on line 317 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L316-L317

Added lines #L316 - L317 were not covered by tests
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 321 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L320-L321

Added lines #L320 - L321 were not covered by tests
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 325 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L324-L325

Added lines #L324 - L325 were not covered by tests
(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
Expand All @@ -326,8 +335,8 @@
p.pipelineMetadata.Network = genesis.Network
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
return fmt.Errorf("Pipeline.Init() failed to write metadata to file: %w", err)
}

Check warning on line 339 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L338-L339

Added lines #L338 - L339 were not covered by tests

p.logger.Infof("Initialized Importer: %s", p.cfg.Importer.Name)
}
Expand All @@ -337,11 +346,11 @@
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)

Check warning on line 349 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L349

Added line #L349 was not covered by tests
}
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)

Check warning on line 353 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L353

Added line #L353 was not covered by tests
}
p.logger.Infof("Initialized Processor: %s", ncPair.Name)
}
Expand All @@ -350,12 +359,12 @@
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 363 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L362-L363

Added lines #L362 - L363 were not covered by tests
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 367 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L366-L367

Added lines #L366 - L367 were not covered by tests
p.logger.Infof("Initialized Exporter: %s", p.cfg.Exporter.Name)
}

Expand All @@ -368,6 +377,10 @@
go p.startMetricsServer()
}

p.statusMu.Lock()
defer p.statusMu.Unlock()
p.status.Round = p.pipelineMetadata.NextRound

return err
}

Expand All @@ -388,20 +401,20 @@
}
}

if err := p.importer.Close(); err != nil {

Check warning on line 404 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L404

Added line #L404 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)

Check warning on line 406 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L406

Added line #L406 was not covered by tests
}

for _, processor := range p.processors {
if err := processor.Close(); err != nil {

Check warning on line 410 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L410

Added line #L410 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)

Check warning on line 412 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L412

Added line #L412 was not covered by tests
}
}

if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)

Check warning on line 417 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L416-L417

Added lines #L416 - L417 were not covered by tests
}
}

Expand Down Expand Up @@ -499,6 +512,9 @@

// Increment Round, update metadata
p.pipelineMetadata.NextRound++
p.statusMu.Lock()
p.status.Round = p.pipelineMetadata.NextRound
p.statusMu.Unlock()
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
p.logger.Errorf("%v", err)
Expand Down Expand Up @@ -532,6 +548,13 @@
p.wg.Wait()
}

func (p *pipelineImpl) Status() (Status, error) {
p.statusMu.Lock()
ret := p.status
p.statusMu.Unlock()
return ret, nil

Check warning on line 555 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L551-L555

Added lines #L551 - L555 were not covered by tests
}

// start a http server serving /metrics
func (p *pipelineImpl) startMetricsServer() {
http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -574,7 +597,7 @@
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

pipeline.importer = importerConstructor.New()

Check warning on line 600 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L600

Added line #L600 was not covered by tests
logger.Infof("Found Importer: %s", importerName)

// ---
Expand All @@ -582,12 +605,12 @@
for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorConstructor, err := processors.ProcessorConstructorByName(processorName)

Check warning on line 608 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L608

Added line #L608 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

pipeline.processors = append(pipeline.processors, processorConstructor.New())

Check warning on line 613 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L613

Added line #L613 was not covered by tests
logger.Infof("Found Processor: %s", processorName)
}

Expand All @@ -595,12 +618,12 @@

exporterName := cfg.Exporter.Name

exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)

Check warning on line 621 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L621

Added line #L621 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

pipeline.exporter = exporterConstructor.New()

Check warning on line 626 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L626

Added line #L626 was not covered by tests
logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
Expand All @@ -612,22 +635,22 @@
logger.Infof("Creating PID file at: %s", pidFilePath)
fout, err := os.Create(pidFilePath)
if err != nil {
err = fmt.Errorf("%s: could not create pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 641 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L638-L641

Added lines #L638 - L641 were not covered by tests

if _, err = fmt.Fprintf(fout, "%d", os.Getpid()); err != nil {
err = fmt.Errorf("%s: could not write pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 647 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L644-L647

Added lines #L644 - L647 were not covered by tests

err = fout.Close()
if err != nil {
err = fmt.Errorf("%s: could not close pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 654 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L651-L654

Added lines #L651 - L654 were not covered by tests
return err
}
15 changes: 15 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/algorand/conduit/api"
"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/loggers"
"github.com/algorand/conduit/conduit/pipeline"
Expand Down Expand Up @@ -47,8 +48,8 @@

pCfg, err := data.MakePipelineConfig(args)
if err != nil {
return err
}

Check warning on line 52 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L51-L52

Added lines #L51 - L52 were not covered by tests

// Initialize logger
level, err := log.ParseLevel(pCfg.LogLevel)
Expand All @@ -62,8 +63,8 @@

logger, err = loggers.MakeThreadSafeLogger(level, pCfg.LogFile)
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}

Check warning on line 67 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L66-L67

Added lines #L66 - L67 were not covered by tests

logger.Infof("Starting Conduit %s", version.LongVersion())
logger.Infof("Using data directory: %s", args.ConduitDataDir)
Expand Down Expand Up @@ -91,26 +92,40 @@
return err
}

err = pipeline.Init()
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("pipeline init error: %w", err)

Check warning on line 101 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L95-L101

Added lines #L95 - L101 were not covered by tests
}
pipeline.Start()
defer pipeline.Stop()

// Start server
if pCfg.API.Address != "" {
shutdown, err := api.StartServer(logger, pipeline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("failed to start API server: %w", err)

Check warning on line 114 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L103-L114

Added lines #L103 - L114 were not covered by tests
}
defer shutdown(context.Background())

Check warning on line 116 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L116

Added line #L116 was not covered by tests
}

pipeline.Wait()
return pipeline.Error()

Check warning on line 120 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}

// MakeConduitCmdWithUtilities creates the main cobra command with all utilities
func MakeConduitCmdWithUtilities() *cobra.Command {
cmd := MakeConduitCmd()
cmd.AddCommand(initialize.InitCommand)
cmd.AddCommand(list.Command)
return cmd

Check warning on line 128 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L124-L128

Added lines #L124 - L128 were not covered by tests
}

// MakeConduitCmd creates the main cobra command, initializes flags
Expand All @@ -132,17 +147,17 @@
Detailed documentation is online: https://github.com/algorand/conduit`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
err := runConduitCmdWithConfig(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "\nExiting with error:\n%s.\n", err)
os.Exit(1)
}

Check warning on line 154 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L150-L154

Added lines #L150 - L154 were not covered by tests
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if vFlag {
fmt.Printf("%s\n", version.LongVersion())
os.Exit(0)
}

Check warning on line 160 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L156-L160

Added lines #L156 - L160 were not covered by tests
},
}
cmd.Flags().StringVarP(&cfg.ConduitDataDir, "data-dir", "d", "", "Set the Conduit data directory. If not set the CONDUIT_DATA_DIR environment variable is used.")
Expand Down
15 changes: 4 additions & 11 deletions pkg/cli/internal/initialize/conduit.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ retry-delay: "1s"
# Whether or not to print the conduit banner on startup.
hide-banner: false

# When the address is not empty information is available on '/health'
api:
addr: ":8981"

# When enabled prometheus metrics are available on '/metrics'
metrics:
mode: OFF
Expand All @@ -33,14 +37,3 @@ processors:
# An exporter is defined to do something with the data.
exporter:
%s

# Enable telemetry for conduit
telemetry:
enabled: false

# By default the following fields will be configured to send data to Algorand.
# To store your own telemetry events, they can be overridden.
# uri: ""
# index: ""
# username: ""
# password: ""
Loading