From 931fd63c8a5756025264e9e20215bfe14a60939d Mon Sep 17 00:00:00 2001
From: Jakub Nowakowski
Date: Wed, 1 Sep 2021 14:08:06 +0200
Subject: [PATCH 1/4] Add wrappers package

The package was originally defined in the keep-ecdsa repository:
https://github.com/keep-network/keep-ecdsa/blob/2fdf57d4e61f9dd313fb717c221605b5d812b187/pkg/utils/wrappers.go
---
 pkg/wrappers/wrappers.go      | 190 +++++++++++++++++++++++++++
 pkg/wrappers/wrappers_test.go | 234 ++++++++++++++++++++++++++++++++++
 2 files changed, 424 insertions(+)
 create mode 100644 pkg/wrappers/wrappers.go
 create mode 100644 pkg/wrappers/wrappers_test.go

diff --git a/pkg/wrappers/wrappers.go b/pkg/wrappers/wrappers.go
new file mode 100644
index 0000000..f874a38
--- /dev/null
+++ b/pkg/wrappers/wrappers.go
@@ -0,0 +1,190 @@
+package wrappers
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+)
+
+// DoWithRetry executes the provided doFn as long as it returns an error or
+// until a timeout is hit. It applies an exponential backoff wait of
+// backoffTime * 2^n before the nth retry of doFn. In case the calculated
+// backoff is longer than backoffMax, the backoffMax wait is applied.
+func DoWithRetry(
+	backoffTime time.Duration,
+	backoffMax time.Duration,
+	timeout time.Duration,
+	doFn func(ctx context.Context) error,
+) error {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	var err error
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf(
+				"retry timeout [%v] exceeded; most recent error: [%w]",
+				timeout,
+				err,
+			)
+		default:
+			err = doFn(ctx)
+			if err == nil {
+				return nil
+			}
+
+			timedOut := backoffWait(ctx, backoffTime)
+			if timedOut {
+				return fmt.Errorf(
+					"retry timeout [%v] exceeded; most recent error: [%w]",
+					timeout,
+					err,
+				)
+			}
+
+			backoffTime = calculateBackoff(
+				backoffTime,
+				backoffMax,
+			)
+		}
+	}
+}
+
+const (
+	// DefaultDoBackoffTime is the default value of backoff time used by
+	// the DoWithDefaultRetry function.
+	DefaultDoBackoffTime = 1 * time.Second
+
+	// DefaultDoMaxBackoffTime is the default value of max backoff time used
+	// by the DoWithDefaultRetry function.
+	DefaultDoMaxBackoffTime = 120 * time.Second
+)
+
+// DoWithDefaultRetry executes the provided doFn as long as it returns an error
+// or until a timeout is hit. It applies an exponential backoff wait of
+// DefaultDoBackoffTime * 2^n before the nth retry of doFn. In case the
+// calculated backoff is longer than DefaultDoMaxBackoffTime, the
+// DefaultDoMaxBackoffTime is applied.
+func DoWithDefaultRetry(
+	timeout time.Duration,
+	doFn func(ctx context.Context) error,
+) error {
+	return DoWithRetry(
+		DefaultDoBackoffTime,
+		DefaultDoMaxBackoffTime,
+		timeout,
+		doFn,
+	)
+}
+
+// ConfirmWithTimeout executes the provided confirmFn until it returns true,
+// until it fails, or until a timeout is hit. It applies an exponential backoff
+// wait of backoffTime * 2^n before the nth execution of confirmFn. In case the
+// calculated backoff is longer than backoffMax, the backoffMax is applied.
+// In case confirmFn returns an error, ConfirmWithTimeout exits with the same
+// error immediately. This is different from the DoWithRetry behavior, as the
+// use case for this function is different. ConfirmWithTimeout is intended to
+// be used to confirm a chain state and not to try to enforce a successful
+// execution of some function.
+func ConfirmWithTimeout(
+	backoffTime time.Duration,
+	backoffMax time.Duration,
+	timeout time.Duration,
+	confirmFn func(ctx context.Context) (bool, error),
+) (bool, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return false, nil
+		default:
+			ok, err := confirmFn(ctx)
+			if err == nil && ok {
+				return true, nil
+			}
+			if err != nil {
+				return false, err
+			}
+
+			timedOut := backoffWait(ctx, backoffTime)
+			if timedOut {
+				return false, nil
+			}
+
+			backoffTime = calculateBackoff(
+				backoffTime,
+				backoffMax,
+			)
+		}
+	}
+}
+
+const (
+	// DefaultConfirmBackoffTime is the default value of backoff time used by
+	// the ConfirmWithTimeoutDefaultBackoff function.
+	DefaultConfirmBackoffTime = 5 * time.Second
+
+	// DefaultConfirmMaxBackoffTime is the default value of max backoff time
+	// used by the ConfirmWithTimeoutDefaultBackoff function.
+	DefaultConfirmMaxBackoffTime = 10 * time.Second
+)
+
+// ConfirmWithTimeoutDefaultBackoff executes the provided confirmFn until it
+// returns true, until it fails, or until a timeout is hit. It applies an
+// exponential backoff wait of DefaultConfirmBackoffTime * 2^n before the nth
+// execution of confirmFn. In case the calculated backoff is longer than
+// DefaultConfirmMaxBackoffTime, DefaultConfirmMaxBackoffTime is applied.
+// In case confirmFn returns an error, ConfirmWithTimeoutDefaultBackoff exits
+// with the same error immediately. This is different from the
+// DoWithDefaultRetry behavior, as the use case for this function is different.
+// ConfirmWithTimeoutDefaultBackoff is intended to be used to confirm a chain
+// state and not to try to enforce a successful execution of some function.
+func ConfirmWithTimeoutDefaultBackoff(
+	timeout time.Duration,
+	confirmFn func(ctx context.Context) (bool, error),
+) (bool, error) {
+	return ConfirmWithTimeout(
+		DefaultConfirmBackoffTime,
+		DefaultConfirmMaxBackoffTime,
+		timeout,
+		confirmFn,
+	)
+}
+
+func calculateBackoff(
+	backoffPrev time.Duration,
+	backoffMax time.Duration,
+) time.Duration {
+	backoff := backoffPrev
+
+	backoff *= 2
+
+	// #nosec G404
+	// we are fine with not using a cryptographically secure random integer,
+	// it is just exponential backoff jitter
+	r := rand.Int63n(backoff.Nanoseconds()/10 + 1)
+	jitter := time.Duration(r) * time.Nanosecond
+	backoff += jitter
+
+	if backoff > backoffMax {
+		backoff = backoffMax
+	}
+
+	return backoff
+}
+
+func backoffWait(ctx context.Context, waitTime time.Duration) bool {
+	timer := time.NewTimer(waitTime)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return true
+	case <-timer.C:
+		return false
+	}
+}
diff --git a/pkg/wrappers/wrappers_test.go b/pkg/wrappers/wrappers_test.go
new file mode 100644
index 0000000..463222b
--- /dev/null
+++ b/pkg/wrappers/wrappers_test.go
@@ -0,0 +1,234 @@
+package wrappers
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestDoWithRetry(t *testing.T) {
+	backoffTime := 10 * time.Millisecond
+	backoffMax := 100 * time.Millisecond
+	timeout := 2 * time.Second
+
+	actualFailCount := 0
+	expectedFailCount := 4
+	doFn := func(ctx context.Context) error {
+		if actualFailCount < expectedFailCount {
+			actualFailCount++
+			return fmt.Errorf("try again please")
+		}
+
+		return nil
+	}
+
+	err := DoWithRetry(backoffTime, backoffMax, timeout, doFn)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if actualFailCount != expectedFailCount {
+		t.Errorf(
+			"unexpected fail count: actual [%v], expected [%v]",
+			actualFailCount,
+			expectedFailCount,
+		)
+	}
+}
+
+func TestDoWithRetryExceedTimeout(t *testing.T) {
+	backoffTime := 100 * time.Millisecond
+	backoffMax := 500 * time.Millisecond
+	timeout := 1 * time.Second
+
+	actualFailCount := 0
+
+	// This function should be executed 4 times and time out:
+	// 100 ms
+	// 200 ms
+	// 400 ms
+	// 500 ms <- here it should exceed the 1s timeout
+	doFn := func(ctx context.Context) error {
+		if actualFailCount < 10 {
+			actualFailCount++
+			return fmt.Errorf("try again please %d", actualFailCount)
+		}
+
+		return nil
+	}
+
+	err := DoWithRetry(backoffTime, backoffMax, timeout, doFn)
+	if err == nil {
+		t.Fatal("expected a timeout error")
+	}
+
+	expectedLatestError := "try again please 4"
+	expectedError := "retry timeout [1s] exceeded; most recent error: [try again please 4]"
+	if err.Error() != expectedError {
+		t.Errorf(
+			"unexpected error message\nactual: [%v]\nexpected: [%v]",
+			err.Error(),
+			expectedError,
+		)
+	}
+	if errors.Unwrap(err).Error() != expectedLatestError {
+		t.Errorf(
+			"unexpected error message\nactual: [%v]\nexpected: [%v]",
+			errors.Unwrap(err).Error(),
+			expectedLatestError,
+		)
+	}
+
+	expectedFailCount := 4
+	if actualFailCount != expectedFailCount {
+		t.Errorf(
+			"unexpected fail count: actual [%v], expected [%v]",
+			actualFailCount,
+			expectedFailCount,
+		)
+	}
+}
+
+func TestConfirmWithTimeout(t *testing.T) {
+	backoffTime := 10 * time.Millisecond
+	backoffMax := 100 * time.Millisecond
+	timeout := 2 * time.Second
+
+	actualCheckCount := 0
+	expectedCheckCount := 3
+	confirmFn := func(ctx context.Context) (bool, error) {
+		if actualCheckCount < expectedCheckCount {
+			actualCheckCount++
+			return false, nil
+		}
+
+		return true, nil
+	}
+
+	ok, err := ConfirmWithTimeout(backoffTime, backoffMax, timeout, confirmFn)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !ok {
+		t.Errorf("expected the check to eventually succeed")
+	}
+
+	if actualCheckCount != expectedCheckCount {
+		t.Errorf(
+			"unexpected check count: actual [%v], expected [%v]",
+			actualCheckCount,
+			expectedCheckCount,
+		)
+	}
+}
+
+func TestConfirmWithTimeoutExceedTimeout(t *testing.T) {
+	backoffTime := 100 * time.Millisecond
+	backoffMax := 300 * time.Millisecond
+	timeout := 1 * time.Second
+
+	actualCheckCount := 0
+
+	// This function should be executed 5 times and time out:
+	// 100 ms
+	// 200 ms
+	// 300 ms
+	// 300 ms
+	// 300 ms <- here it should exceed the 1s timeout
+	confirmFn := func(ctx context.Context) (bool, error) {
+		if actualCheckCount < 10 {
+			actualCheckCount++
+			return false, nil
+		}
+
+		return true, nil
+	}
+
+	ok, err := ConfirmWithTimeout(backoffTime, backoffMax, timeout, confirmFn)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if ok {
+		t.Errorf("expected the check to eventually fail")
+	}
+
+	expectedCheckCount := 5
+	if actualCheckCount != expectedCheckCount {
+		t.Errorf(
+			"unexpected check count: actual [%v], expected [%v]",
+			actualCheckCount,
+			expectedCheckCount,
+		)
+	}
+}
+
+func TestConfirmWithTimeoutFailure(t *testing.T) {
+	backoffTime := 100 * time.Millisecond
+	backoffMax := 300 * time.Millisecond
+	timeout := 1 * time.Second
+
+	confirmFn := func(ctx context.Context) (bool, error) {
+		return false, fmt.Errorf("untada")
+	}
+
+	ok, err := ConfirmWithTimeout(backoffTime, backoffMax, timeout, confirmFn)
+	if err == nil {
+		t.Fatal("expected an error")
+	}
+	if ok {
+		t.Errorf("should return false")
+	}
+
+	expectedError := "untada"
+	if err.Error() != expectedError {
+		t.Errorf(
+			"unexpected error message\nactual: [%v]\nexpected: [%v]",
+			err.Error(),
+			expectedError,
+		)
+	}
+}
+
+func TestCalculateBackoff(t *testing.T) {
+	backoffInitial := 120 * time.Second
+	backoffMax := 300 * time.Second
+
+	expectedMin := 240 * time.Second // 2 * backoffInitial
+	expectedMax := 265 * time.Second // 2 * backoffInitial * 110% + 1
+
+	for i := 0; i < 100; i++ {
+		backoff := calculateBackoff(backoffInitial, backoffMax)
+
+		if backoff < expectedMin {
+			t.Errorf(
+				"backoff [%v] shorter than the expected minimum [%v]",
+				backoff,
+				expectedMin,
+			)
+		}
+
+		if backoff > expectedMax {
+			t.Errorf(
+				"backoff [%v] longer than the expected maximum [%v]",
+				backoff,
+				expectedMax,
+			)
+		}
+	}
+}
+
+func TestCalculateBackoffMax(t *testing.T) {
+	backoffInitial := 220 * time.Second
+	backoffMax := 300 * time.Second
+
+	backoff := calculateBackoff(backoffInitial, backoffMax)
+	if backoff != backoffMax {
+		t.Errorf(
+			"expected max backoff of [%v]; has [%v]",
+			backoffMax,
+			backoff,
+		)
+	}
+}

From 26fdfa2c50d110b0b3a73759a965d8a75b79579c Mon Sep 17 00:00:00 2001
From: Jakub Nowakowski
Date: Wed, 1 Sep 2021 14:27:50 +0200
Subject: [PATCH 2/4] Retry balance checks on failures

We want to use a retry mechanism for single executions of the balance
check. The retries are performed within the `retryTimeout` period,
logging a warning on each error. When the timeout is hit, an error is
logged and the retries stop. The next balance check is triggered at the
next tick.
---
 pkg/chain/celo/celoutil/balance_monitor.go    |  6 ++++-
 pkg/chain/ethereum/ethutil/balance_monitor.go |  6 ++++-
 pkg/chain/ethlike/balance_monitor.go          | 24 +++++++++++++++----
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/pkg/chain/celo/celoutil/balance_monitor.go b/pkg/chain/celo/celoutil/balance_monitor.go
index 76a49df..c8c5704 100644
--- a/pkg/chain/celo/celoutil/balance_monitor.go
+++ b/pkg/chain/celo/celoutil/balance_monitor.go
@@ -2,10 +2,11 @@ package celoutil
 
 import (
 	"context"
+	"time"
+
 	"github.com/celo-org/celo-blockchain/common"
 	"github.com/keep-network/keep-common/pkg/chain/celo"
 	"github.com/keep-network/keep-common/pkg/chain/ethlike"
-	"time"
 )
 
 // BalanceSource provides a balance info for the given address.
@@ -38,16 +39,19 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
 // Observe starts a process which checks the address balance with the given
 // tick and triggers an alert in case the balance falls below the
 // alert threshold value.
+// The balance check will be retried in case of an error up to the retry timeout.
 func (bm *BalanceMonitor) Observe(
 	ctx context.Context,
 	address common.Address,
 	alertThreshold *celo.Wei,
 	tick time.Duration,
+	retryTimeout time.Duration,
 ) {
 	bm.delegate.Observe(
 		ctx,
 		ethlike.Address(address),
 		&alertThreshold.Token,
 		tick,
+		retryTimeout,
 	)
 }
diff --git a/pkg/chain/ethereum/ethutil/balance_monitor.go b/pkg/chain/ethereum/ethutil/balance_monitor.go
index f540e51..e26f478 100644
--- a/pkg/chain/ethereum/ethutil/balance_monitor.go
+++ b/pkg/chain/ethereum/ethutil/balance_monitor.go
@@ -2,10 +2,11 @@ package ethutil
 
 import (
 	"context"
+	"time"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/keep-network/keep-common/pkg/chain/ethereum"
 	"github.com/keep-network/keep-common/pkg/chain/ethlike"
-	"time"
 )
 
 // BalanceSource provides a balance info for the given address.
@@ -38,16 +39,19 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
 // Observe starts a process which checks the address balance with the given
 // tick and triggers an alert in case the balance falls below the
 // alert threshold value.
+// The balance check will be retried in case of an error up to the retry timeout.
 func (bm *BalanceMonitor) Observe(
 	ctx context.Context,
 	address common.Address,
 	alertThreshold *ethereum.Wei,
 	tick time.Duration,
+	retryTimeout time.Duration,
 ) {
 	bm.delegate.Observe(
 		ctx,
 		ethlike.Address(address),
 		&alertThreshold.Token,
 		tick,
+		retryTimeout,
 	)
 }
diff --git a/pkg/chain/ethlike/balance_monitor.go b/pkg/chain/ethlike/balance_monitor.go
index 05f83a8..2a89eeb 100644
--- a/pkg/chain/ethlike/balance_monitor.go
+++ b/pkg/chain/ethlike/balance_monitor.go
@@ -2,7 +2,10 @@ package ethlike
 
 import (
 	"context"
+	"fmt"
 	"time"
+
+	"github.com/keep-network/keep-common/pkg/wrappers"
 )
 
 // BalanceSource provides a balance info for the given address.
@@ -22,17 +25,26 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
 // Observe starts a process which checks the address balance with the given
 // tick and triggers an alert in case the balance falls below the
 // alert threshold value.
+// The balance check will be retried in case of an error up to the retry timeout.
 func (bm *BalanceMonitor) Observe(
 	ctx context.Context,
 	address Address,
 	alertThreshold *Token,
 	tick time.Duration,
+	retryTimeout time.Duration,
 ) {
-	check := func() {
+	check := func(ctx context.Context) error {
 		balance, err := bm.balanceSource(address)
 		if err != nil {
-			logger.Errorf("balance monitor error: [%v]", err)
-			return
+			wrappedErr := fmt.Errorf(
+				"failed to get balance for account [%s]: [%w]",
+				address.TerminalString(),
+				err,
+			)
+
+			logger.Warning(wrappedErr)
+
+			return wrappedErr
 		}
 
 		if balance.Cmp(alertThreshold.Int) == -1 {
@@ -43,6 +55,8 @@ func (bm *BalanceMonitor) Observe(
 				alertThreshold.Text(10),
 			)
 		}
+
+		return nil
 	}
 
 	go func() {
@@ -52,7 +66,9 @@ func (bm *BalanceMonitor) Observe(
 		for {
 			select {
 			case <-ticker.C:
-				check()
+				err := wrappers.DoWithDefaultRetry(retryTimeout, check)
+
+				logger.Errorf("balance monitor error: [%v]", err)
 			case <-ctx.Done():
 				return
 			}

From f17cc48d707751c7689817e49f9365dc3ef1bbe0 Mon Sep 17 00:00:00 2001
From: Jakub Nowakowski
Date: Wed, 1 Sep 2021 17:24:31 +0200
Subject: [PATCH 3/4] Check balance at monitoring start

The balance check is now executed once at the start of the monitoring.
Previously we had to wait for the ticker to invoke the first check.
---
 pkg/chain/ethlike/balance_monitor.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/pkg/chain/ethlike/balance_monitor.go b/pkg/chain/ethlike/balance_monitor.go
index 2a89eeb..bfec9c6 100644
--- a/pkg/chain/ethlike/balance_monitor.go
+++ b/pkg/chain/ethlike/balance_monitor.go
@@ -63,12 +63,21 @@ func (bm *BalanceMonitor) Observe(
 		ticker := time.NewTicker(tick)
 		defer ticker.Stop()
 
+		checkBalance := func() {
+			err := wrappers.DoWithDefaultRetry(retryTimeout, check)
+			if err != nil {
+				logger.Errorf("balance monitor error: [%v]", err)
+			}
+		}
+
+		// Initial balance check at monitoring start.
+		checkBalance()
+
 		for {
 			select {
+			// Balance check at ticks.
 			case <-ticker.C:
-				err := wrappers.DoWithDefaultRetry(retryTimeout, check)
-
-				logger.Errorf("balance monitor error: [%v]", err)
+				checkBalance()
 			case <-ctx.Done():
 				return
 			}

From 71ed95dda8ed4029c770a0050714cd4a4be813b5 Mon Sep 17 00:00:00 2001
From: Jakub Nowakowski
Date: Wed, 1 Sep 2021 17:25:23 +0200
Subject: [PATCH 4/4] Add simple test for Balance Monitor

The test checks calls to the balance source function. In the future we
should also check the logged messages.
---
 pkg/chain/ethlike/balance_monitor_test.go | 59 +++++++++++++++++++++++
 1 file changed, 59 insertions(+)
 create mode 100644 pkg/chain/ethlike/balance_monitor_test.go

diff --git a/pkg/chain/ethlike/balance_monitor_test.go b/pkg/chain/ethlike/balance_monitor_test.go
new file mode 100644
index 0000000..6438cd3
--- /dev/null
+++ b/pkg/chain/ethlike/balance_monitor_test.go
@@ -0,0 +1,59 @@
+package ethlike
+
+import (
+	"context"
+	"fmt"
+	"math/big"
+	"sync"
+	"time"
+
+	"testing"
+
+	"github.com/ipfs/go-log"
+)
+
+func TestBalanceMonitor_Retries(t *testing.T) {
+	log.SetDebugLogging()
+
+	attemptsCount := 0
+	expectedAttempts := 3
+
+	wg := &sync.WaitGroup{}
+	wg.Add(expectedAttempts)
+
+	balanceSource := func(address Address) (*Token, error) {
+		attemptsCount++
+		wg.Done()
+
+		if attemptsCount < expectedAttempts {
+			return nil, fmt.Errorf("not this time")
+		}
+
+		return &Token{big.NewInt(10)}, nil
+	}
+
+	balanceMonitor := NewBalanceMonitor(balanceSource)
+
+	address := Address{1, 2}
+	alertThreshold := &Token{big.NewInt(15)}
+	tick := 1 * time.Minute
+	retryTimeout := 5 * time.Second
+
+	balanceMonitor.Observe(
+		context.Background(),
+		address,
+		alertThreshold,
+		tick,
+		retryTimeout,
+	)
+
+	wg.Wait()
+
+	if expectedAttempts != attemptsCount {
+		t.Errorf(
+			"unexpected retries count\nexpected: %d\nactual: %d",
+			expectedAttempts,
+			attemptsCount,
+		)
+	}
+}
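
Below is a minimal usage sketch of the retry helpers introduced in PATCH 1/4, for reviewers who want to see the intended call sites outside of the balance monitor. It is not part of the patch series; only the github.com/keep-network/keep-common/pkg/wrappers import path and the function signatures come from the patches above, while the doSomething and isConfirmed helpers are hypothetical placeholders.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/keep-network/keep-common/pkg/wrappers"
)

func main() {
	// Retry a fallible operation with the default exponential backoff
	// (1s, 2s, 4s, ... with jitter, capped at 120s) until it succeeds or
	// the 5 minute timeout elapses.
	err := wrappers.DoWithDefaultRetry(5*time.Minute, func(ctx context.Context) error {
		return doSomething(ctx) // hypothetical operation that may fail transiently
	})
	if err != nil {
		fmt.Printf("operation did not succeed before timeout: [%v]\n", err)
	}

	// Confirm an expected chain state, backing off between checks.
	// ok == false with a nil error means the state was not confirmed before
	// the timeout; a non-nil error aborts the confirmation immediately.
	ok, err := wrappers.ConfirmWithTimeout(
		5*time.Second,  // initial backoff
		60*time.Second, // backoff cap
		10*time.Minute, // overall timeout
		func(ctx context.Context) (bool, error) {
			return isConfirmed(ctx) // hypothetical chain-state check
		},
	)
	if err != nil {
		fmt.Printf("confirmation failed: [%v]\n", err)
	} else if !ok {
		fmt.Println("state not confirmed before timeout")
	}
}

// doSomething and isConfirmed stand in for application-specific logic.
func doSomething(ctx context.Context) error { return nil }

func isConfirmed(ctx context.Context) (bool, error) { return true, nil }

With PATCH 2/4 applied, the timeout passed to DoWithDefaultRetry is the value callers provide to BalanceMonitor.Observe through the new retryTimeout argument.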