common.go unit test
Zeph Grunschlag committed Jul 26, 2023
1 parent 66f1288 commit 42fa4fd
Showing 3 changed files with 224 additions and 32 deletions.
32 changes: 11 additions & 21 deletions conduit/pipeline/common.go
@@ -1,6 +1,7 @@
package pipeline

import (
"context"
"errors"
"fmt"
"time"
@@ -18,35 +19,29 @@ func HandlePanic(logger *log.Logger) {

type empty struct{}

// Probly don't need this:
// func EmptyOutputter[X any](f func(a X) error) func(X) (empty, error) {
// return func(a X) (empty, error) {
// return empty{}, f(a)
// }
// }

type pluginInput interface {
uint64 | data.BlockData | string | empty
}

func Retries[X, Y pluginInput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (*Y, time.Duration, error) {
var err error
func Retries[X, Y pluginInput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
// the first time through, we don't sleep or mind the done signal
// the first time through, we don't sleep or mind ctx's done signal
if i > 0 {
select {
case <-p.ctx.Done():
return nil, time.Since(start), fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, p.ctx.Err())
dur = time.Since(start)
err = fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, context.Cause(p.ctx))
return
default:
time.Sleep(p.cfg.RetryDelay)
}
}
opStart := time.Now()
y, err2 := f(x)
y2, err2 := f(x)
if err2 == nil {
return &y, time.Since(opStart), nil
return y2, time.Since(opStart), nil
}

p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err2)
@@ -58,7 +53,9 @@ func Retries[X, Y pluginInput](f func(x X) (Y, error), x X, p *pipelineImpl, msg
}
}

return nil, time.Since(start), fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err)
dur = time.Since(start)
err = fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err)
return
}

func RetriesNoOutput[X pluginInput](f func(a X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
@@ -67,10 +64,3 @@ func RetriesNoOutput[X pluginInput](f func(a X) error, a X, p *pipelineImpl, msg
}, a, p, msg)
return d, err
}

// func RetriesNoOutput[X pluginInput](times int, f func(a X) error, a X) error {
// _, err := Retries(times, func(a X) (empty, error) {
// return empty{}, f(a)
// }, a)
// return err
// }
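
Note: the new signature returns the result by value and wraps the cancellation cause via context.Cause rather than p.ctx.Err(). A minimal standalone sketch (illustrative only, not part of this commit) of why that distinction matters: Err() only reports the generic context.Canceled sentinel, while Cause recovers the specific error handed to the CancelCauseFunc, which Retries now folds into its "Done signal received" error.

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancelCause := context.WithCancelCause(context.Background())
	errShutdown := errors.New("pipeline shutting down")
	cancelCause(errShutdown)

	// ctx.Err() reports only the generic cancellation sentinel.
	fmt.Println(ctx.Err()) // context canceled
	// context.Cause recovers the specific error passed to cancelCause.
	fmt.Println(context.Cause(ctx))                         // pipeline shutting down
	fmt.Println(errors.Is(context.Cause(ctx), errShutdown)) // true
}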
210 changes: 210 additions & 0 deletions conduit/pipeline/common_test.go
@@ -0,0 +1,210 @@
package pipeline

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/algorand/conduit/conduit/data"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

// TestRetries tests the retry logic
func TestRetries(t *testing.T) {
errSucceedAfter := errors.New("succeed after has failed")

succeedAfterFactory := func(succeedAfter uint64, never bool) func(uint64) (uint64, error) {
tries := uint64(0)

return func(x uint64) (uint64, error) {
if tries >= succeedAfter && !never {
// ensure not to return the zero value on success
return tries + 1, nil
}
tries++
return 0, fmt.Errorf("%w: tries=%d", errSucceedAfter, tries-1)
}
}

succeedAfterFactoryNoOutput := func(succeedAfter uint64, never bool) func(uint64) error {
tries := uint64(0)

return func(x uint64) error {
if tries >= succeedAfter && !never {
return nil
}
tries++
return fmt.Errorf("%w: tries=%d", errSucceedAfter, tries-1)
}
}

cases := []struct {
name string
retryCount uint64
succeedAfter uint64
neverSucceed bool // neverSucceed trumps succeedAfter
}{
{
name: "retry forever succeeds after 0",
retryCount: 0,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry forever succeeds after 1",
retryCount: 0,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry forever succeeds after 7",
retryCount: 0,
succeedAfter: 7,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 0",
retryCount: 5,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 1",
retryCount: 5,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 5",
retryCount: 5,
succeedAfter: 5,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 7",
retryCount: 5,
succeedAfter: 7,
neverSucceed: false,
},
{
name: "retry 5 never succeeds",
retryCount: 5,
succeedAfter: 0,
neverSucceed: true,
},
{
name: "retry foerever never succeeds",
retryCount: 0,
succeedAfter: 0,
neverSucceed: true,
},
}

for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfter := succeedAfterFactory(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
y := <-yChan
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSucceedAfter, tc.name)
require.Zero(t, y, tc.name)
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSucceedAfter, tc.name)
require.Zero(t, y, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
}
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfterNoOutput := succeedAfterFactoryNoOutput(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSucceedAfter, tc.name)
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSucceedAfter, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
}
}
})
}
}
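
The new cases can be exercised on their own with the standard Go toolchain; a likely invocation from the repository root (assumed, not shown in this commit) is:

go test ./conduit/pipeline/ -run TestRetries -race -v

The -race flag is worthwhile here because every subtest runs with t.Parallel() and the cancellation cases coordinate goroutines over channels.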
14 changes: 3 additions & 11 deletions conduit/pipeline/pipeline.go
@@ -467,22 +467,18 @@ func (p *pipelineImpl) ImportHandler(importer importers.Importer, roundChan <-ch
totalSelectWait += waitTime
p.logger.Tracef("importer handler @ round %d received %s", rnd, waitTime)

blkDataPtr, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancel(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
}
if blkDataPtr == nil {
p.cancel(fmt.Errorf("importer %s handler (%w): receieved nil blkDataPtr. this should never happen! failed to import round", importer.Metadata().Name, errImporterCause))
return
}
metrics.ImporterTimeSeconds.Observe(importTime.Seconds())

feedStart := time.Now()
select {
case <-p.ctx.Done():
return
case blkOutChan <- *blkDataPtr:
case blkOutChan <- blkData:
waitTime := time.Since(feedStart)
totalFeedWait += waitTime
p.logger.Tracef("imported round %d into blkOutChan after waiting %dms on channel", rnd, waitTime.Milliseconds())
@@ -516,15 +512,11 @@ func (p *pipelineImpl) ProcessorHandler(idx int, proc processors.Processor, blkI
p.logger.Tracef("processor[%d] %s handler received block data for round %d after wait of %s", idx, proc.Metadata().Name, lastRnd, waitTime)
lastRnd = blkData.Round()

blkDataPtr, procTime, lastError := Retries(proc.Process, blkData, p, proc.Metadata().Name)
blkData, procTime, lastError := Retries(proc.Process, blkData, p, proc.Metadata().Name)
if lastError != nil {
p.cancel(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
}
if blkDataPtr == nil {
p.cancel(fmt.Errorf("processor[%d] %s handler (%w): receieved nil blkDataPtr. this should never happen! failed to process round %d", idx, proc.Metadata().Name, errProcessorCause, lastRnd))
return
}
metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(procTime.Seconds())

selectStart := time.Now()
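
Because Retries now returns Y by value, a failed call hands back the zero value of the result type together with a non-nil error, which is why the handlers' nil-pointer guards above could be deleted. A small standalone illustration of that contract (hypothetical types, not the committed code):

package main

import (
	"errors"
	"fmt"
)

// blockData stands in for data.BlockData; any result type behaves the same way.
type blockData struct{ Round uint64 }

// fetch mimics the new Retries contract: on failure it returns the zero value, never a nil pointer.
func fetch(fail bool) (blockData, error) {
	if fail {
		return blockData{}, errors.New("import failed")
	}
	return blockData{Round: 42}, nil
}

func main() {
	blk, err := fetch(true)
	if err != nil {
		// blk is a usable zero value; there is no pointer to nil-check.
		fmt.Printf("err=%v, blk=%+v\n", err, blk)
		return
	}
	fmt.Println("imported round", blk.Round)
}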
