Merge pull request #82 from keep-network/balance-monitoring-retries
Balance monitoring retries

We add a retry mechanism for single executions of the balance check.
Retries are performed within the retryTimeout period, logging a warning
on each error. When the timeout is hit, an error is logged and retries
stop; the next balance check is triggered at the next tick.
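For illustration, a minimal usage sketch at the ethlike level; the context, balance source, and values below are assumed for the example and are not part of this change:

monitor := ethlike.NewBalanceMonitor(balanceSource)

monitor.Observe(
    ctx,
    ethlike.Address{0x01},
    &ethlike.Token{big.NewInt(1000000000000000000)}, // alert threshold
    10*time.Minute, // tick: interval between regular balance checks
    2*time.Minute,  // retryTimeout: how long a single failed check keeps retrying
)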
pdyraga authored Sep 2, 2021
2 parents cecc0d5 + 71ed95d commit adb7bbc
Showing 6 changed files with 522 additions and 6 deletions.
6 changes: 5 additions & 1 deletion pkg/chain/celo/celoutil/balance_monitor.go
@@ -2,10 +2,11 @@ package celoutil

import (
"context"
"time"

"github.com/celo-org/celo-blockchain/common"
"github.com/keep-network/keep-common/pkg/chain/celo"
"github.com/keep-network/keep-common/pkg/chain/ethlike"
"time"
)

// BalanceSource provides balance info for the given address.
@@ -38,16 +39,19 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
// Observe starts a process which checks the address balance with the given
// tick and triggers an alert in case the balance falls below the
// alert threshold value.
// The balance check will be retried in case of an error up to the retry timeout.
func (bm *BalanceMonitor) Observe(
ctx context.Context,
address common.Address,
alertThreshold *celo.Wei,
tick time.Duration,
retryTimeout time.Duration,
) {
bm.delegate.Observe(
ctx,
ethlike.Address(address),
&alertThreshold.Token,
tick,
retryTimeout,
)
}
6 changes: 5 additions & 1 deletion pkg/chain/ethereum/ethutil/balance_monitor.go
@@ -2,10 +2,11 @@ package ethutil

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/keep-network/keep-common/pkg/chain/ethereum"
"github.com/keep-network/keep-common/pkg/chain/ethlike"
"time"
)

// BalanceSource provides balance info for the given address.
@@ -38,16 +39,19 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
// Observe starts a process which checks the address balance with the given
// tick and triggers an alert in case the balance falls below the
// alert threshold value.
// The balance check will be retried in case of an error up to the retry timeout.
func (bm *BalanceMonitor) Observe(
ctx context.Context,
address common.Address,
alertThreshold *ethereum.Wei,
tick time.Duration,
retryTimeout time.Duration,
) {
bm.delegate.Observe(
ctx,
ethlike.Address(address),
&alertThreshold.Token,
tick,
retryTimeout,
)
}
33 changes: 29 additions & 4 deletions pkg/chain/ethlike/balance_monitor.go
@@ -2,7 +2,10 @@ package ethlike

import (
"context"
"fmt"
"time"

"github.com/keep-network/keep-common/pkg/wrappers"
)

// BalanceSource provides balance info for the given address.
@@ -22,17 +25,26 @@ func NewBalanceMonitor(balanceSource BalanceSource) *BalanceMonitor {
// Observe starts a process which checks the address balance with the given
// tick and triggers an alert in case the balance falls below the
// alert threshold value.
// The balance check will be retried in case of an error up to the retry timeout.
func (bm *BalanceMonitor) Observe(
ctx context.Context,
address Address,
alertThreshold *Token,
tick time.Duration,
retryTimeout time.Duration,
) {
check := func() {
check := func(ctx context.Context) error {
balance, err := bm.balanceSource(address)
if err != nil {
logger.Errorf("balance monitor error: [%v]", err)
return
wrappedErr := fmt.Errorf(
"failed to get balance for account [%s]: [%w]",
address.TerminalString(),
err,
)

logger.Warning(wrappedErr)

return wrappedErr
}

if balance.Cmp(alertThreshold.Int) == -1 {
@@ -43,16 +55,29 @@ func (bm *BalanceMonitor) Observe(
alertThreshold.Text(10),
)
}

return nil
}

go func() {
ticker := time.NewTicker(tick)
defer ticker.Stop()

checkBalance := func() {
err := wrappers.DoWithDefaultRetry(retryTimeout, check)
if err != nil {
logger.Errorf("balance monitor error: [%v]", err)
}
}

// Initial balance check at monitoring start.
checkBalance()

for {
select {
// Balance check at ticks.
case <-ticker.C:
check()
checkBalance()
case <-ctx.Done():
return
}
59 changes: 59 additions & 0 deletions pkg/chain/ethlike/balance_monitor_test.go
@@ -0,0 +1,59 @@
package ethlike

import (
"context"
"fmt"
"math/big"
"sync"
"time"

"testing"

"github.com/ipfs/go-log"
)

func TestBalanceMonitor_Retries(t *testing.T) {
log.SetDebugLogging()

attemptsCount := 0
expectedAttempts := 3

wg := &sync.WaitGroup{}
wg.Add(expectedAttempts)

balanceSource := func(address Address) (*Token, error) {
attemptsCount++
wg.Done()

if attemptsCount < expectedAttempts {
return nil, fmt.Errorf("not this time")
}

return &Token{big.NewInt(10)}, nil
}

balanceMonitor := NewBalanceMonitor(balanceSource)

address := Address{1, 2}
alertThreshold := &Token{big.NewInt(15)}
tick := 1 * time.Minute
retryTimeout := 5 * time.Second

balanceMonitor.Observe(
context.Background(),
address,
alertThreshold,
tick,
retryTimeout,
)

wg.Wait()

if expectedAttempts != attemptsCount {
t.Errorf(
"unexpected retries count\nexpected: %d\nactual: %d",
expectedAttempts,
attemptsCount,
)
}
}
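A note on the chosen durations: with the default retry backoff (roughly 1 second before the first retry, then doubling with a small jitter), the three attempts above complete in about 3 seconds, comfortably inside the 5-second retryTimeout, while the 1-minute tick ensures all attempts come from a single balance check rather than from subsequent ticks.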
190 changes: 190 additions & 0 deletions pkg/wrappers/wrappers.go
@@ -0,0 +1,190 @@
package wrappers

import (
"context"
"fmt"
"math/rand"
"time"
)

// DoWithRetry executes the provided doFn as long as it returns an error or until
// a timeout is hit. It applies exponential backoff wait of backoffTime * 2^n
// before nth retry of doFn. In case the calculated backoff is longer than
// backoffMax, the backoffMax wait is applied.
func DoWithRetry(
backoffTime time.Duration,
backoffMax time.Duration,
timeout time.Duration,
doFn func(ctx context.Context) error,
) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var err error
for {
select {
case <-ctx.Done():
return fmt.Errorf(
"retry timeout [%v] exceeded; most recent error: [%w]",
timeout,
err,
)
default:
err = doFn(ctx)
if err == nil {
return nil
}

timedOut := backoffWait(ctx, backoffTime)
if timedOut {
return fmt.Errorf(
"retry timeout [%v] exceeded; most recent error: [%w]",
timeout,
err,
)
}

backoffTime = calculateBackoff(
backoffTime,
backoffMax,
)
}
}
}

const (
// DefaultDoBackoffTime is the default value of backoff time used by
// DoWithDefaultRetry function.
DefaultDoBackoffTime = 1 * time.Second

// DefaultDoMaxBackoffTime is the default value of max backoff time used by
// DoWithDefaultRetry function.
DefaultDoMaxBackoffTime = 120 * time.Second
)

// DoWithDefaultRetry executes the provided doFn as long as it returns an error or
// until a timeout is hit. It applies exponential backoff wait of
// DefaultDoBackoffTime * 2^n before nth retry of doFn. In case the calculated
// backoff is longer than DefaultDoMaxBackoffTime, the DefaultDoMaxBackoffTime is
// applied.
func DoWithDefaultRetry(
timeout time.Duration,
doFn func(ctx context.Context) error,
) error {
return DoWithRetry(
DefaultDoBackoffTime,
DefaultDoMaxBackoffTime,
timeout,
doFn,
)
}

// ConfirmWithTimeout executes the provided confirmFn until it returns true or
// until it fails or until a timeout is hit. It applies exponential backoff wait
// of backoffTime * 2^n before nth execution of confirmFn. In case the
// calculated backoff is longer than backoffMax, the backoffMax is applied.
// In case confirmFn returns an error, ConfirmWithTimeout exits with the same
// error immediately. This is different from DoWithRetry behavior as the use
// case for this function is different. ConfirmWithTimeout is intended to be
// used to confirm a chain state and not to try to enforce a successful
// execution of some function.
func ConfirmWithTimeout(
backoffTime time.Duration,
backoffMax time.Duration,
timeout time.Duration,
confirmFn func(ctx context.Context) (bool, error),
) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
select {
case <-ctx.Done():
return false, nil
default:
ok, err := confirmFn(ctx)
if err == nil && ok {
return true, nil
}
if err != nil {
return false, err
}

timedOut := backoffWait(ctx, backoffTime)
if timedOut {
return false, nil
}

backoffTime = calculateBackoff(
backoffTime,
backoffMax,
)
}
}
}

const (
// DefaultConfirmBackoffTime is the default value of backoff time used by
// ConfirmWithDefaultTimeout function.
DefaultConfirmBackoffTime = 5 * time.Second

// DefaultConfirmMaxBackoffTime is the default value of max backoff time
// used by ConfirmWithDefaultTimeout function.
DefaultConfirmMaxBackoffTime = 10 * time.Second
)

// ConfirmWithTimeoutDefaultBackoff executes the provided confirmFn until it
// returns true or until it fails or until a timeout is hit. It applies
// backoff wait of DefaultConfirmBackoffTime * 2^n before nth execution of
// confirmFn. In case the calculated backoff is longer than
// DefaultConfirmMaxBackoffTime, DefaultConfirmMaxBackoffTime is applied.
// In case confirmFn returns an error, ConfirmWithTimeoutDefaultBackoff exits
// with the same error immediately. This is different from DoWithDefaultRetry
// behavior as the use case for this function is different.
// ConfirmWithTimeoutDefaultBackoff is intended to be used to confirm a chain
// state and not to try to enforce a successful execution of some function.
func ConfirmWithTimeoutDefaultBackoff(
timeout time.Duration,
confirmFn func(ctx context.Context) (bool, error),
) (bool, error) {
return ConfirmWithTimeout(
DefaultConfirmBackoffTime,
DefaultConfirmMaxBackoffTime,
timeout,
confirmFn,
)
}

func calculateBackoff(
backoffPrev time.Duration,
backoffMax time.Duration,
) time.Duration {
backoff := backoffPrev

backoff *= 2

// #nosec G404
// we are fine with not using cryptographically secure random integer,
// it is just exponential backoff jitter
r := rand.Int63n(backoff.Nanoseconds()/10 + 1)
jitter := time.Duration(r) * time.Nanosecond
backoff += jitter

if backoff > backoffMax {
backoff = backoffMax
}

return backoff
}

func backoffWait(ctx context.Context, waitTime time.Duration) bool {
timer := time.NewTimer(waitTime)
defer timer.Stop()

select {
case <-ctx.Done():
return true
case <-timer.C:
return false
}
}
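For reference, a self-contained sketch of how the new wrappers package can be used; the scenario (a flaky operation succeeding on the third attempt) is illustrative only and not part of this change:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/keep-network/keep-common/pkg/wrappers"
)

func main() {
    attempts := 0

    // Retry a flaky operation with the default exponential backoff
    // (1s initial, capped at 120s) until it succeeds or 30s elapse.
    err := wrappers.DoWithDefaultRetry(30*time.Second, func(ctx context.Context) error {
        attempts++
        if attempts < 3 {
            return errors.New("transient failure") // will be retried
        }
        return nil
    })
    fmt.Printf("retry result after %d attempts: %v\n", attempts, err)

    // Poll for a condition with the default 5s backoff; returning an error
    // aborts immediately, returning false keeps polling until 10s elapse.
    confirmed, err := wrappers.ConfirmWithTimeoutDefaultBackoff(
        10*time.Second,
        func(ctx context.Context) (bool, error) {
            return attempts >= 3, nil
        },
    )
    fmt.Printf("confirmed: %v, err: %v\n", confirmed, err)
}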