diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 49090b62..4c31a3ff 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -1,6 +1,14 @@ package pipeline -import log "github.com/sirupsen/logrus" +import ( + "context" + "fmt" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/algorand/conduit/conduit/data" +) // HandlePanic function to log panics in a common way func HandlePanic(logger *log.Logger) { @@ -8,3 +16,68 @@ func HandlePanic(logger *log.Logger) { logger.Panicf("conduit pipeline experienced a panic: %v", r) } } + +type empty struct{} + +type pluginInput interface { + uint64 | data.BlockData | string +} + +type pluginOutput interface { + pluginInput | empty +} + +// Retries is a wrapper for retrying a function call f() with a cancellation context, +// a delay and a max retry count. It always attempts the wrapped function at least once, +// and only after the first attempt does it pay attention to a context cancellation. +// This allows the pipeline to receive a cancellation while still guaranteeing that each +// pipeline component gets at least one attempt to finish the round. +// - Retry behavior is configured by p.cfg.RetryCount. +// - when 0, the function will retry forever or until the context is canceled +// - when > 0, the function will retry p.cfg.RetryCount times before giving up +// +// - Upon success: +// - a nil error is returned even if there were intermediate failures +// - the returned duration dur measures the time spent in the call to f() that succeeded +// +// - Upon failure: +// - the return value y is the zero value of type Y and a non-nil error is returned +// - the error wraps the last error encountered; if the failure resulted from a context cancellation, +// the cancellation cause is wrapped as well +// - when p.cfg.RetryCount == 0, a failure can only result from a context cancellation +// - the returned duration dur is the total time spent in the function, including retries +func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { + start := time.Now() + + for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { + // the first time through, we don't sleep or mind ctx's done signal + if i > 0 { + select { + case <-p.ctx.Done(): + dur = time.Since(start) + err = fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, context.Cause(p.ctx)) + return + default: + time.Sleep(p.cfg.RetryDelay) + } + } + opStart := time.Now() + y, err = f(x) + if err == nil { + dur = time.Since(opStart) + return + } + p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err) + } + + dur = time.Since(start) + err = fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err) + return +} + +// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
+func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := Retries(func(x X) (empty, error) { + return empty{}, f(x) + }, a, p, msg) + return d, err +} diff --git a/conduit/pipeline/common_test.go b/conduit/pipeline/common_test.go new file mode 100644 index 00000000..cd9dcd4a --- /dev/null +++ b/conduit/pipeline/common_test.go @@ -0,0 +1,213 @@ +package pipeline + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/algorand/conduit/conduit/data" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +// TestRetries tests the retry logic +func TestRetries(t *testing.T) { + errSentinelCause := errors.New("succeed after has failed") + + succeedAfterFactory := func(succeedAfter uint64, never bool) func(uint64) (uint64, error) { + tries := uint64(0) + + return func(x uint64) (uint64, error) { + if tries >= succeedAfter && !never { + // ensure not to return the zero value on success + return tries + 1, nil + } + tries++ + return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) + } + } + + succeedAfterFactoryNoOutput := func(succeedAfter uint64, never bool) func(uint64) error { + tries := uint64(0) + + return func(x uint64) error { + if tries >= succeedAfter && !never { + return nil + } + tries++ + return fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) + } + } + + cases := []struct { + name string + retryCount uint64 + succeedAfter uint64 + neverSucceed bool // neverSucceed trumps succeedAfter + }{ + { + name: "retry forever succeeds after 0", + retryCount: 0, + succeedAfter: 0, + neverSucceed: false, + }, + { + name: "retry forever succeeds after 1", + retryCount: 0, + succeedAfter: 1, + neverSucceed: false, + }, + { + name: "retry forever succeeds after 7", + retryCount: 0, + succeedAfter: 7, + neverSucceed: false, + }, + { + name: "retry 5 succeeds after 0", + retryCount: 5, + succeedAfter: 0, + neverSucceed: false, + }, + { + name: "retry 5 succeeds after 1", + retryCount: 5, + succeedAfter: 1, + neverSucceed: false, + }, + { + name: "retry 5 succeeds after 5", + retryCount: 5, + succeedAfter: 5, + neverSucceed: false, + }, + { + name: "retry 5 succeeds after 7", + retryCount: 5, + succeedAfter: 7, + neverSucceed: false, + }, + { + name: "retry 5 never succeeds", + retryCount: 5, + succeedAfter: 0, + neverSucceed: true, + }, + { + name: "retry foerever never succeeds", + retryCount: 0, + succeedAfter: 0, + neverSucceed: true, + }, + } + + for _, tc := range cases { + tc := tc + + // run cases for Retries() + t.Run("Retries() "+tc.name, func(t *testing.T) { + t.Parallel() + ctx, ccf := context.WithCancelCause(context.Background()) + p := &pipelineImpl{ + ctx: ctx, + ccf: ccf, + logger: log.New(), + cfg: &data.Config{ + RetryCount: tc.retryCount, + RetryDelay: 1 * time.Millisecond, + }, + } + succeedAfter := succeedAfterFactory(tc.succeedAfter, tc.neverSucceed) + + if tc.retryCount == 0 && tc.neverSucceed { + // avoid infinite loop by cancelling the context + + yChan := make(chan uint64) + errChan := make(chan error) + go func() { + y, _, err := Retries(succeedAfter, 0, p, "test") + yChan <- y + errChan <- err + }() + time.Sleep(5 * time.Millisecond) + errTestCancelled := errors.New("test cancelled") + go func() { + ccf(errTestCancelled) + }() + y := <-yChan + err := <-errChan + require.ErrorIs(t, err, errTestCancelled, tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) + return + } + + y, _, err := Retries(succeedAfter, 
0, p, "test") + if tc.retryCount == 0 { // WLOG tc.neverSucceed == false + require.NoError(t, err, tc.name) + + // note we subtract 1 from y below because succeedAfter has added 1 to its output + // to disambiguate with the zero value which occurs on failure + require.Equal(t, tc.succeedAfter, y-1, tc.name) + } else { // retryCount > 0 so doesn't retry forever + if tc.neverSucceed || tc.succeedAfter > tc.retryCount { + require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) + } else { // !tc.neverSucceed && succeedAfter <= retryCount + require.NoError(t, err, tc.name) + require.Equal(t, tc.succeedAfter, y-1, tc.name) + } + } + }) + + // run cases for RetriesNoOutput() + t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) { + t.Parallel() + ctx, ccf := context.WithCancelCause(context.Background()) + p := &pipelineImpl{ + ctx: ctx, + ccf: ccf, + logger: log.New(), + cfg: &data.Config{ + RetryCount: tc.retryCount, + RetryDelay: 1 * time.Millisecond, + }, + } + succeedAfterNoOutput := succeedAfterFactoryNoOutput(tc.succeedAfter, tc.neverSucceed) + + if tc.retryCount == 0 && tc.neverSucceed { + // avoid infinite loop by cancelling the context + + errChan := make(chan error) + go func() { + _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + errChan <- err + }() + time.Sleep(5 * time.Millisecond) + errTestCancelled := errors.New("test cancelled") + go func() { + ccf(errTestCancelled) + }() + err := <-errChan + require.ErrorIs(t, err, errTestCancelled, tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + return + } + + _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + if tc.retryCount == 0 { // WLOG tc.neverSucceed == false + require.NoError(t, err, tc.name) + } else { // retryCount > 0 so doesn't retry forever + if tc.neverSucceed || tc.succeedAfter > tc.retryCount { + require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + } else { // !tc.neverSucceed && succeedAfter <= retryCount + require.NoError(t, err, tc.name) + } + } + }) + } +} diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index b77529d4..c492ae0c 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -3,6 +3,7 @@ package pipeline import ( "context" "encoding/base64" + "errors" "fmt" "net/http" "os" @@ -41,13 +42,13 @@ type Pipeline interface { type pipelineImpl struct { ctx context.Context - cf context.CancelFunc + ccf context.CancelCauseFunc wg sync.WaitGroup cfg *data.Config logger *log.Logger profFile *os.File err error - mu sync.RWMutex + errMu sync.RWMutex initProvider *data.InitProvider @@ -59,16 +60,29 @@ type pipelineImpl struct { pipelineMetadata state } +type pluginChannel chan data.BlockData + +var ( + errImporterCause = errors.New("importer cancelled") + errProcessorCause = errors.New("processor cancelled") + errExporterCause = errors.New("exporter cancelled") +) + func (p *pipelineImpl) Error() error { - p.mu.RLock() - defer p.mu.RUnlock() + p.errMu.RLock() + defer p.errMu.RUnlock() return p.err } -func (p *pipelineImpl) setError(err error) { - p.mu.Lock() - defer p.mu.Unlock() - p.err = err +func (p *pipelineImpl) joinError(err error) { + p.errMu.Lock() + defer p.errMu.Unlock() + p.err = errors.Join(p.err, err) +} + +func (p *pipelineImpl) cancelWithProblem(err error) { + p.ccf(err) + p.joinError(err) } func (p *pipelineImpl) registerLifecycleCallbacks() { @@ 
-372,7 +386,7 @@ func (p *pipelineImpl) Init() error { } func (p *pipelineImpl) Stop() { - p.cf() + p.ccf(nil) p.wg.Wait() if p.profFile != nil { @@ -431,103 +445,245 @@ func addMetrics(block data.BlockData, importTime time.Duration) { metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn)) } -// Start pushes block data through the pipeline -func (p *pipelineImpl) Start() { +func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <-chan uint64, blkOutChan pluginChannel) { p.wg.Add(1) - retry := uint64(0) go func() { defer p.wg.Done() - // We need to add a separate recover function here since it launches its own go-routine - defer HandlePanic(p.logger) + lastRnd := uint64(0) + totalSelectWait := time.Duration(0) + totalFeedWait := time.Duration(0) + defer func() { + p.logger.Debugf("importer handler exiting. lastRnd=%d totalSelectWait=%dms; totalFeedWait=%dms", lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds()) + }() + for { - pipelineRun: - metrics.PipelineRetryCount.Observe(float64(retry)) - if retry > p.cfg.RetryCount && p.cfg.RetryCount != 0 { - p.logger.Errorf("Pipeline has exceeded maximum retry count (%d) - stopping...", p.cfg.RetryCount) + startRound := time.Now() + select { + case <-p.ctx.Done(): + p.logger.Infof("importer handler exiting. lastRnd=%d", lastRnd) return + case rnd := <-roundChan: + lastRnd = rnd + waitTime := time.Since(startRound) + totalSelectWait += waitTime + p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd) + + blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name) + if lastError != nil { + p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError)) + return + } + metrics.ImporterTimeSeconds.Observe(importTime.Seconds()) + p.logger.Tracef("importer handler imported round %d in %dms", rnd, importTime.Milliseconds()) + + // TODO: Verify that the block was built with a known protocol version. + + importFinish := time.Now() + + // check context in case out channel is full. + select { + case <-p.ctx.Done(): + return + case blkOutChan <- blkData: + waitTime := time.Since(importFinish) + totalFeedWait += waitTime + p.logger.Tracef("imported round %d into blkOutChan after waiting %dms on channel", rnd, waitTime.Milliseconds()) + } + } + } + }() +} + +func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkInChan pluginChannel, blkOutChan pluginChannel) { + p.wg.Add(1) + go func() { + defer p.wg.Done() + var lastRnd uint64 + totalSelectWait := time.Duration(0) + totalFeedWait := time.Duration(0) + defer func() { + p.logger.Debugf("processor[%d] %s handler exiting. 
lastRnd=%d totalSelectWait=%dms, totalFeedWait=%dms", idx, proc.Metadata().Name, lastRnd, totalSelectWait.Milliseconds(), totalFeedWait.Milliseconds()) + }() - if retry > 0 { - p.logger.Infof("Retry number %d resuming after a %s retry delay.", retry, p.cfg.RetryDelay) - time.Sleep(p.cfg.RetryDelay) + for { + selectStart := time.Now() + select { + case <-p.ctx.Done(): + p.logger.Infof("processor[%d] %s handler exiting. lastRnd=%d", idx, proc.Metadata().Name, lastRnd) + return + case blk := <-blkInChan: + waitTime := time.Since(selectStart) + totalSelectWait += waitTime + p.logger.Tracef("processor[%d] %s handler received block data for round %d after wait of %s", idx, proc.Metadata().Name, lastRnd, waitTime) + lastRnd = blk.Round() + + var procTime time.Duration + var lastError error + blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name) + if lastError != nil { + p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError)) + return + } + metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(procTime.Seconds()) + + selectStart := time.Now() + // check context in case out channel is full. + select { + case <-p.ctx.Done(): + return + case blkOutChan <- blk: + waitTime := time.Since(selectStart) + totalFeedWait += waitTime + p.logger.Tracef("processor[%d] %s handler fed round %d into blkOutChan after waiting %dms", idx, proc.Metadata().Name, lastRnd, waitTime.Milliseconds()) + } } + } + }() +} +// exporterHandler invokes the exporter's Receive method, then updates and saves the metadata's NextRound +// and invokes the registered OnComplete callbacks. If a cancellation is received +// before the exporter's Receive method has succeeded, the handler exits immediately. +// However, after Receive() has succeeded, each remaining step is attempted at least once, +// but the handler aborts if a failure occurs after a cancellation has been received. +func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan pluginChannel) { + p.wg.Add(1) + go func() { + defer p.wg.Done() + var lastError error + lastRound := uint64(0) + totalSelectWait := time.Duration(0) + totalExportWork := time.Duration(0) + eName := exporter.Metadata().Name + + defer func() { + p.logger.Debugf("exporter %s handler exiting. lastRnd=%d totalSelectWait=%dms, totalExportWork=%dms", eName, lastRound, totalSelectWait.Milliseconds(), totalExportWork.Milliseconds()) + if lastError != nil { + err := fmt.Errorf("exporter %s handler (%w) after round %d: %w", eName, errExporterCause, lastRound, lastError) + p.cancelWithProblem(err) + p.logger.Error(err) + } + }() + + for { + p.logger.Tracef("exporter handler waiting for block data. round=%d", lastRound) + selectStart := time.Now() select { case <-p.ctx.Done(): + p.logger.Infof("exporter handler exiting. lastRnd=%d", lastRound) return - default: - { - p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound) - // fetch block - importStart := time.Now() - blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - metrics.ImporterTimeSeconds.Observe(time.Since(importStart).Seconds()) - - // TODO: Verify that the block was built with a known protocol version. - - // Start time currently measures operations after block fetching is complete. 
- // This is for backwards compatibility w/ Indexer's metrics - // run through processors - start := time.Now() - for _, proc := range p.processors { - processorStart := time.Now() - blkData, err = proc.Process(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(time.Since(processorStart).Seconds()) - } - // run through exporter - exporterStart := time.Now() - err = p.exporter.Receive(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - p.logger.Infof("round r=%d (%d txn) exported in %s", p.pipelineMetadata.NextRound, len(blkData.Payset), time.Since(start)) + case blk := <-blkChan: + waitTime := time.Since(selectStart) + totalSelectWait += waitTime + p.logger.Tracef("exporter handler received block data for round %d after wait of %s", lastRound, waitTime) + lastRound = blk.Round() + + if p.pipelineMetadata.NextRound != lastRound { + lastError = fmt.Errorf("aborting after out of order block data. %d != %d", p.pipelineMetadata.NextRound, lastRound) + return + } - // Increment Round, update metadata - p.pipelineMetadata.NextRound++ - err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) - if err != nil { - p.logger.Errorf("%v", err) - } + var exportTime time.Duration + exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName) + if lastError != nil { + lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError) + return + } + metrics.ExporterTimeSeconds.Observe(exportTime.Seconds()) + // Ignore round 0 (which is empty). + if lastRound > 0 { + // Previously we reported time starting after block fetching is complete + // through the end of the export operation. Now that each plugin is running + // in its own goroutine, report only the time of the export. + addMetrics(blk, exportTime) + } - // Callback Processors - for _, cb := range p.completeCallback { - err = cb(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - } - metrics.ExporterTimeSeconds.Observe(time.Since(exporterStart).Seconds()) - // Ignore round 0 (which is empty). 
- if p.pipelineMetadata.NextRound > 1 { - addMetrics(blkData, time.Since(start) + // Increment Round, update metadata + nextRound := lastRound + 1 + p.pipelineMetadata.NextRound = nextRound + lastError = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) + if lastError != nil { + lastError = fmt.Errorf("aborting after updating NextRound=%d BUT failing to save metadata: %w", nextRound, lastError) + return + } + p.logger.Tracef("exporter %s incremented pipeline metadata NextRound to %d", eName, nextRound) + + for i, cb := range p.completeCallback { + p.logger.Tracef("exporter %s @ round=%d NextRound=%d executing callback %d", eName, lastRound, nextRound, i) + callbackErr := cb(blk) + if callbackErr != nil { + p.logger.Errorf( + "exporter %s # round %d failed callback #%d but CONTINUING to NextRound=%d: %v", + eName, lastRound, i, nextRound, callbackErr, + ) } - p.setError(nil) - retry = 0 } - } + lastError = nil + // WARNING: removing/re-log-levelling the following will BREAK: + // - the E2E test (Search for "Pipeline round" in subslurp.py) + // - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo) + p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime)) + } } }() } +func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { + return fmt.Sprintf( + "UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s", + nextRound, + lastRound, + topLevelTxnCount, + exportTime, + ) +} + +// Start pushes block data through the pipeline +func (p *pipelineImpl) Start() { + p.logger.Trace("Pipeline.Start()") + + // Setup channels + roundChan := make(chan uint64) + processorBlkInChan := make(pluginChannel) + p.importerHandler(p.importer, roundChan, processorBlkInChan) + + var processorBlkOutChan pluginChannel + for i, proc := range p.processors { + processorBlkOutChan = make(pluginChannel) + p.processorHandler(i, proc, processorBlkInChan, processorBlkOutChan) + processorBlkInChan = processorBlkOutChan + } + + p.exporterHandler(p.exporter, processorBlkInChan) + + p.wg.Add(1) + // Main loop + go func(startRound uint64) { + defer p.wg.Done() + rnd := startRound + totalFeedWait := time.Duration(0) + defer p.logger.Debugf("round channel feed exiting. lastRnd=%d totalFeedWait=%dms", rnd, totalFeedWait.Milliseconds()) + for { + selectStart := time.Now() + p.logger.Infof("Pipeline round kickoff: %v", rnd) + p.logger.Tracef("pushing round %d into roundChan", rnd) + select { + case <-p.ctx.Done(): + p.logger.Infof("round channel feed exiting. 
lastRnd=%d", rnd) + return + case roundChan <- rnd: + waitTime := time.Since(selectStart) + totalFeedWait += waitTime + rnd++ + } + } + }(p.pipelineMetadata.NextRound) + + <-p.ctx.Done() +} + func (p *pipelineImpl) Wait() { p.wg.Wait() } @@ -554,11 +710,11 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi return nil, fmt.Errorf("MakePipeline(): logger was empty") } - cancelContext, cancelFunc := context.WithCancel(ctx) + cancelContext, cancelCauseFunc := context.WithCancelCause(ctx) pipeline := &pipelineImpl{ ctx: cancelContext, - cf: cancelFunc, + ccf: cancelCauseFunc, cfg: cfg, logger: logger, initProvider: nil, diff --git a/conduit/pipeline/pipeline_bench_test.go b/conduit/pipeline/pipeline_bench_test.go new file mode 100644 index 00000000..5b48bc7f --- /dev/null +++ b/conduit/pipeline/pipeline_bench_test.go @@ -0,0 +1,296 @@ +package pipeline + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/algorand/conduit/conduit/data" + "github.com/algorand/conduit/conduit/plugins" + "github.com/algorand/conduit/conduit/plugins/processors" + sdk "github.com/algorand/go-algorand-sdk/v2/types" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + logLevel = log.ErrorLevel // log.DebugLevel // log.InfoLevel // log.TraceLevel // + retryCount = 3 // math.MaxUint64 +) + +type sleepingImporter struct { + cfg plugins.PluginConfig + genesis sdk.Genesis + finalRound sdk.Round + getBlockSleep time.Duration // when non-0, sleep when GetBlock() even in the case of an error + returnError bool + onCompleteError bool + subsystem string + rndOverride uint64 + rndReqErr error +} + +func (m *sleepingImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error { + m.cfg = cfg + return nil +} + +func (m *sleepingImporter) GetGenesis() (*sdk.Genesis, error) { + return &m.genesis, nil +} + +func (m *sleepingImporter) Close() error { + return nil +} + +func (m *sleepingImporter) Metadata() plugins.Metadata { + return plugins.Metadata{Name: "sleepingImporter"} +} + +func (m *sleepingImporter) GetBlock(rnd uint64) (data.BlockData, error) { + if m.getBlockSleep > 0 { + time.Sleep(m.getBlockSleep) + } + var err error + if m.returnError { + err = fmt.Errorf("importer") + } + return data.BlockData{BlockHeader: sdk.BlockHeader{Round: sdk.Round(rnd)}}, err +} + +func (m *sleepingImporter) OnComplete(input data.BlockData) error { + var err error + if m.onCompleteError { + err = fmt.Errorf("on complete") + } + m.finalRound = sdk.Round(input.BlockHeader.Round) + return err +} + +func (m *sleepingImporter) ProvideMetrics(subsystem string) []prometheus.Collector { + m.subsystem = subsystem + return nil +} + +func (m *sleepingImporter) RoundRequest(_ plugins.PluginConfig) (uint64, error) { + return m.rndOverride, m.rndReqErr +} + +type sleepingProcessor struct { + cfg plugins.PluginConfig + finalRound sdk.Round + processSleep time.Duration // when non-0, sleep when Process() even in the case of an error + returnError bool + onCompleteError bool + rndOverride uint64 + rndReqErr error +} + +func (m *sleepingProcessor) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error { + m.cfg = cfg + return nil +} + +func (m *sleepingProcessor) Close() error { + return nil +} + +func (m *sleepingProcessor) RoundRequest(_ plugins.PluginConfig) (uint64, error) { + return m.rndOverride, 
m.rndReqErr +} + +func (m *sleepingProcessor) Metadata() plugins.Metadata { + return plugins.Metadata{ + Name: "sleepingProcessor", + } +} + +func (m *sleepingProcessor) Process(input data.BlockData) (data.BlockData, error) { + if m.processSleep > 0 { + time.Sleep(m.processSleep) + } + var err error + if m.returnError { + err = fmt.Errorf("process") + } + return input, err +} + +func (m *sleepingProcessor) OnComplete(input data.BlockData) error { + var err error + if m.onCompleteError { + err = fmt.Errorf("on complete") + } + m.finalRound = sdk.Round(input.BlockHeader.Round) + return err +} + +type sleepingExporter struct { + cfg plugins.PluginConfig + finalRound sdk.Round + receiveSleep time.Duration // when non-0, sleep when Receive() even in the case of an error + returnError bool + onCompleteError bool + rndOverride uint64 + rndReqErr error +} + +func (m *sleepingExporter) Metadata() plugins.Metadata { + return plugins.Metadata{ + Name: "sleepingExporter", + } +} + +func (m *sleepingExporter) RoundRequest(_ plugins.PluginConfig) (uint64, error) { + return m.rndOverride, m.rndReqErr +} + +func (m *sleepingExporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error { + m.cfg = cfg + return nil +} + +func (m *sleepingExporter) Close() error { + return nil +} + +func (m *sleepingExporter) Receive(exportData data.BlockData) error { + if m.receiveSleep > 0 { + time.Sleep(m.receiveSleep) + } + var err error + if m.returnError { + err = fmt.Errorf("receive") + } + return err +} + +func (m *sleepingExporter) OnComplete(input data.BlockData) error { + var err error + if m.onCompleteError { + err = fmt.Errorf("on complete") + } + m.finalRound = sdk.Round(input.BlockHeader.Round) + return err +} + +type benchmarkCase struct { + name string + channelBuffSize int + importerSleep time.Duration + processorsSleep []time.Duration + exporterSleep time.Duration +} + +func pipeline5sec(b *testing.B, testCase benchmarkCase) int { + benchmarkDuration := 5 * time.Second + + importer := &sleepingImporter{getBlockSleep: testCase.importerSleep} + processors := make([]processors.Processor, len(testCase.processorsSleep)) + for i, procSleep := range testCase.processorsSleep { + processors[i] = &sleepingProcessor{processSleep: procSleep} + } + exporter := &sleepingExporter{receiveSleep: testCase.exporterSleep} + + ctx, ccf := context.WithCancelCause(context.Background()) + + logger := log.New() + logger.SetLevel(logLevel) + pImpl := pipelineImpl{ + ctx: ctx, + ccf: ccf, + logger: logger, + initProvider: nil, + importer: importer, + processors: processors, + exporter: exporter, + pipelineMetadata: state{ + NextRound: 0, + GenesisHash: "", + }, + cfg: &data.Config{ + RetryDelay: 0 * time.Second, + RetryCount: retryCount, + ConduitArgs: &data.Args{ + ConduitDataDir: b.TempDir(), + }, + }, + } + + pImpl.registerLifecycleCallbacks() + + // cancel the pipeline after 5 seconds + go func() { + time.Sleep(benchmarkDuration) + ccf(errors.New("benchmark timeout")) + }() + + b.StartTimer() + pImpl.Start() + pImpl.Wait() + assert.NoError(b, pImpl.Error()) + + fRound, err := finalRound(&pImpl) + require.NoError(b, err) + return int(fRound) +} + +func finalRound(pi *pipelineImpl) (sdk.Round, error) { + if mExp, ok := pi.exporter.(*sleepingExporter); ok { + return mExp.finalRound, nil + } + return 0, fmt.Errorf("not a sleepingExporter: %t", pi.exporter) +} + +func BenchmarkPipeline(b *testing.B) { + benchCases := []benchmarkCase{ + { + name: "vanilla 2 procs without sleep", + importerSleep: 
0, + processorsSleep: []time.Duration{0, 0}, + exporterSleep: 0, + }, + { + name: "uniform sleep of 10ms", + importerSleep: 10 * time.Millisecond, + processorsSleep: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond}, + exporterSleep: 10 * time.Millisecond, + }, + { + name: "exporter 10ms while others 1ms", + importerSleep: time.Millisecond, + processorsSleep: []time.Duration{time.Millisecond, time.Millisecond}, + exporterSleep: 10 * time.Millisecond, + }, + { + name: "importer 10ms while others 1ms", + importerSleep: 10 * time.Millisecond, + processorsSleep: []time.Duration{time.Millisecond, time.Millisecond}, + exporterSleep: time.Millisecond, + }, + { + name: "first processor 10ms while others 1ms", + importerSleep: time.Millisecond, + processorsSleep: []time.Duration{10 * time.Millisecond, time.Millisecond}, + exporterSleep: time.Millisecond, + }, + } + for _, buffSize := range []int{1} { + for _, bc := range benchCases { + bc.channelBuffSize = buffSize + b.Run(fmt.Sprintf("%s-size-%d", bc.name, bc.channelBuffSize), func(b *testing.B) { + rounds := 0 + for i := 0; i < b.N; i++ { + rounds += pipeline5sec(b, bc) + } + secs := b.Elapsed().Seconds() + rps := float64(rounds) / secs + b.ReportMetric(rps, "rounds/sec") + }) + } + } +} diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index 6392a74e..ea8db42c 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -34,10 +34,14 @@ import ( "github.com/algorand/conduit/conduit/telemetry" ) +const ( + initialRound = 1337 +) + // a unique block data to validate with tests var uniqueBlockData = data.BlockData{ BlockHeader: sdk.BlockHeader{ - Round: 1337, + Round: initialRound, }, } @@ -52,6 +56,8 @@ type mockImporter struct { subsystem string rndOverride uint64 rndReqErr error + getBlockCalls int + onCompleteCalls int } func (m *mockImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error { @@ -77,8 +83,12 @@ func (m *mockImporter) GetBlock(rnd uint64) (data.BlockData, error) { err = fmt.Errorf("importer") } m.Called(rnd) - // Return an error to make sure we - return uniqueBlockData, err + m.getBlockCalls++ + return data.BlockData{ + BlockHeader: sdk.BlockHeader{ + Round: sdk.Round(rnd), + }, + }, err } func (m *mockImporter) OnComplete(input data.BlockData) error { @@ -88,6 +98,7 @@ func (m *mockImporter) OnComplete(input data.BlockData) error { } m.finalRound = sdk.Round(input.BlockHeader.Round) m.Called(input) + m.onCompleteCalls++ return err } @@ -109,6 +120,8 @@ type mockProcessor struct { onCompleteError bool rndOverride uint64 rndReqErr error + processCalls int + onCompleteCalls int } func (m *mockProcessor) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error { @@ -135,8 +148,8 @@ func (m *mockProcessor) Process(input data.BlockData) (data.BlockData, error) { if m.returnError { err = fmt.Errorf("process") } + m.processCalls++ m.Called(input) - input.BlockHeader.Round++ return input, err } @@ -146,6 +159,7 @@ func (m *mockProcessor) OnComplete(input data.BlockData) error { err = fmt.Errorf("on complete") } m.finalRound = sdk.Round(input.BlockHeader.Round) + m.onCompleteCalls++ m.Called(input) return err } @@ -159,6 +173,8 @@ type mockExporter struct { onCompleteError bool rndOverride uint64 rndReqErr error + receiveCalls int + onCompleteCalls int } func (m *mockExporter) Metadata() plugins.Metadata { @@ -185,6 +201,7 @@ func (m *mockExporter) Receive(exportData data.BlockData) 
error { if m.returnError { err = fmt.Errorf("receive") } + m.receiveCalls++ m.Called(exportData) return err } @@ -195,6 +212,7 @@ func (m *mockExporter) OnComplete(input data.BlockData) error { err = fmt.Errorf("on complete") } m.finalRound = sdk.Round(input.BlockHeader.Round) + m.onCompleteCalls++ m.Called(input) return err } @@ -249,33 +267,31 @@ func mockPipeline(t *testing.T, dataDir string) (*pipelineImpl, *test.Hook, *moc // TestPipelineRun tests that running the pipeline calls the correct functions with mocking func TestPipelineRun(t *testing.T) { - mImporter := mockImporter{} - mImporter.On("GetBlock", mock.Anything).Return(uniqueBlockData, nil) + mImporter := mockImporter{rndOverride: initialRound + 1} + mImporter.On("GetBlock", mock.Anything).Return(mock.Anything, nil) + mImporter.On("OnComplete", mock.Anything).Return(nil) mProcessor := mockProcessor{} - processorData := uniqueBlockData - processorData.BlockHeader.Round++ - mProcessor.On("Process", mock.Anything).Return(processorData) + mProcessor.On("Process", mock.Anything).Return(mock.Anything) mProcessor.On("OnComplete", mock.Anything).Return(nil) mExporter := mockExporter{} mExporter.On("Receive", mock.Anything).Return(nil) + mExporter.On("OnComplete", mock.Anything).Return(nil) var pImporter importers.Importer = &mImporter var pProcessor processors.Processor = &mProcessor var pExporter exporters.Exporter = &mExporter - var cbComplete conduit.Completed = &mProcessor - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ - ctx: ctx, - cf: cf, - logger: l, - initProvider: nil, - importer: pImporter, - processors: []processors.Processor{pProcessor}, - exporter: pExporter, - completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete}, + ctx: ctx, + ccf: ccf, + logger: l, + initProvider: nil, + importer: pImporter, + processors: []processors.Processor{pProcessor}, + exporter: pExporter, pipelineMetadata: state{ NextRound: 0, GenesisHash: "", @@ -283,25 +299,50 @@ func TestPipelineRun(t *testing.T) { cfg: &data.Config{ RetryDelay: 0 * time.Second, RetryCount: math.MaxUint64, + Processors: []data.NameConfigPair{{ + Name: "mockProcessor", + Config: map[string]interface{}{}, + }}, ConduitArgs: &data.Args{ ConduitDataDir: t.TempDir(), }, }, } + errTestCancellation := errors.New("test cancellation") go func() { time.Sleep(1 * time.Second) - cf() + ccf(errTestCancellation) }() + err := pImpl.Init() + assert.NoError(t, err) pImpl.Start() pImpl.Wait() assert.NoError(t, pImpl.Error()) - assert.Equal(t, mProcessor.finalRound, uniqueBlockData.BlockHeader.Round+1) + finishedRounds := mExporter.receiveCalls + assert.Equal(t, finishedRounds, mExporter.onCompleteCalls) + assert.Len(t, mExporter.Calls, 2*finishedRounds) + + assert.Equal(t, finishedRounds, mProcessor.onCompleteCalls) + assert.LessOrEqual(t, mProcessor.processCalls, finishedRounds+1) // 1 additional Process() in the pipeline @ cancellation + assert.GreaterOrEqual(t, mProcessor.processCalls, finishedRounds) // 1 additional Process() in the pipeline @ cancellation + assert.Len(t, mProcessor.Calls, mProcessor.processCalls+mProcessor.onCompleteCalls) + + assert.Equal(t, finishedRounds, mImporter.onCompleteCalls) + assert.LessOrEqual(t, mImporter.getBlockCalls, finishedRounds+2) // 2 additional GetBlock() in the pipeline @ cancellation + assert.GreaterOrEqual(t, mImporter.getBlockCalls, finishedRounds) // 2 additional GetBlock() in the pipeline @ cancellation + 
assert.Len(t, mImporter.Calls, mImporter.getBlockCalls+mImporter.onCompleteCalls) + + finalRound := sdk.Round(initialRound + finishedRounds) + assert.Equal(t, finalRound, mExporter.finalRound) + assert.Equal(t, finalRound, mProcessor.finalRound) + assert.Equal(t, finalRound, mImporter.finalRound) mock.AssertExpectationsForObjects(t, &mImporter, &mProcessor, &mExporter) + assert.ErrorIs(t, context.Cause(pImpl.ctx), errTestCancellation) } // TestPipelineCpuPidFiles tests that cpu and pid files are created when specified @@ -358,11 +399,11 @@ func TestPipelineErrors(t *testing.T) { var pExporter exporters.Exporter = &mExporter var cbComplete conduit.Completed = &mProcessor - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, cfg: &data.Config{ RetryDelay: 0 * time.Second, RetryCount: math.MaxUint64, @@ -383,37 +424,38 @@ func TestPipelineErrors(t *testing.T) { go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + cancel := errors.New("testing timeout") + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("importer")) mImporter.returnError = false mProcessor.returnError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("process")) mProcessor.returnError = false mProcessor.onCompleteError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("on complete")) mProcessor.onCompleteError = false mExporter.returnError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("exporter")) } @@ -433,11 +475,11 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) { var pProcessor processors.Processor = &mProcessor var pExporter exporters.Exporter = &mExporter - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, cfg: &data.Config{}, logger: l, initProvider: nil, @@ -771,9 +813,10 @@ func TestPipelineRetryVariables(t *testing.T) { var pProcessor processors.Processor = &mockProcessor{} var pExporter exporters.Exporter = &mockExporter{} l, hook := test.NewNullLogger() - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) pImpl := pipelineImpl{ ctx: ctx, + ccf: ccf, cfg: &data.Config{ RetryCount: testCase.retryCount, RetryDelay: testCase.retryDelay, @@ -817,7 +860,7 @@ func TestPipelineRetryVariables(t *testing.T) { go func() { time.Sleep(maxDuration) if !done { - cf() + ccf(errors.New("test over timeout")) assert.Equal(t, testCase.totalDuration, maxDuration) } }() @@ -945,3 +988,28 @@ func TestMetrics(t *testing.T) { } 
assert.Equal(t, 4, found) } + +func TestLogStatsE2Elog(t *testing.T) { + nextRound := uint64(1337) + round := uint64(42) + numTxns := 13 + duration := 12345600 * time.Microsecond + + expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" + log := logstatsE2Elog(nextRound, round, numTxns, duration) + require.Equal(t, expectedLog, log) + + logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`) + require.NoError(t, err) + matches := logstatsRex.FindStringSubmatch(log) + require.Len(t, matches, 4) + require.Equal(t, "42", matches[1]) + require.Equal(t, "13", matches[2]) + require.Equal(t, "12.3456s", matches[3]) + + e2eRex, err := regexp.Compile(`FINISHED Pipeline round r=(\d+)`) + require.NoError(t, err) + matches = e2eRex.FindStringSubmatch(log) + require.Len(t, matches, 2) + require.Equal(t, "42", matches[1]) +} diff --git a/docs/PluginDevelopment.md b/docs/PluginDevelopment.md index c09e6c39..189cfc34 100644 --- a/docs/PluginDevelopment.md +++ b/docs/PluginDevelopment.md @@ -62,11 +62,15 @@ The context provided to this function should be saved, and used to terminate any ### Per-round function -Each plugin type has a function which is called once per round: +Each plugin type has a function, termed the _Plugin Method_, which is called once per round: * Importer: `GetBlock` is called when a particular round is required. * Processor: `Process` is called to process a round. -* Exporter: `Receive` for consuming a round. +* Exporter: `Receive` for consuming and typically persisting a round. + +> NOTE: When a _Plugin Method_ returns an error, the pipeline retries it up to the configured `retry-count`, waiting `retry-delay` +> between attempts (see the sketch below). In most cases, it is recommended that you rely on Conduit's +> built-in retry mechanism rather than implementing your own. ### Close @@ -104,6 +108,8 @@ type RoundRequestor interface { When all processing has completed for a round, the `OnComplete` function is called on any plugin that implements it. +> NOTE: The error that `OnComplete()` returns is non-fatal; Conduit's pipeline logs it and continues on to the next round. ```go // Completed is called by the conduit pipeline after every exporter has // finished. It can be used for things like finalizing state.
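To make the retry note added to `docs/PluginDevelopment.md` concrete, here is a minimal sketch (not part of the patch) of a per-round _Plugin Method_ that simply returns its error and leaves retrying to Conduit. `exampleProcessor` and its `enrich` step are hypothetical; the actual retry behavior comes from the `retry-count` and `retry-delay` settings in the pipeline configuration.

```go
// Process is the per-round Plugin Method of a hypothetical processor.
// It contains no retry loop of its own: returning a non-nil error is enough,
// because the Conduit pipeline re-invokes the method up to retry-count times,
// sleeping retry-delay between attempts.
func (p *exampleProcessor) Process(input data.BlockData) (data.BlockData, error) {
	enriched, err := p.enrich(input) // enrich is a hypothetical, fallible step
	if err != nil {
		// Let Conduit's built-in retry mechanism handle transient failures.
		return data.BlockData{}, fmt.Errorf("round %d: %w", input.Round(), err)
	}
	return enriched, nil
}
```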
diff --git a/e2e_tests/src/e2e_conduit/runner.py b/e2e_tests/src/e2e_conduit/runner.py index 5692edc5..a9dbde0c 100644 --- a/e2e_tests/src/e2e_conduit/runner.py +++ b/e2e_tests/src/e2e_conduit/runner.py @@ -47,6 +47,7 @@ def setup_scenario(self, scenario): yaml.dump( { "log-level": "info", + "retry-count": 1, "importer": { "name": scenario.importer.name, "config": scenario.importer.config_input, @@ -91,7 +92,7 @@ def run_scenario(self, scenario): return 1 if indexerout.round < scenario.importer.lastblock: - logger.error("conduit did not reach round={scenario.importer.lastblock}") + logger.error(f"conduit did not reach round={scenario.importer.lastblock}") sys.stderr.write(indexerout.dump()) return 1 diff --git a/e2e_tests/src/e2e_conduit/subslurp.py b/e2e_tests/src/e2e_conduit/subslurp.py index e8f50bdf..796982ce 100644 --- a/e2e_tests/src/e2e_conduit/subslurp.py +++ b/e2e_tests/src/e2e_conduit/subslurp.py @@ -6,29 +6,26 @@ logger = logging.getLogger(__name__) +# Matches conduit log output: +# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" +FINISH_ROUND: re.Pattern = re.compile(rb"FINISHED Pipeline round r=(\d+)") + class subslurp: """accumulate stdout or stderr from a subprocess and hold it for debugging if something goes wrong""" + def __init__(self, f): self.f = f self.buf = io.BytesIO() self.gz = gzip.open(self.buf, "wb") self.timeout = timedelta(seconds=120) - # Matches conduit log output: "Pipeline round: 110" - self.round_re = re.compile(b'.*"Pipeline round: ([0-9]+)"') self.round = 0 self.error_log = None - def logIsError(self, log_line): - if b"error" in log_line: - self.error_log = log_line - return True - return False - def tryParseRound(self, log_line): - m = self.round_re.match(log_line) - if m is not None and m.group(1) is not None: - self.round = int(m.group(1)) + def tryParseRound(self, log_line): + match = FINISH_ROUND.search(log_line) + if match and (r := int(match.group(1))) is not None: + self.round = r def run(self, lastround): if len(self.f.peek().strip()) == 0: @@ -41,16 +38,21 @@ def run(self, lastround): datetime.now() - start < self.timeout and datetime.now() - lastlog < timedelta(seconds=15) ): - for line in self.f: + for i, line in enumerate(self.f): lastlog = datetime.now() if self.gz is not None: self.gz.write(line) self.tryParseRound(line) if self.round >= lastround: - logger.info(f"Conduit reached desired lastround: {lastround}") + logger.info( + f"Conduit reached desired lastround after parsing line {i+1}: {lastround}" + ) return - if self.logIsError(line): - raise RuntimeError(f"E2E tests logged an error: {self.error_log}") + if b"error" in line: + # NOTE: any line containing "error" fails the run; this is quite a strict criterion! + self.error_log = line + raise RuntimeError( + f"E2E tests logged an error at line {i+1}: {self.error_log}" + ) def dump(self) -> str: if self.gz is not None: @@ -58,4 +60,4 @@ def dump(self) -> str: self.gz = None self.buf.seek(0) r = gzip.open(self.buf, "rt") - return r.read() # type: ignore + return r.read() # type: ignore diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 840af6f5..23cf4776 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -80,7 +80,7 @@ func runConduitCmdWithConfig(args *data.Args) error { } ctx := context.Background() - pipeline, err := pipeline.MakePipeline(ctx, pCfg, logger) + pline, err := pipeline.MakePipeline(ctx, pCfg, logger) if err != nil { err = fmt.Errorf("pipeline creation error: %w", err) @@ -91,7 +91,7 @@ func runConduitCmdWithConfig(args *data.Args) error { return err } - err = pipeline.Init() + err = pline.Init() if err != nil { // Suppress log, it is about to be printed to stderr. if pCfg.LogFile != "" { @@ -99,10 +99,10 @@ } return fmt.Errorf("pipeline init error: %w", err) } - pipeline.Start() - defer pipeline.Stop() - pipeline.Wait() - return pipeline.Error() + pline.Start() + defer pline.Stop() + pline.Wait() + return pline.Error() } // MakeConduitCmdWithUtilities creates the main cobra command with all utilities @@ -134,7 +134,7 @@ Detailed documentation is online: https://github.com/algorand/conduit`, Run: func(cmd *cobra.Command, args []string) { err := runConduitCmdWithConfig(cfg) if err != nil { - fmt.Fprintf(os.Stderr, "\nExiting with error:\n%s.\n", err) + fmt.Fprintf(os.Stderr, "\nExiting with error:\t%s.\n", err) os.Exit(1) } },
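One reviewer-facing note on the switch from `context.WithCancel` to `context.WithCancelCause`: `cancelWithProblem` records why the pipeline stopped, and `Retries` surfaces that reason via `context.Cause(p.ctx)`, while `Stop()` passes a nil cause so an orderly shutdown still reads as a plain `context.Canceled`. The standalone snippet below (standard library only, Go 1.20+, not part of the patch) demonstrates the mechanism in isolation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// The pipeline stores this CancelCauseFunc as p.ccf; here it is a local.
	ctx, cancel := context.WithCancelCause(context.Background())

	// cancelWithProblem(err) boils down to: record the cause, then cancel.
	cause := errors.New("exporter cancelled: simulated failure")
	cancel(cause)

	<-ctx.Done()
	fmt.Println(ctx.Err())                            // context canceled
	fmt.Println(context.Cause(ctx))                   // exporter cancelled: simulated failure
	fmt.Println(errors.Is(context.Cause(ctx), cause)) // true
}
```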