Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: New API with health endpoint #139

Merged
merged 6 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/pipeline"
)

// StatusProvider is a subset of the Pipeline interface required by the health handler.
type StatusProvider interface {
Status() (pipeline.Status, error)
}

func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
status, err := p.Status()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, `{"error": "%s"}`, err)
return
}
w.WriteHeader(http.StatusOK)
data, _ := json.Marshal(status)
fmt.Fprint(w, string(data))
}
}

// StartServer starts an http server that exposes a health check endpoint.
// A callback is returned that can be used to gracefully shutdown the server.
func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) {
mux := http.NewServeMux()
mux.HandleFunc("/health", makeHealthHandler(p))

srv := &http.Server{
Addr: address,
Handler: mux,
}

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to start API server: %s", err)
}
}()

shutdownCallback := func(ctx context.Context) {
if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to shutdown API server: %s", err)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 54 in api/api.go

View check run for this annotation

Codecov / codecov/patch

api/api.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}
return shutdownCallback, nil
}
103 changes: 103 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"

"github.com/algorand/conduit/conduit/pipeline"
)

type mockStatusProvider struct {
s pipeline.Status
err error
}

func (m mockStatusProvider) Status() (pipeline.Status, error) {
if m.err != nil {
return pipeline.Status{}, m.err
}
return m.s, nil
}

func TestStartServer_BadAddress(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}

shutdown, err := StartServer(l, sp, "bad address")
defer shutdown(context.Background())
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)

require.Len(t, h.Entries, 1)
require.Equal(t, h.LastEntry().Level, logrus.FatalLevel)
}

func TestStartServer_GracefulShutdown(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}
shutdown, err := StartServer(l, sp, "bad address")
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
defer shutdown(context.Background())
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.Len(t, h.Entries, 0)
}

func TestStartServer_HealthCheck(t *testing.T) {
l, _ := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{
s: pipeline.Status{
Round: 999,
},
}

// Find an open port...
listener, err := net.Listen("tcp", ":0")
addr := listener.Addr().String()
listener.Close()
require.NoError(t, err)

// Start server.
shutdown, err := StartServer(l, sp, addr)
defer shutdown(context.Background())
require.NoError(t, err)

// Make request.
resp, err := http.Get("http://" + addr + "/health")
require.NoError(t, err)

// Make sure we got the right response.
require.Equal(t, http.StatusOK, resp.StatusCode)
var respStatus pipeline.Status
json.NewDecoder(resp.Body).Decode(&respStatus)
require.NoError(t, err)
require.Equal(t, sp.s, respStatus)
}

func TestHealthHandlerError(t *testing.T) {
sp := mockStatusProvider{
err: fmt.Errorf("some error"),
}
handler := makeHealthHandler(sp)
rec := httptest.NewRecorder()
handler(rec, nil)

// validate response
resp := rec.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Contains(t, rec.Body.String(), "some error")
}
6 changes: 6 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
Password string `yaml:"password"`
}

// API defines parameters for the Conduit API server.
type API struct {
Address string `yaml:"addr"`
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -72,6 +77,7 @@
Processors []NameConfigPair `yaml:"processors"`
Exporter NameConfigPair `yaml:"exporter"`
Metrics Metrics `yaml:"metrics"`
API API `yaml:"api"`
// RetryCount is the number of retries to perform for an error in the pipeline
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
Expand All @@ -88,8 +94,8 @@

// If it is a negative time, it is an error
if cfg.RetryDelay < 0 {
return fmt.Errorf("Args.Valid(): invalid retry delay - time duration was negative (%s)", cfg.RetryDelay.String())
}

Check warning on line 98 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L97-L98

Added lines #L97 - L98 were not covered by tests

return nil
}
Expand All @@ -101,8 +107,8 @@
}

if !isDir(args.ConduitDataDir) {
return nil, fmt.Errorf("MakePipelineConfig(): invalid data dir '%s'", args.ConduitDataDir)
}

Check warning on line 111 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L110-L111

Added lines #L110 - L111 were not covered by tests

// Search for pipeline configuration in data directory
autoloadParamConfigPath, err := getConfigFromDataDir(args.ConduitDataDir, DefaultConfigBaseName, []string{"yml", "yaml"})
Expand All @@ -112,8 +118,8 @@

file, err := os.Open(autoloadParamConfigPath)
if err != nil {
return nil, fmt.Errorf("MakePipelineConfig(): reading config error: %w", err)
}

Check warning on line 122 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L121-L122

Added lines #L121 - L122 were not covered by tests

pCfgDecoder := yaml.NewDecoder(file)
// Make sure we are strict about only unmarshalling known fields
Expand All @@ -137,8 +143,8 @@
}

if err := pCfg.Valid(); err != nil {
return nil, fmt.Errorf("MakePipelineConfig(): config file (%s) had mal-formed schema: %w", autoloadParamConfigPath, err)
}

Check warning on line 147 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L146-L147

Added lines #L146 - L147 were not covered by tests

return &pCfg, nil
}
Expand Down Expand Up @@ -174,9 +180,9 @@
}

if count > 1 {
return "", fmt.Errorf("config filename (%s) in data directory (%s) matched more than one filetype: %v",
configFilename, dataDirectory, configFileTypes)
}

Check warning on line 185 in conduit/data/config.go

View check run for this annotation

Codecov / codecov/patch

conduit/data/config.go#L183-L185

Added lines #L183 - L185 were not covered by tests

// if count == 0 then the fullpath will be set to "" and error will be nil
// if count == 1 then it fullpath will be correct
Expand Down
23 changes: 23 additions & 0 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
Stop()
Error() error
Wait()
Status() (Status, error)
}

// Status is a struct that contains the current pipeline status.
type Status struct {
Round uint64 `json:"round"`
}

type pipelineImpl struct {
Expand All @@ -58,6 +64,9 @@
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state

statusMu sync.Mutex
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
status Status
}

type pluginChannel chan data.BlockData
Expand Down Expand Up @@ -122,8 +131,8 @@
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
}

Check warning on line 135 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L134-L135

Added lines #L134 - L135 were not covered by tests

lgr := log.New()
lgr.SetOutput(p.logger.Out)
Expand All @@ -136,7 +145,7 @@
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)

Check warning on line 148 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L148

Added line #L148 was not covered by tests
}
}

Expand Down Expand Up @@ -187,8 +196,8 @@
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
if err != nil {
return 0, err
}

Check warning on line 200 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L199-L200

Added lines #L199 - L200 were not covered by tests
rnd, err := part.RoundRequest(config)
if err != nil {
return 0, err
Expand Down Expand Up @@ -218,16 +227,16 @@
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

Check warning on line 231 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L230-L231

Added lines #L230 - L231 were not covered by tests
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

Check warning on line 239 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L238-L239

Added lines #L238 - L239 were not covered by tests

return telemetryClient, nil
}
Expand All @@ -246,12 +255,12 @@
var err error
profFile, err := os.Create(p.cfg.CPUProfile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to create profile: %w", err)

Check warning on line 258 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L258

Added line #L258 was not covered by tests
}
p.profFile = profFile
err = pprof.StartCPUProfile(profFile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to start pprof: %w", err)

Check warning on line 263 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L263

Added line #L263 was not covered by tests
}
}

Expand Down Expand Up @@ -300,7 +309,7 @@
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)

Check warning on line 312 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L312

Added line #L312 was not covered by tests
} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
Expand All @@ -318,16 +327,16 @@
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}

Check warning on line 331 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L330-L331

Added lines #L330 - L331 were not covered by tests
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 335 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L334-L335

Added lines #L334 - L335 were not covered by tests
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 339 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L338-L339

Added lines #L338 - L339 were not covered by tests
(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
Expand All @@ -340,8 +349,8 @@
p.pipelineMetadata.Network = genesis.Network
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
return fmt.Errorf("Pipeline.Init() failed to write metadata to file: %w", err)
}

Check warning on line 353 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L352-L353

Added lines #L352 - L353 were not covered by tests

p.logger.Infof("Initialized Importer: %s", p.cfg.Importer.Name)
}
Expand All @@ -351,11 +360,11 @@
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)

Check warning on line 363 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L363

Added line #L363 was not covered by tests
}
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)

Check warning on line 367 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L367

Added line #L367 was not covered by tests
}
p.logger.Infof("Initialized Processor: %s", ncPair.Name)
}
Expand All @@ -364,12 +373,12 @@
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 377 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L376-L377

Added lines #L376 - L377 were not covered by tests
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 381 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L380-L381

Added lines #L380 - L381 were not covered by tests
p.logger.Infof("Initialized Exporter: %s", p.cfg.Exporter.Name)
}

Expand All @@ -382,11 +391,15 @@
go p.startMetricsServer()
}

p.statusMu.Lock()
defer p.statusMu.Unlock()
p.status.Round = p.pipelineMetadata.NextRound

return err
}

func (p *pipelineImpl) Stop() {
p.ccf(nil)

Check warning on line 402 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L402

Added line #L402 was not covered by tests
p.wg.Wait()

if p.profFile != nil {
Expand All @@ -402,21 +415,21 @@
}
}

if err := p.importer.Close(); err != nil {

Check warning on line 418 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L418

Added line #L418 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)

Check warning on line 420 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L420

Added line #L420 was not covered by tests
}

for _, processor := range p.processors {
if err := processor.Close(); err != nil {

Check warning on line 424 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L424

Added line #L424 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)

Check warning on line 426 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L426

Added line #L426 was not covered by tests
}
}

if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)
}

Check warning on line 432 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L430-L432

Added lines #L430 - L432 were not covered by tests
}

func numInnerTxn(txn sdk.SignedTxnWithAD) int {
Expand Down Expand Up @@ -459,8 +472,8 @@
for {
startRound := time.Now()
select {
case <-p.ctx.Done():
p.logger.Infof("importer handler exiting. lastRnd=%d", lastRnd)

Check warning on line 476 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L475-L476

Added lines #L475 - L476 were not covered by tests
return
case rnd := <-roundChan:
lastRnd = rnd
Expand Down Expand Up @@ -580,9 +593,9 @@
lastRound = blk.Round()

if p.pipelineMetadata.NextRound != lastRound {
lastError = fmt.Errorf("aborting after out of order block data. %d != %d", p.pipelineMetadata.NextRound, lastRound)
return
}

Check warning on line 598 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L596-L598

Added lines #L596 - L598 were not covered by tests

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName)
Expand All @@ -602,11 +615,14 @@
// Increment Round, update metadata
nextRound := lastRound + 1
p.pipelineMetadata.NextRound = nextRound
p.statusMu.Lock()
p.status.Round = nextRound
p.statusMu.Unlock()
lastError = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if lastError != nil {
lastError = fmt.Errorf("aborting after updating NextRound=%d BUT failing to save metadata: %w", nextRound, lastError)
return
}

Check warning on line 625 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L623-L625

Added lines #L623 - L625 were not covered by tests
p.logger.Tracef("exporter %s incremented pipeline metadata NextRound to %d", eName, nextRound)

for i, cb := range p.completeCallback {
Expand Down Expand Up @@ -688,6 +704,13 @@
p.wg.Wait()
}

func (p *pipelineImpl) Status() (Status, error) {
p.statusMu.Lock()
ret := p.status
p.statusMu.Unlock()
return ret, nil

Check warning on line 711 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L707-L711

Added lines #L707 - L711 were not covered by tests
}

// start a http server serving /metrics
func (p *pipelineImpl) startMetricsServer() {
http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -730,7 +753,7 @@
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

pipeline.importer = importerConstructor.New()

Check warning on line 756 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L756

Added line #L756 was not covered by tests
logger.Infof("Found Importer: %s", importerName)

// ---
Expand All @@ -738,12 +761,12 @@
for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorConstructor, err := processors.ProcessorConstructorByName(processorName)

Check warning on line 764 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L764

Added line #L764 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

pipeline.processors = append(pipeline.processors, processorConstructor.New())

Check warning on line 769 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L769

Added line #L769 was not covered by tests
logger.Infof("Found Processor: %s", processorName)
}

Expand All @@ -751,12 +774,12 @@

exporterName := cfg.Exporter.Name

exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)

Check warning on line 777 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L777

Added line #L777 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

pipeline.exporter = exporterConstructor.New()

Check warning on line 782 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L782

Added line #L782 was not covered by tests
logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
Expand All @@ -768,22 +791,22 @@
logger.Infof("Creating PID file at: %s", pidFilePath)
fout, err := os.Create(pidFilePath)
if err != nil {
err = fmt.Errorf("%s: could not create pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 797 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L794-L797

Added lines #L794 - L797 were not covered by tests

if _, err = fmt.Fprintf(fout, "%d", os.Getpid()); err != nil {
err = fmt.Errorf("%s: could not write pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 803 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L800-L803

Added lines #L800 - L803 were not covered by tests

err = fout.Close()
if err != nil {
err = fmt.Errorf("%s: could not close pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 810 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L807-L810

Added lines #L807 - L810 were not covered by tests
return err
}
15 changes: 15 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/algorand/conduit/api"
"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/loggers"
"github.com/algorand/conduit/conduit/pipeline"
Expand Down Expand Up @@ -47,8 +48,8 @@

pCfg, err := data.MakePipelineConfig(args)
if err != nil {
return err
}

Check warning on line 52 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L51-L52

Added lines #L51 - L52 were not covered by tests

// Initialize logger
level, err := log.ParseLevel(pCfg.LogLevel)
Expand All @@ -62,8 +63,8 @@

logger, err = loggers.MakeThreadSafeLogger(level, pCfg.LogFile)
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}

Check warning on line 67 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L66-L67

Added lines #L66 - L67 were not covered by tests

logger.Infof("Starting Conduit %s", version.LongVersion())
logger.Infof("Using data directory: %s", args.ConduitDataDir)
Expand Down Expand Up @@ -91,26 +92,40 @@
return err
}

err = pline.Init()
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("pipeline init error: %w", err)

Check warning on line 101 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L95-L101

Added lines #L95 - L101 were not covered by tests
}
pline.Start()
defer pline.Stop()

// Start server
if pCfg.API.Address != "" {
shutdown, err := api.StartServer(logger, pline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("failed to start API server: %w", err)

Check warning on line 114 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L103-L114

Added lines #L103 - L114 were not covered by tests
}
defer shutdown(context.Background())

Check warning on line 116 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L116

Added line #L116 was not covered by tests
}

pline.Wait()
return pline.Error()

Check warning on line 120 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}

// MakeConduitCmdWithUtilities creates the main cobra command with all utilities
func MakeConduitCmdWithUtilities() *cobra.Command {
cmd := MakeConduitCmd()
cmd.AddCommand(initialize.InitCommand)
cmd.AddCommand(list.Command)
return cmd

Check warning on line 128 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L124-L128

Added lines #L124 - L128 were not covered by tests
}

// MakeConduitCmd creates the main cobra command, initializes flags
Expand All @@ -132,17 +147,17 @@
Detailed documentation is online: https://github.com/algorand/conduit`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
err := runConduitCmdWithConfig(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "\nExiting with error:\t%s.\n", err)
os.Exit(1)
}

Check warning on line 154 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L150-L154

Added lines #L150 - L154 were not covered by tests
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if vFlag {
fmt.Printf("%s\n", version.LongVersion())
os.Exit(0)
}

Check warning on line 160 in pkg/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

pkg/cli/cli.go#L156-L160

Added lines #L156 - L160 were not covered by tests
},
}
cmd.Flags().StringVarP(&cfg.ConduitDataDir, "data-dir", "d", "", "Set the Conduit data directory. If not set the CONDUIT_DATA_DIR environment variable is used.")
Expand Down
15 changes: 4 additions & 11 deletions pkg/cli/internal/initialize/conduit.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ retry-delay: "1s"
# Whether or not to print the conduit banner on startup.
hide-banner: false

# When the address is not empty information is available on '/health'
api:
addr: ":8981"

# When enabled prometheus metrics are available on '/metrics'
metrics:
mode: OFF
Expand All @@ -33,14 +37,3 @@ processors:
# An exporter is defined to do something with the data.
exporter:
%s

# Enable telemetry for conduit
telemetry:
enabled: false

# By default the following fields will be configured to send data to Algorand.
# To store your own telemetry events, they can be overridden.
# uri: ""
# index: ""
# username: ""
# password: ""
Loading