From cf130e20abb0431e504875c7ec0bb7720ac42ae8 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Tue, 25 Jul 2023 19:03:46 -0500 Subject: [PATCH] basic logic in place --- conduit/pipeline/common.go | 68 ++++++++++++++++++++++++- conduit/pipeline/pipeline_bench_test.go | 57 +++++++++++---------- conduit/pipeline/pipeline_test.go | 35 ++++++------- 3 files changed, 114 insertions(+), 46 deletions(-) diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 49090b62..62a2818d 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -1,6 +1,13 @@ package pipeline -import log "github.com/sirupsen/logrus" +import ( + "errors" + "fmt" + "time" + + "github.com/algorand/conduit/conduit/data" + log "github.com/sirupsen/logrus" +) // HandlePanic function to log panics in a common way func HandlePanic(logger *log.Logger) { @@ -8,3 +15,62 @@ func HandlePanic(logger *log.Logger) { logger.Panicf("conduit pipeline experienced a panic: %v", r) } } + +type empty struct{} + +// Probly don't need this: +// func EmptyOutputter[X any](f func(a X) error) func(X) (empty, error) { +// return func(a X) (empty, error) { +// return empty{}, f(a) +// } +// } + +type pluginInput interface { + uint64 | data.BlockData | string | empty +} + +func Retries[X, Y pluginInput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (*Y, time.Duration, error) { + var err error + start := time.Now() + + for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { + // the first time through, we don't sleep or mind the done signal + if i > 0 { + select { + case <-p.ctx.Done(): + return nil, time.Since(start), fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, p.ctx.Err()) + default: + time.Sleep(p.cfg.RetryDelay) + } + } + opStart := time.Now() + y, err2 := f(x) + if err2 == nil { + return &y, time.Since(opStart), nil + } + + p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err2) + if p.cfg.RetryCount > 0 { + err = errors.Join(err, err2) + } else { + // in the case of infinite retries, only keep the last error + err = err2 + } + } + + return nil, time.Since(start), fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err) +} + +func RetriesNoOutput[X pluginInput](f func(a X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := Retries(func(a X) (empty, error) { + return empty{}, f(a) + }, a, p, msg) + return d, err +} + +// func RetriesNoOutput[X pluginInput](times int, f func(a X) error, a X) error { +// _, err := Retries(times, func(a X) (empty, error) { +// return empty{}, f(a) +// }, a) +// return err +// } diff --git a/conduit/pipeline/pipeline_bench_test.go b/conduit/pipeline/pipeline_bench_test.go index 93ebee36..833ae8f5 100644 --- a/conduit/pipeline/pipeline_bench_test.go +++ b/conduit/pipeline/pipeline_bench_test.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "errors" "fmt" "testing" "time" @@ -17,7 +18,7 @@ import ( ) const ( - logLevel = log.ErrorLevel //log.InfoLevel // log.DebugLevel // + logLevel = log.ErrorLevel // log.DebugLevel // log.InfoLevel // log.TraceLevel // retryCount = 3 // math.MaxUint64 ) @@ -195,13 +196,13 @@ func pipeline5sec(b *testing.B, testCase benchmarkCase) int { } exporter := &sleepingExporter{receiveSleep: testCase.exporterSleep} - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) logger := log.New() logger.SetLevel(logLevel) pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, logger: logger, initProvider: nil, importer: importer, @@ -225,7 +226,7 @@ func pipeline5sec(b *testing.B, testCase benchmarkCase) int { // cancel the pipeline after 5 seconds go func() { time.Sleep(benchmarkDuration) - cf() + ccf(errors.New("benchmark timeout")) }() b.StartTimer() @@ -247,36 +248,36 @@ func finalRound(pi *pipelineImpl) (sdk.Round, error) { func BenchmarkPipeline(b *testing.B) { benchCases := []benchmarkCase{ - { - name: "vanilla 2 procs without sleep", - importerSleep: 0, - processorsSleep: []time.Duration{0, 0}, - exporterSleep: 0, - }, - { - name: "uniform sleep of 10ms", - importerSleep: 10 * time.Millisecond, - processorsSleep: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond}, - exporterSleep: 10 * time.Millisecond, - }, + // { + // name: "vanilla 2 procs without sleep", + // importerSleep: 0, + // processorsSleep: []time.Duration{0, 0}, + // exporterSleep: 0, + // }, + // { + // name: "uniform sleep of 10ms", + // importerSleep: 10 * time.Millisecond, + // processorsSleep: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond}, + // exporterSleep: 10 * time.Millisecond, + // }, { name: "exporter 10ms while others 1ms", importerSleep: time.Millisecond, processorsSleep: []time.Duration{time.Millisecond, time.Millisecond}, exporterSleep: 10 * time.Millisecond, }, - { - name: "importer 10ms while others 1ms", - importerSleep: 10 * time.Millisecond, - processorsSleep: []time.Duration{time.Millisecond, time.Millisecond}, - exporterSleep: time.Millisecond, - }, - { - name: "first processor 10ms while others 1ms", - importerSleep: time.Millisecond, - processorsSleep: []time.Duration{10 * time.Millisecond, time.Millisecond}, - exporterSleep: time.Millisecond, - }, + // { + // name: "importer 10ms while others 1ms", + // importerSleep: 10 * time.Millisecond, + // processorsSleep: []time.Duration{time.Millisecond, time.Millisecond}, + // exporterSleep: time.Millisecond, + // }, + // { + // name: "first processor 10ms while others 1ms", + // importerSleep: time.Millisecond, + // processorsSleep: []time.Duration{10 * time.Millisecond, time.Millisecond}, + // exporterSleep: time.Millisecond, + // }, } for _, buffSize := range []int{1} { for _, bc := range benchCases { diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index 6392a74e..9feda9ab 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -264,12 +264,12 @@ func TestPipelineRun(t *testing.T) { var pExporter exporters.Exporter = &mExporter var cbComplete conduit.Completed = &mProcessor - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, logger: l, initProvider: nil, importer: pImporter, @@ -291,7 +291,7 @@ func TestPipelineRun(t *testing.T) { go func() { time.Sleep(1 * time.Second) - cf() + ccf(errors.New("testing timeout")) }() pImpl.Start() @@ -358,11 +358,11 @@ func TestPipelineErrors(t *testing.T) { var pExporter exporters.Exporter = &mExporter var cbComplete conduit.Completed = &mProcessor - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, cfg: &data.Config{ RetryDelay: 0 * time.Second, RetryCount: math.MaxUint64, @@ -383,37 +383,38 @@ func TestPipelineErrors(t *testing.T) { go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + cancel := errors.New("testing timeout") + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("importer")) mImporter.returnError = false mProcessor.returnError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("process")) mProcessor.returnError = false mProcessor.onCompleteError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("on complete")) mProcessor.onCompleteError = false mExporter.returnError = true - pImpl.ctx, pImpl.cf = context.WithCancel(context.Background()) - pImpl.setError(nil) + pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background()) + pImpl.joinError(nil) go pImpl.Start() time.Sleep(time.Millisecond) - pImpl.cf() + pImpl.ccf(cancel) pImpl.Wait() assert.Error(t, pImpl.Error(), fmt.Errorf("exporter")) } @@ -433,11 +434,11 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) { var pProcessor processors.Processor = &mProcessor var pExporter exporters.Exporter = &mExporter - ctx, cf := context.WithCancel(context.Background()) + ctx, ccf := context.WithCancelCause(context.Background()) l, _ := test.NewNullLogger() pImpl := pipelineImpl{ ctx: ctx, - cf: cf, + ccf: ccf, cfg: &data.Config{}, logger: l, initProvider: nil,