Skip to content

Commit

Permalink
basic logic in place
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeph Grunschlag committed Jul 26, 2023
1 parent bead065 commit cf130e2
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 46 deletions.
68 changes: 67 additions & 1 deletion conduit/pipeline/common.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,76 @@
package pipeline

import log "github.com/sirupsen/logrus"
import (
"errors"
"fmt"
"time"

"github.com/algorand/conduit/conduit/data"
log "github.com/sirupsen/logrus"
)

// HandlePanic function to log panics in a common way
func HandlePanic(logger *log.Logger) {
if r := recover(); r != nil {
logger.Panicf("conduit pipeline experienced a panic: %v", r)
}
}

type empty struct{}

// Probly don't need this:
// func EmptyOutputter[X any](f func(a X) error) func(X) (empty, error) {
// return func(a X) (empty, error) {
// return empty{}, f(a)
// }
// }

type pluginInput interface {
uint64 | data.BlockData | string | empty
}

func Retries[X, Y pluginInput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (*Y, time.Duration, error) {
var err error
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
// the first time through, we don't sleep or mind the done signal
if i > 0 {
select {
case <-p.ctx.Done():
return nil, time.Since(start), fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, p.ctx.Err())
default:
time.Sleep(p.cfg.RetryDelay)
}
}
opStart := time.Now()
y, err2 := f(x)
if err2 == nil {
return &y, time.Since(opStart), nil
}

p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err2)
if p.cfg.RetryCount > 0 {
err = errors.Join(err, err2)
} else {
// in the case of infinite retries, only keep the last error
err = err2
}
}

return nil, time.Since(start), fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err)
}

func RetriesNoOutput[X pluginInput](f func(a X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(a X) (empty, error) {
return empty{}, f(a)
}, a, p, msg)
return d, err
}

// func RetriesNoOutput[X pluginInput](times int, f func(a X) error, a X) error {
// _, err := Retries(times, func(a X) (empty, error) {
// return empty{}, f(a)
// }, a)
// return err
// }
57 changes: 29 additions & 28 deletions conduit/pipeline/pipeline_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand All @@ -17,7 +18,7 @@ import (
)

const (
logLevel = log.ErrorLevel //log.InfoLevel // log.DebugLevel //
logLevel = log.ErrorLevel // log.DebugLevel // log.InfoLevel // log.TraceLevel //
retryCount = 3 // math.MaxUint64
)

Expand Down Expand Up @@ -195,13 +196,13 @@ func pipeline5sec(b *testing.B, testCase benchmarkCase) int {
}
exporter := &sleepingExporter{receiveSleep: testCase.exporterSleep}

ctx, cf := context.WithCancel(context.Background())
ctx, ccf := context.WithCancelCause(context.Background())

logger := log.New()
logger.SetLevel(logLevel)
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
ccf: ccf,
logger: logger,
initProvider: nil,
importer: importer,
Expand All @@ -225,7 +226,7 @@ func pipeline5sec(b *testing.B, testCase benchmarkCase) int {
// cancel the pipeline after 5 seconds
go func() {
time.Sleep(benchmarkDuration)
cf()
ccf(errors.New("benchmark timeout"))
}()

b.StartTimer()
Expand All @@ -247,36 +248,36 @@ func finalRound(pi *pipelineImpl) (sdk.Round, error) {

func BenchmarkPipeline(b *testing.B) {
benchCases := []benchmarkCase{
{
name: "vanilla 2 procs without sleep",
importerSleep: 0,
processorsSleep: []time.Duration{0, 0},
exporterSleep: 0,
},
{
name: "uniform sleep of 10ms",
importerSleep: 10 * time.Millisecond,
processorsSleep: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond},
exporterSleep: 10 * time.Millisecond,
},
// {
// name: "vanilla 2 procs without sleep",
// importerSleep: 0,
// processorsSleep: []time.Duration{0, 0},
// exporterSleep: 0,
// },
// {
// name: "uniform sleep of 10ms",
// importerSleep: 10 * time.Millisecond,
// processorsSleep: []time.Duration{10 * time.Millisecond, 10 * time.Millisecond},
// exporterSleep: 10 * time.Millisecond,
// },
{
name: "exporter 10ms while others 1ms",
importerSleep: time.Millisecond,
processorsSleep: []time.Duration{time.Millisecond, time.Millisecond},
exporterSleep: 10 * time.Millisecond,
},
{
name: "importer 10ms while others 1ms",
importerSleep: 10 * time.Millisecond,
processorsSleep: []time.Duration{time.Millisecond, time.Millisecond},
exporterSleep: time.Millisecond,
},
{
name: "first processor 10ms while others 1ms",
importerSleep: time.Millisecond,
processorsSleep: []time.Duration{10 * time.Millisecond, time.Millisecond},
exporterSleep: time.Millisecond,
},
// {
// name: "importer 10ms while others 1ms",
// importerSleep: 10 * time.Millisecond,
// processorsSleep: []time.Duration{time.Millisecond, time.Millisecond},
// exporterSleep: time.Millisecond,
// },
// {
// name: "first processor 10ms while others 1ms",
// importerSleep: time.Millisecond,
// processorsSleep: []time.Duration{10 * time.Millisecond, time.Millisecond},
// exporterSleep: time.Millisecond,
// },
}
for _, buffSize := range []int{1} {
for _, bc := range benchCases {
Expand Down
35 changes: 18 additions & 17 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ func TestPipelineRun(t *testing.T) {
var pExporter exporters.Exporter = &mExporter
var cbComplete conduit.Completed = &mProcessor

ctx, cf := context.WithCancel(context.Background())
ctx, ccf := context.WithCancelCause(context.Background())

l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
ccf: ccf,
logger: l,
initProvider: nil,
importer: pImporter,
Expand All @@ -291,7 +291,7 @@ func TestPipelineRun(t *testing.T) {

go func() {
time.Sleep(1 * time.Second)
cf()
ccf(errors.New("testing timeout"))
}()

pImpl.Start()
Expand Down Expand Up @@ -358,11 +358,11 @@ func TestPipelineErrors(t *testing.T) {
var pExporter exporters.Exporter = &mExporter
var cbComplete conduit.Completed = &mProcessor

ctx, cf := context.WithCancel(context.Background())
ctx, ccf := context.WithCancelCause(context.Background())
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
ccf: ccf,
cfg: &data.Config{
RetryDelay: 0 * time.Second,
RetryCount: math.MaxUint64,
Expand All @@ -383,37 +383,38 @@ func TestPipelineErrors(t *testing.T) {

go pImpl.Start()
time.Sleep(time.Millisecond)
pImpl.cf()
cancel := errors.New("testing timeout")
pImpl.ccf(cancel)
pImpl.Wait()
assert.Error(t, pImpl.Error(), fmt.Errorf("importer"))

mImporter.returnError = false
mProcessor.returnError = true
pImpl.ctx, pImpl.cf = context.WithCancel(context.Background())
pImpl.setError(nil)
pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background())
pImpl.joinError(nil)
go pImpl.Start()
time.Sleep(time.Millisecond)
pImpl.cf()
pImpl.ccf(cancel)
pImpl.Wait()
assert.Error(t, pImpl.Error(), fmt.Errorf("process"))

mProcessor.returnError = false
mProcessor.onCompleteError = true
pImpl.ctx, pImpl.cf = context.WithCancel(context.Background())
pImpl.setError(nil)
pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background())
pImpl.joinError(nil)
go pImpl.Start()
time.Sleep(time.Millisecond)
pImpl.cf()
pImpl.ccf(cancel)
pImpl.Wait()
assert.Error(t, pImpl.Error(), fmt.Errorf("on complete"))

mProcessor.onCompleteError = false
mExporter.returnError = true
pImpl.ctx, pImpl.cf = context.WithCancel(context.Background())
pImpl.setError(nil)
pImpl.ctx, pImpl.ccf = context.WithCancelCause(context.Background())
pImpl.joinError(nil)
go pImpl.Start()
time.Sleep(time.Millisecond)
pImpl.cf()
pImpl.ccf(cancel)
pImpl.Wait()
assert.Error(t, pImpl.Error(), fmt.Errorf("exporter"))
}
Expand All @@ -433,11 +434,11 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
var pProcessor processors.Processor = &mProcessor
var pExporter exporters.Exporter = &mExporter

ctx, cf := context.WithCancel(context.Background())
ctx, ccf := context.WithCancelCause(context.Background())
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
ccf: ccf,
cfg: &data.Config{},
logger: l,
initProvider: nil,
Expand Down

0 comments on commit cf130e2

Please sign in to comment.