diff --git a/clock/clock.go b/clock/clock.go index f78ed070c..b527ea462 100644 --- a/clock/clock.go +++ b/clock/clock.go @@ -10,12 +10,12 @@ func NewSystemClock() *SystemClock { return &SystemClock{} } -// Now returns now time.Time with UTC location. +// NowUTC returns the current time.Time with UTC location. func (t *SystemClock) NowUTC() time.Time { return time.Now().UTC() } -// Now returns now time.Time as a Unix time. +// NowUnix returns the current time as a Unix timestamp in seconds. func (t *SystemClock) NowUnix() int64 { return time.Now().Unix() } diff --git a/cmd/config.go b/cmd/config.go index 0d033cf76..8bc59b07d 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/moira-alert/moira" + "github.com/moira-alert/moira/metric_source/retries" "github.com/moira-alert/moira/metrics" "github.com/moira-alert/moira/image_store/s3" @@ -230,15 +231,54 @@ type remoteCommon interface { getRemoteCommon() *RemoteCommonConfig } +// RetriesConfig holds the settings of the retry policy used when performing requests to remote sources. +// Retrying stops as soon as ONE of the following conditions is satisfied: +// - Time passed since the first try is greater than MaxElapsedTime; +// - MaxRetriesCount retries have already been performed. +type RetriesConfig struct { + // InitialInterval between requests. + InitialInterval string `yaml:"initial_interval"` + // RandomizationFactor is used in exponential backoff to add some randomization + // when calculating the next interval between requests. + // It is used in a multiplication like: + // RandomizedInterval = RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor]) + RandomizationFactor float64 `yaml:"randomization_factor"` + // Multiplier is the factor each new RetryInterval is multiplied by. + Multiplier float64 `yaml:"multiplier"` + // MaxInterval is the cap for RetryInterval. Note that it doesn't cap the RandomizedInterval. + MaxInterval string `yaml:"max_interval"` + // MaxElapsedTime caps the time passed from first try. 
If the time passed is greater than MaxElapsedTime then retrying stops. + MaxElapsedTime string `yaml:"max_elapsed_time"` + // MaxRetriesCount is the amount of allowed retries. So at most MaxRetriesCount retries will be performed. + MaxRetriesCount uint64 `yaml:"max_retries_count"` +} + +func (config RetriesConfig) getRetriesSettings() retries.Config { + return retries.Config{ + InitialInterval: to.Duration(config.InitialInterval), + RandomizationFactor: config.RandomizationFactor, + Multiplier: config.Multiplier, + MaxInterval: to.Duration(config.MaxInterval), + MaxElapsedTime: to.Duration(config.MaxElapsedTime), + MaxRetriesCount: config.MaxRetriesCount, + } +} + // GraphiteRemoteConfig is remote graphite settings structure. type GraphiteRemoteConfig struct { RemoteCommonConfig `yaml:",inline"` - // Timeout for remote requests + // Timeout for remote requests. Timeout string `yaml:"timeout"` - // Username for basic auth + // Username for basic auth. User string `yaml:"user"` - // Password for basic auth + // Password for basic auth. Password string `yaml:"password"` + // Retries configuration for general requests to remote graphite. + Retries RetriesConfig `yaml:"retries"` + // HealthcheckTimeout is the timeout for remote api health check requests. + HealthcheckTimeout string `yaml:"health_check_timeout"` + // HealthCheckRetries configuration for health check requests to remote graphite. + HealthCheckRetries RetriesConfig `yaml:"health_check_retries"` } func (config GraphiteRemoteConfig) getRemoteCommon() *RemoteCommonConfig { @@ -248,12 +288,15 @@ func (config GraphiteRemoteConfig) getRemoteCommon() *RemoteCommonConfig { // GetRemoteSourceSettings returns remote config parsed from moira config files. 
func (config *GraphiteRemoteConfig) GetRemoteSourceSettings() *graphiteRemoteSource.Config { return &graphiteRemoteSource.Config{ - URL: config.URL, - CheckInterval: to.Duration(config.CheckInterval), - MetricsTTL: to.Duration(config.MetricsTTL), - Timeout: to.Duration(config.Timeout), - User: config.User, - Password: config.Password, + URL: config.URL, + CheckInterval: to.Duration(config.CheckInterval), + MetricsTTL: to.Duration(config.MetricsTTL), + Timeout: to.Duration(config.Timeout), + User: config.User, + Password: config.Password, + Retries: config.Retries.getRetriesSettings(), + HealthcheckTimeout: to.Duration(config.HealthcheckTimeout), + HealthcheckRetries: config.HealthCheckRetries.getRetriesSettings(), } } diff --git a/go.mod b/go.mod index b309808bd..ad157caf0 100644 --- a/go.mod +++ b/go.mod @@ -171,6 +171,7 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/dyatlov/go-opengraph/opengraph v0.0.0-20220524092352-606d7b1e5f8a // indirect github.com/fatih/color v1.16.0 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect diff --git a/go.sum b/go.sum index a013bebd9..74558b6da 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,8 @@ github.com/bwmarrin/discordgo v0.25.0 h1:NXhdfHRNxtwso6FPdzW2i3uBvvU7UIQTghmV2T4 github.com/bwmarrin/discordgo v0.25.0/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY= github.com/carlosdp/twiliogo v0.0.0-20161027183705-b26045ebb9d1 h1:hXakhQtPnXH839q1pBl/GqfTSchqE+R5Fqn98Iu7UQM= github.com/carlosdp/twiliogo v0.0.0-20161027183705-b26045ebb9d1/go.mod h1:pAxCBpjl/0JxYZlWGP/Dyi8f/LQSCQD2WAsG/iNzqQ8= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/local/api.yml b/local/api.yml index 631240d4e..d852a09b5 100644 --- a/local/api.yml +++ b/local/api.yml @@ -17,8 +17,21 @@ graphite_remote: cluster_name: Graphite 1 url: "http://graphite:80/render" check_interval: 60s - timeout: 60s metrics_ttl: 168h + timeout: 60s + retries: + initial_interval: 60s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 120s + max_retries_count: 3 + health_check_timeout: 6s + health_check_retries: + initial_interval: 20s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 80s + max_retries_count: 3 prometheus_remote: - cluster_id: default cluster_name: Prometheus 1 diff --git a/local/checker.yml b/local/checker.yml index 71ff47e40..30f7e0bb9 100644 --- a/local/checker.yml +++ b/local/checker.yml @@ -19,8 +19,21 @@ graphite_remote: cluster_name: Graphite 1 url: "http://graphite:80/render" check_interval: 60s - timeout: 60s metrics_ttl: 168h + timeout: 60s + retries: + initial_interval: 60s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 120s + max_retries_count: 3 + health_check_timeout: 6s + health_check_retries: + initial_interval: 20s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 80s + max_retries_count: 3 prometheus_remote: - cluster_id: default cluster_name: Prometheus 1 diff --git a/local/notifier.yml b/local/notifier.yml index 6a5cf57a7..d23f24955 100644 --- a/local/notifier.yml +++ b/local/notifier.yml @@ -17,8 +17,21 @@ graphite_remote: cluster_name: Graphite 1 url: "http://graphite:80/render" check_interval: 60s - timeout: 60s metrics_ttl: 168h + timeout: 60s + retries: + initial_interval: 60s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 120s + max_retries_count: 3 + health_check_timeout: 6s + health_check_retries: + 
initial_interval: 20s + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 80s + max_retries_count: 3 prometheus_remote: - cluster_id: default cluster_name: Prometheus 1 diff --git a/metric_source/remote/config.go b/metric_source/remote/config.go index 06ca22af4..4eb7f43c3 100644 --- a/metric_source/remote/config.go +++ b/metric_source/remote/config.go @@ -1,13 +1,54 @@ package remote -import "time" +import ( + "errors" + "time" + + "github.com/moira-alert/moira/metric_source/retries" +) // Config represents config from remote storage. type Config struct { - URL string - CheckInterval time.Duration - MetricsTTL time.Duration - Timeout time.Duration - User string - Password string + URL string + CheckInterval time.Duration + MetricsTTL time.Duration + Timeout time.Duration + User string + Password string + HealthcheckTimeout time.Duration + Retries retries.Config + HealthcheckRetries retries.Config +} + +var ( + errBadRemoteUrl = errors.New("remote graphite URL should not be empty") + errNoTimeout = errors.New("timeout must be specified and can't be 0") + errNoHealthcheckTimeout = errors.New("health_check_timeout must be specified and can't be 0") +) + +// validate collects every problem found in conf and returns them joined via errors.Join (nil when there are none). +func (conf Config) validate() error { + resErrors := make([]error, 0) + + if conf.URL == "" { + resErrors = append(resErrors, errBadRemoteUrl) + } + + if conf.Timeout == 0 { + resErrors = append(resErrors, errNoTimeout) + } + + if conf.HealthcheckTimeout == 0 { + resErrors = append(resErrors, errNoHealthcheckTimeout) + } + + if errRetriesValidate := conf.Retries.Validate(); errRetriesValidate != nil { + resErrors = append(resErrors, errRetriesValidate) + } + + if errHealthcheckRetriesValidate := conf.HealthcheckRetries.Validate(); errHealthcheckRetriesValidate != nil { + resErrors = append(resErrors, errHealthcheckRetriesValidate) + } + + return errors.Join(resErrors...) 
} diff --git a/metric_source/remote/config_test.go b/metric_source/remote/config_test.go new file mode 100644 index 000000000..3d58fd270 --- /dev/null +++ b/metric_source/remote/config_test.go @@ -0,0 +1,88 @@ +package remote + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/moira-alert/moira/metric_source/retries" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestConfig_validate(t *testing.T) { + Convey("Test validating retries config", t, func() { + type testcase struct { + caseDesc string + conf Config + expectedErr error + } + + var ( + testInitialInterval = time.Second * 5 + testMaxInterval = time.Second * 10 + testRetriesCount uint64 = 10 + ) + + testRetriesConf := retries.Config{ + InitialInterval: testInitialInterval, + MaxInterval: testMaxInterval, + MaxRetriesCount: testRetriesCount, + } + + cases := []testcase{ + { + caseDesc: "with empty config", + conf: Config{}, + expectedErr: errors.Join(errBadRemoteUrl, errNoTimeout, errNoHealthcheckTimeout, retries.Config{}.Validate(), retries.Config{}.Validate()), + }, + { + caseDesc: "with retries config set", + conf: Config{ + Retries: testRetriesConf, + HealthcheckRetries: testRetriesConf, + }, + expectedErr: errors.Join(errBadRemoteUrl, errNoTimeout, errNoHealthcheckTimeout), + }, + { + caseDesc: "with retries config set and some url", + conf: Config{ + URL: "http://test-graphite", + Retries: testRetriesConf, + HealthcheckRetries: testRetriesConf, + }, + expectedErr: errors.Join(errNoTimeout, errNoHealthcheckTimeout), + }, + { + caseDesc: "with retries config set, some url, timeout", + conf: Config{ + Timeout: time.Second, + URL: "http://test-graphite", + Retries: testRetriesConf, + HealthcheckRetries: testRetriesConf, + }, + expectedErr: errors.Join(errNoHealthcheckTimeout), + }, + { + caseDesc: "with valid config", + conf: Config{ + Timeout: time.Second, + HealthcheckTimeout: time.Millisecond, + URL: "http://test-graphite", + Retries: testRetriesConf, + HealthcheckRetries: 
testRetriesConf, + }, + expectedErr: nil, + }, + } + + for i := range cases { + Convey(fmt.Sprintf("Case %d: %s", i+1, cases[i].caseDesc), func() { + err := cases[i].conf.validate() + + So(err, ShouldResemble, cases[i].expectedErr) + }) + } + }) +} diff --git a/metric_source/remote/remote.go b/metric_source/remote/remote.go index 853a96240..274a6630f 100644 --- a/metric_source/remote/remote.go +++ b/metric_source/remote/remote.go @@ -1,17 +1,15 @@ package remote import ( - "fmt" "net/http" "time" + "github.com/moira-alert/moira/metric_source/retries" + "github.com/moira-alert/moira" metricSource "github.com/moira-alert/moira/metric_source" ) -// ErrRemoteStorageDisabled is used to prevent remote.Fetch calls when remote storage is disabled. -var ErrRemoteStorageDisabled = fmt.Errorf("remote graphite storage is not enabled") - // ErrRemoteTriggerResponse is a custom error when remote trigger check fails. type ErrRemoteTriggerResponse struct { InternalError error @@ -23,20 +21,38 @@ func (err ErrRemoteTriggerResponse) Error() string { return err.InternalError.Error() } +// ErrRemoteUnavailable is a custom error returned when the remote graphite is unavailable. +type ErrRemoteUnavailable struct { + InternalError error + Target string +} + +// Error implements the error interface by returning the message of InternalError. +func (err ErrRemoteUnavailable) Error() string { + return err.InternalError.Error() +} + // Remote is implementation of MetricSource interface, which implements fetch metrics method from remote graphite installation. type Remote struct { - config *Config - client *http.Client + config *Config + client *http.Client + retrier retries.Retrier[[]byte] + requestBackoffFactory retries.BackoffFactory + healthcheckBackoffFactory retries.BackoffFactory } // Create configures remote metric source. 
func Create(config *Config) (metricSource.MetricSource, error) { - if config.URL == "" { - return nil, fmt.Errorf("remote graphite URL should not be empty") + if err := config.validate(); err != nil { + return nil, err } + return &Remote{ - config: config, - client: &http.Client{Timeout: config.Timeout}, + config: config, + client: &http.Client{}, + retrier: retries.NewStandardRetrier[[]byte](), + requestBackoffFactory: retries.NewExponentialBackoffFactory(config.Retries), + healthcheckBackoffFactory: retries.NewExponentialBackoffFactory(config.HealthcheckRetries), }, nil } @@ -53,13 +69,12 @@ func (remote *Remote) Fetch(target string, from, until int64, allowRealTimeAlert Target: target, } } - body, err := remote.makeRequest(req) + + body, err := remote.makeRequest(req, remote.config.Timeout, remote.requestBackoffFactory.NewBackOff()) if err != nil { - return nil, ErrRemoteTriggerResponse{ - InternalError: err, - Target: target, - } + return nil, internalErrToPublicErr(err, target) } + resp, err := decodeBody(body) if err != nil { return nil, ErrRemoteTriggerResponse{ @@ -67,6 +82,7 @@ func (remote *Remote) Fetch(target string, from, until int64, allowRealTimeAlert Target: target, } } + fetchResult := convertResponse(resp, allowRealTimeAlerting) return &fetchResult, nil } @@ -76,25 +92,18 @@ func (remote *Remote) GetMetricsTTLSeconds() int64 { return int64(remote.config.MetricsTTL.Seconds()) } -// IsConfigured returns false in cases that user does not properly configure remote settings like graphite URL. -func (remote *Remote) IsConfigured() (bool, error) { - return true, nil -} - -// IsRemoteAvailable checks if graphite API is available and returns 200 response. +// IsAvailable checks if graphite API is available and returns 200 response. 
func (remote *Remote) IsAvailable() (bool, error) { - maxRetries := 3 until := time.Now().Unix() from := until - 600 //nolint + req, err := remote.prepareRequest(from, until, "NonExistingTarget") if err != nil { return false, err } - for attempt := 0; attempt < maxRetries; attempt++ { - _, err = remote.makeRequest(req) - if err == nil { - return true, nil - } - } - return false, err + + _, err = remote.makeRequest(req, remote.config.HealthcheckTimeout, remote.healthcheckBackoffFactory.NewBackOff()) + err = internalErrToPublicErr(err, "") + + return !isRemoteUnavailable(err), err } diff --git a/metric_source/remote/remote_test.go b/metric_source/remote/remote_test.go index fd4539572..ecbe47764 100644 --- a/metric_source/remote/remote_test.go +++ b/metric_source/remote/remote_test.go @@ -4,26 +4,128 @@ import ( "fmt" "net/http" "testing" + "time" metricSource "github.com/moira-alert/moira/metric_source" + "github.com/moira-alert/moira/metric_source/retries" + . "github.com/smartystreets/goconvey/convey" ) -func TestIsRemoteAvailable(t *testing.T) { - Convey("Is available", t, func() { - server := createServer([]byte("Some string"), http.StatusOK) - remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} - isAvailable, err := remote.IsAvailable() - So(isAvailable, ShouldBeTrue) - So(err, ShouldBeEmpty) +var testConfigs = []*Config{ + { + Timeout: time.Second, + Retries: retries.Config{ + InitialInterval: time.Millisecond, + RandomizationFactor: 0.5, + Multiplier: 2, + MaxInterval: time.Millisecond * 20, + MaxRetriesCount: 2, + }, + }, + { + Timeout: time.Millisecond * 200, + Retries: retries.Config{ + InitialInterval: time.Millisecond, + RandomizationFactor: 0.5, + Multiplier: 2, + MaxInterval: time.Second, + MaxElapsedTime: time.Second * 2, + }, + }, +} + +func TestIsAvailable(t *testing.T) { + body := []byte("Some string") + + isAvailableTestConfigs := make([]*Config, 0, len(testConfigs)) + for _, conf := range testConfigs { + 
isAvailableTestConfigs = append(isAvailableTestConfigs, &Config{ + HealthcheckTimeout: conf.Timeout, + HealthcheckRetries: conf.Retries, + }) + } + + retrier := retries.NewStandardRetrier[[]byte]() + + Convey("Given server returns OK response the remote is available", t, func() { + server := createServer(body, http.StatusOK) + defer server.Close() + + for _, config := range isAvailableTestConfigs { + config.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + healthcheckBackoffFactory: retries.NewExponentialBackoffFactory(config.HealthcheckRetries), + } + + isAvailable, err := remote.IsAvailable() + So(isAvailable, ShouldBeTrue) + So(err, ShouldBeEmpty) + } }) - Convey("Not available", t, func() { - server := createServer([]byte("Some string"), http.StatusInternalServerError) - remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} - isAvailable, err := remote.IsAvailable() - So(isAvailable, ShouldBeFalse) - So(err, ShouldResemble, fmt.Errorf("bad response status %d: %s", http.StatusInternalServerError, "Some string")) + Convey("Given server returns Remote Unavailable responses permanently", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + server := createTestServer(TestResponse{body, statusCode}) + + Convey(fmt.Sprintf( + "request failed with %d response status code and remote is unavailable", statusCode, + ), func() { + for _, config := range isAvailableTestConfigs { + config.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + healthcheckBackoffFactory: retries.NewExponentialBackoffFactory(config.HealthcheckRetries), + } + + isAvailable, err := remote.IsAvailable() + So(err, ShouldResemble, ErrRemoteUnavailable{ + InternalError: fmt.Errorf( + "the remote server is not available. 
Response status %d: %s", statusCode, string(body), + ), + }) + So(isAvailable, ShouldBeFalse) + } + }) + + server.Close() + } + }) + + Convey("Given server returns Remote Unavailable response temporary", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + Convey(fmt.Sprintf( + "the remote is available with retry after %d response", statusCode, + ), func() { + for _, config := range isAvailableTestConfigs { + server := createTestServer( + TestResponse{body, statusCode}, + TestResponse{body, http.StatusOK}, + ) + config.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + healthcheckBackoffFactory: retries.NewExponentialBackoffFactory(config.HealthcheckRetries), + } + + isAvailable, err := remote.IsAvailable() + So(err, ShouldBeNil) + So(isAvailable, ShouldBeTrue) + + server.Close() + } + }) + } }) } @@ -32,9 +134,22 @@ func TestFetch(t *testing.T) { var until int64 = 500 target := "foo.bar" //nolint + retrier := retries.NewStandardRetrier[[]byte]() + validBody := []byte("[{\"Target\": \"t1\",\"DataPoints\":[[1,2],[3,4]]}]") + Convey("Request success but body is invalid", t, func() { server := createServer([]byte("[]"), http.StatusOK) - remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} + + conf := testConfigs[0] + conf.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: conf, + retrier: retrier, + requestBackoffFactory: retries.NewExponentialBackoffFactory(conf.Retries), + } + result, err := remote.Fetch(target, from, until, false) So(result, ShouldResemble, &FetchResult{MetricsData: []metricSource.MetricData{}}) So(err, ShouldBeEmpty) @@ -42,25 +157,129 @@ func TestFetch(t *testing.T) { Convey("Request success but body is invalid", t, func() { server := createServer([]byte("Some string"), http.StatusOK) - remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} + defer server.Close() + + conf := testConfigs[0] + conf.URL = 
server.URL + + remote := Remote{ + client: server.Client(), + config: conf, + retrier: retrier, + requestBackoffFactory: retries.NewExponentialBackoffFactory(conf.Retries), + } + result, err := remote.Fetch(target, from, until, false) So(result, ShouldBeEmpty) So(err.Error(), ShouldResemble, "invalid character 'S' looking for beginning of value") + So(err, ShouldHaveSameTypeAs, ErrRemoteTriggerResponse{}) }) Convey("Fail request with InternalServerError", t, func() { server := createServer([]byte("Some string"), http.StatusInternalServerError) - remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} - result, err := remote.Fetch(target, from, until, false) - So(result, ShouldBeEmpty) - So(err.Error(), ShouldResemble, fmt.Sprintf("bad response status %d: %s", http.StatusInternalServerError, "Some string")) + defer server.Close() + + for _, config := range testConfigs { + config.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + requestBackoffFactory: retries.NewExponentialBackoffFactory(config.Retries), + } + + result, err := remote.Fetch(target, from, until, false) + + So(result, ShouldBeEmpty) + So(err.Error(), ShouldResemble, fmt.Sprintf("bad response status %d: %s", http.StatusInternalServerError, "Some string")) + So(err, ShouldHaveSameTypeAs, ErrRemoteTriggerResponse{}) + } }) - Convey("Fail make request", t, func() { + Convey("Client calls bad url", t, func() { + server := createTestServer(TestResponse{[]byte("Some string"), http.StatusOK}) + defer server.Close() + url := "💩%$&TR" - remote := Remote{config: &Config{URL: url}} - result, err := remote.Fetch(target, from, until, false) - So(result, ShouldBeEmpty) - So(err.Error(), ShouldResemble, "parse \"💩%$&TR\": invalid URL escape \"%$&\"") + + for _, config := range testConfigs { + config.URL = url + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + requestBackoffFactory: 
retries.NewExponentialBackoffFactory(config.Retries), + } + + result, err := remote.Fetch(target, from, until, false) + So(result, ShouldBeEmpty) + So(err.Error(), ShouldResemble, "parse \"💩%$&TR\": invalid URL escape \"%$&\"") + So(err, ShouldHaveSameTypeAs, ErrRemoteTriggerResponse{}) + } + }) + + Convey("Given server returns Remote Unavailable responses permanently", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + server := createTestServer(TestResponse{validBody, statusCode}) + + Convey(fmt.Sprintf( + "request failed with %d response status code and remote is unavailable", statusCode, + ), func() { + for _, config := range testConfigs { + config.URL = server.URL + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + requestBackoffFactory: retries.NewExponentialBackoffFactory(config.Retries), + } + + result, err := remote.Fetch(target, from, until, false) + So(err, ShouldResemble, ErrRemoteUnavailable{ + InternalError: fmt.Errorf( + "the remote server is not available. 
Response status %d: %s", statusCode, string(validBody), + ), Target: target, + }) + So(result, ShouldBeNil) + } + }) + + server.Close() + } + }) + + Convey("Given server returns Remote Unavailable response temporary", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + Convey(fmt.Sprintf( + "the remote is available with retry after %d response", statusCode, + ), func() { + for _, config := range testConfigs { + server := createTestServer( + TestResponse{validBody, statusCode}, + TestResponse{validBody, http.StatusOK}, + ) + config.URL = server.URL + + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + requestBackoffFactory: retries.NewExponentialBackoffFactory(config.Retries), + } + + result, err := remote.Fetch(target, from, until, false) + So(err, ShouldBeNil) + So(result, ShouldNotBeNil) + + metricsData := result.GetMetricsData() + So(len(metricsData), ShouldEqual, 1) + So(metricsData[0].Name, ShouldEqual, "t1") + + server.Close() + } + }) + } }) } diff --git a/metric_source/remote/request.go b/metric_source/remote/request.go index 686b1766b..2f4df448d 100644 --- a/metric_source/remote/request.go +++ b/metric_source/remote/request.go @@ -2,10 +2,14 @@ package remote import ( "context" + "errors" "fmt" "io" "net/http" "strconv" + "time" + + "github.com/cenkalti/backoff/v4" ) func (remote *Remote) prepareRequest(from, until int64, target string) (*http.Request, error) { @@ -13,39 +17,136 @@ func (remote *Remote) prepareRequest(from, until int64, target string) (*http.Re if err != nil { return nil, err } + q := req.URL.Query() q.Add("format", "json") q.Add("from", strconv.FormatInt(from, 10)) q.Add("target", target) q.Add("until", strconv.FormatInt(until, 10)) req.URL.RawQuery = q.Encode() + if remote.config.User != "" && remote.config.Password != "" { req.SetBasicAuth(remote.config.User, remote.config.Password) } + return req, nil } -func (remote *Remote) makeRequest(req *http.Request) ([]byte, error) { - var 
body []byte +func (remote *Remote) makeRequest(req *http.Request, timeout time.Duration, backoffPolicy backoff.BackOff) ([]byte, error) { + return remote.retrier.Retry( + requestToRemoteGraphite{ + client: remote.client, + request: req, + requestTimeout: timeout, + }, + backoffPolicy) +} + +// requestToRemoteGraphite implements retries.RetryableOperation. +type requestToRemoteGraphite struct { + client *http.Client + request *http.Request + requestTimeout time.Duration +} + +// DoRetryableOperation is a one attempt of performing request to remote graphite. +func (r requestToRemoteGraphite) DoRetryableOperation() ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) + defer cancel() - resp, err := remote.client.Do(req) + req := r.request.WithContext(ctx) + + resp, err := r.client.Do(req) if resp != nil { defer resp.Body.Close() } if err != nil { - return body, fmt.Errorf("The remote server is not available or the response was reset by timeout. "+ //nolint - "TTL: %s, PATH: %s, ERROR: %v ", remote.client.Timeout.String(), req.URL.RawPath, err) + return nil, errRemoteUnavailable{ + internalErr: fmt.Errorf( + "the remote server is not available or the response was reset by timeout. Url: %s, Error: %w ", + req.URL.String(), + err), + } } - body, err = io.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { - return body, err + return body, errRemoteUnavailable{internalErr: err} } - if resp.StatusCode != http.StatusOK { - return body, fmt.Errorf("bad response status %d: %s", resp.StatusCode, string(body)) + if isRemoteUnavailableStatusCode(resp.StatusCode) { + return body, errRemoteUnavailable{ + internalErr: fmt.Errorf( + "the remote server is not available. 
Response status %d: %s", + resp.StatusCode, string(body)), + } + } else if resp.StatusCode != http.StatusOK { + return body, backoff.Permanent( + errInvalidRequest{ + internalErr: fmt.Errorf("bad response status %d: %s", resp.StatusCode, string(body)), + }) } return body, nil } + +type errInvalidRequest struct { + internalErr error +} + +func (err errInvalidRequest) Error() string { + return err.internalErr.Error() +} + +type errRemoteUnavailable struct { + internalErr error +} + +func (err errRemoteUnavailable) Error() string { + return err.internalErr.Error() +} + +func isRemoteUnavailable(err error) bool { + var errUnavailable ErrRemoteUnavailable + return errors.As(err, &errUnavailable) +} + +func internalErrToPublicErr(err error, target string) error { + if err == nil { + return nil + } + + var invalidReqErr errInvalidRequest + if errors.As(err, &invalidReqErr) { + return ErrRemoteTriggerResponse{ + InternalError: invalidReqErr.internalErr, + Target: target, + } + } + + var errUnavailable errRemoteUnavailable + if errors.As(err, &errUnavailable) { + return ErrRemoteUnavailable{ + InternalError: errUnavailable.internalErr, + Target: target, + } + } + + return ErrRemoteTriggerResponse{ + InternalError: err, + Target: target, + } +} + +var remoteUnavailableStatusCodes = map[int]struct{}{ + http.StatusUnauthorized: {}, + http.StatusBadGateway: {}, + http.StatusServiceUnavailable: {}, + http.StatusGatewayTimeout: {}, +} + +func isRemoteUnavailableStatusCode(statusCode int) bool { + _, isUnavailableCode := remoteUnavailableStatusCodes[statusCode] + return isUnavailableCode +} diff --git a/metric_source/remote/request_test.go b/metric_source/remote/request_test.go index 340a8cef9..da575685f 100644 --- a/metric_source/remote/request_test.go +++ b/metric_source/remote/request_test.go @@ -5,7 +5,10 @@ import ( "net/http" "net/http/httptest" "testing" + "time" + "github.com/cenkalti/backoff/v4" + "github.com/moira-alert/moira/metric_source/retries" . 
"github.com/smartystreets/goconvey/convey" ) @@ -44,38 +47,269 @@ func TestPrepareRequest(t *testing.T) { }) } -func TestMakeRequest(t *testing.T) { +func Test_requestToRemoteGraphite_DoRetryableOperation(t *testing.T) { var from int64 = 300 var until int64 = 500 target := "foo.bar" body := []byte("Some string") + testTimeout := time.Millisecond * 10 + Convey("Client returns status OK", t, func() { server := createServer(body, http.StatusOK) + defer server.Close() + remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} request, _ := remote.prepareRequest(from, until, target) - actual, err := remote.makeRequest(request) + + retryableOp := requestToRemoteGraphite{ + request: request, + client: remote.client, + requestTimeout: testTimeout, + } + + actual, err := retryableOp.DoRetryableOperation() + So(err, ShouldBeNil) So(actual, ShouldResemble, body) }) Convey("Client returns status InternalServerError", t, func() { server := createServer(body, http.StatusInternalServerError) + defer server.Close() + remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} request, _ := remote.prepareRequest(from, until, target) - actual, err := remote.makeRequest(request) - So(err, ShouldResemble, fmt.Errorf("bad response status %d: %s", http.StatusInternalServerError, string(body))) + + retryableOp := requestToRemoteGraphite{ + request: request, + client: remote.client, + requestTimeout: testTimeout, + } + + actual, err := retryableOp.DoRetryableOperation() + + So(err, ShouldResemble, backoff.Permanent(errInvalidRequest{ + internalErr: fmt.Errorf("bad response status %d: %s", http.StatusInternalServerError, string(body)), + })) So(actual, ShouldResemble, body) }) Convey("Client calls bad url", t, func() { server := createServer(body, http.StatusOK) - remote := Remote{client: server.Client(), config: &Config{URL: "http://bad/"}} + defer server.Close() + + client := server.Client() + remote := Remote{client: client, config: &Config{URL: 
"http://bad/"}} request, _ := remote.prepareRequest(from, until, target) - actual, err := remote.makeRequest(request) - So(err, ShouldNotBeEmpty) + + retryableOp := requestToRemoteGraphite{ + request: request, + client: remote.client, + requestTimeout: testTimeout, + } + + actual, err := retryableOp.DoRetryableOperation() + + So(err, ShouldHaveSameTypeAs, errRemoteUnavailable{}) So(actual, ShouldBeEmpty) }) + + Convey("Client returns status Remote Unavailable status codes", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + server := createServer(body, statusCode) + remote := Remote{client: server.Client(), config: &Config{URL: server.URL}} + request, _ := remote.prepareRequest(from, until, target) + + retryableOp := requestToRemoteGraphite{ + request: request, + client: remote.client, + requestTimeout: testTimeout, + } + + actual, err := retryableOp.DoRetryableOperation() + + So(err, ShouldResemble, errRemoteUnavailable{ + internalErr: fmt.Errorf( + "the remote server is not available. 
Response status %d: %s", statusCode, string(body)), + }) + So(actual, ShouldResemble, body) + + server.Close() + } + }) +} + +func TestMakeRequestWithRetries(t *testing.T) { + var from int64 = 300 + var until int64 = 500 + target := "foo.bar" + body := []byte("Some string") + + retrier := retries.NewStandardRetrier[[]byte]() + + Convey("Given server returns OK response", t, func() { + server := createTestServer(TestResponse{body, http.StatusOK}) + defer server.Close() + + Convey("request is successful", func() { + remote := Remote{ + client: server.Client(), + retrier: retrier, + } + + for _, config := range testConfigs { + config.URL = server.URL + remote.config = config + request, _ := remote.prepareRequest(from, until, target) + backoffPolicy := retries.NewExponentialBackoffFactory(config.Retries).NewBackOff() + + actual, err := remote.makeRequest( + request, + remote.config.Timeout, + backoffPolicy, + ) + + So(err, ShouldBeNil) + So(actual, ShouldResemble, body) + } + }) + }) + + Convey("Given server returns 500 response", t, func() { + server := createTestServer(TestResponse{body, http.StatusInternalServerError}) + defer server.Close() + + expectedErr := errInvalidRequest{ + internalErr: fmt.Errorf("bad response status %d: %s", http.StatusInternalServerError, string(body)), + } + // systemClock.EXPECT().Sleep(time.Second).Times(0) + + Convey("request failed with 500 response and remote is available", func() { + remote := Remote{ + client: server.Client(), + retrier: retrier, + } + + for _, config := range testConfigs { + config.URL = server.URL + remote.config = config + request, _ := remote.prepareRequest(from, until, target) + backoffPolicy := retries.NewExponentialBackoffFactory(config.Retries).NewBackOff() + + actual, err := remote.makeRequest( + request, + remote.config.Timeout, + backoffPolicy, + ) + + So(err, ShouldResemble, expectedErr) + So(actual, ShouldResemble, body) + } + }) + }) + + Convey("Given client calls bad url", t, func() { + server := 
createTestServer(TestResponse{body, http.StatusOK}) + defer server.Close() + + Convey("request failed and remote is unavailable", func() { + remote := Remote{ + client: server.Client(), + retrier: retrier, + } + + for _, config := range testConfigs { + config.URL = "http://bad/" + remote.config = config + + request, _ := remote.prepareRequest(from, until, target) + backoffPolicy := retries.NewExponentialBackoffFactory(config.Retries).NewBackOff() + + actual, err := remote.makeRequest( + request, + remote.config.Timeout, + backoffPolicy, + ) + + So(err, ShouldHaveSameTypeAs, errRemoteUnavailable{}) + So(actual, ShouldBeEmpty) + } + }) + }) + + Convey("Given server returns Remote Unavailable responses permanently", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + server := createTestServer(TestResponse{body, statusCode}) + + Convey(fmt.Sprintf( + "request failed with %d response status code and remote is unavailable", statusCode, + ), func() { + remote := Remote{ + client: server.Client(), + retrier: retrier, + } + + for _, config := range testConfigs { + config.URL = server.URL + remote.config = config + + request, _ := remote.prepareRequest(from, until, target) + backoffPolicy := retries.NewExponentialBackoffFactory(config.Retries).NewBackOff() + + actual, err := remote.makeRequest( + request, + remote.config.Timeout, + backoffPolicy, + ) + + So(err, ShouldResemble, errRemoteUnavailable{ + internalErr: fmt.Errorf( + "the remote server is not available. 
Response status %d: %s", statusCode, string(body), + ), + }) + So(actual, ShouldResemble, body) + } + }) + + server.Close() + } + }) + + Convey("Given server returns Remote Unavailable response temporary", t, func() { + for statusCode := range remoteUnavailableStatusCodes { + Convey(fmt.Sprintf( + "request is successful with retry after %d response and remote is available", statusCode, + ), func() { + for _, config := range testConfigs { + server := createTestServer( + TestResponse{body, statusCode}, + TestResponse{body, http.StatusOK}, + ) + config.URL = server.URL + remote := Remote{ + client: server.Client(), + config: config, + retrier: retrier, + } + + request, _ := remote.prepareRequest(from, until, target) + backoffPolicy := retries.NewExponentialBackoffFactory(config.Retries).NewBackOff() + + actual, err := remote.makeRequest( + request, + remote.config.Timeout, + backoffPolicy, + ) + + So(err, ShouldBeNil) + So(actual, ShouldResemble, body) + + server.Close() + } + }) + } + }) } func createServer(body []byte, statusCode int) *httptest.Server { @@ -84,3 +318,35 @@ func createServer(body []byte, statusCode int) *httptest.Server { rw.Write(body) //nolint })) } + +func createTestServer(testResponses ...TestResponse) *httptest.Server { + responseWriter := NewTestResponseWriter(testResponses) + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + response := responseWriter.GetResponse() + rw.WriteHeader(response.statusCode) + rw.Write(response.body) //nolint + })) +} + +type TestResponse struct { + body []byte + statusCode int +} + +type TestResponseWriter struct { + responses []TestResponse + count int +} + +func NewTestResponseWriter(testResponses []TestResponse) *TestResponseWriter { + responseWriter := new(TestResponseWriter) + responseWriter.responses = testResponses + responseWriter.count = 0 + return responseWriter +} + +func (responseWriter *TestResponseWriter) GetResponse() TestResponse { + response := 
responseWriter.responses[responseWriter.count%len(responseWriter.responses)] + responseWriter.count++ + return response +} diff --git a/metric_source/remote/response.go b/metric_source/remote/response.go index 0af2fbaa0..8cc296948 100644 --- a/metric_source/remote/response.go +++ b/metric_source/remote/response.go @@ -23,6 +23,7 @@ func convertResponse(metricsData []metricSource.MetricData, allowRealTimeAlertin metricData.Values = metricData.Values[:len(metricData.Values)-1] result = append(result, metricData) } + return FetchResult{MetricsData: result} } @@ -32,12 +33,14 @@ func decodeBody(body []byte) ([]metricSource.MetricData, error) { if err != nil { return nil, err } + res := make([]metricSource.MetricData, 0, len(tmp)) for _, m := range tmp { var stepTime int64 = 60 if len(m.DataPoints) > 1 { stepTime = int64(*m.DataPoints[1][1] - *m.DataPoints[0][1]) } + metricData := metricSource.MetricData{ Name: m.Target, StartTime: int64(*m.DataPoints[0][1]), @@ -45,6 +48,7 @@ func decodeBody(body []byte) ([]metricSource.MetricData, error) { StepTime: stepTime, Values: make([]float64, len(m.DataPoints)), } + for i, v := range m.DataPoints { if v[0] == nil { metricData.Values[i] = math.NaN() @@ -52,7 +56,9 @@ func decodeBody(body []byte) ([]metricSource.MetricData, error) { metricData.Values[i] = *v[0] } } + res = append(res, metricData) } + return res, nil } diff --git a/metric_source/retries/backoff_factory.go b/metric_source/retries/backoff_factory.go new file mode 100644 index 000000000..718142161 --- /dev/null +++ b/metric_source/retries/backoff_factory.go @@ -0,0 +1,37 @@ +package retries + +import "github.com/cenkalti/backoff/v4" + +// BackoffFactory is used for creating backoff. It is expected that all backoffs created with one factory instance +// have the same behaviour. +type BackoffFactory interface { + NewBackOff() backoff.BackOff +} + +// ExponentialBackoffFactory is a factory that generates exponential backoffs based on given config. 
+type ExponentialBackoffFactory struct {
+	config Config
+}
+
+// NewExponentialBackoffFactory returns a BackoffFactory whose backoffs are all built from the given config.
+func NewExponentialBackoffFactory(config Config) BackoffFactory {
+	return ExponentialBackoffFactory{config: config}
+}
+
+// NewBackOff builds a fresh exponential backoff; a non-zero MaxRetriesCount additionally limits the retries.
+func (f ExponentialBackoffFactory) NewBackOff() backoff.BackOff {
+	conf := f.config
+	exponential := backoff.NewExponentialBackOff(
+		backoff.WithInitialInterval(conf.InitialInterval),
+		backoff.WithRandomizationFactor(conf.RandomizationFactor),
+		backoff.WithMultiplier(conf.Multiplier),
+		backoff.WithMaxInterval(conf.MaxInterval),
+		backoff.WithMaxElapsedTime(conf.MaxElapsedTime),
+	)
+
+	if conf.MaxRetriesCount == 0 {
+		return exponential
+	}
+
+	return backoff.WithMaxRetries(exponential, conf.MaxRetriesCount)
+}
diff --git a/metric_source/retries/backoff_factory_test.go b/metric_source/retries/backoff_factory_test.go
new file mode 100644
index 000000000..50d12cef1
--- /dev/null
+++ b/metric_source/retries/backoff_factory_test.go
@@ -0,0 +1,169 @@
+package retries
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+
+	. 
"github.com/smartystreets/goconvey/convey" +) + +const ( + testInitialInterval = time.Millisecond * 5 + testRandomizationFactor = 0.0 + testMultiplier = 2.0 + testMaxInterval = time.Millisecond * 40 +) + +func TestExponentialBackoffFactory(t *testing.T) { + Convey("Test ExponentialBackoffFactory", t, func() { + conf := Config{ + InitialInterval: testInitialInterval, + RandomizationFactor: testRandomizationFactor, + Multiplier: testMultiplier, + MaxInterval: testMaxInterval, + } + + Convey("with MaxRetriesCount != 0 and MaxElapsedTime = 0", func() { + Convey("with retry interval always lower then config.MaxInterval", func() { + conf.MaxRetriesCount = 3 + defer func() { + conf.MaxRetriesCount = 0 + }() + + expectedBackoffs := []time.Duration{ + testInitialInterval, + testInitialInterval * testMultiplier, + testInitialInterval * 4.0, + backoff.Stop, + backoff.Stop, + backoff.Stop, + } + + factory := NewExponentialBackoffFactory(conf) + + b := factory.NewBackOff() + + for i := range expectedBackoffs { + So(b.NextBackOff(), ShouldEqual, expectedBackoffs[i]) + } + }) + + Convey("with retry interval becomes config.MaxInterval", func() { + conf.MaxRetriesCount = 6 + defer func() { + conf.MaxRetriesCount = 0 + }() + + expectedBackoffs := []time.Duration{ + testInitialInterval, + testInitialInterval * testMultiplier, + testInitialInterval * 4.0, + testMaxInterval, + testMaxInterval, + testMaxInterval, + backoff.Stop, + backoff.Stop, + backoff.Stop, + } + + factory := NewExponentialBackoffFactory(conf) + + b := factory.NewBackOff() + + for i := range expectedBackoffs { + So(b.NextBackOff(), ShouldEqual, expectedBackoffs[i]) + } + }) + }) + + Convey("with MaxRetriesCount = 0 and MaxElapsedTime != 0", func() { + conf.MaxElapsedTime = time.Second + defer func() { + conf.MaxElapsedTime = 0 + }() + + once := sync.Once{} + + expectedBackoffs := []time.Duration{ + testInitialInterval, + backoff.Stop, + backoff.Stop, + backoff.Stop, + } + + factory := 
NewExponentialBackoffFactory(conf) + + b := factory.NewBackOff() + + for i := range expectedBackoffs { + So(b.NextBackOff(), ShouldEqual, expectedBackoffs[i]) + once.Do(func() { + time.Sleep(conf.MaxElapsedTime) + }) + } + }) + + Convey("with MaxRetriesCount != 0 and MaxElapsedTime != 0", func() { + Convey("MaxRetriesCount performed retries before MaxElapsedTime passed", func() { + conf.MaxElapsedTime = time.Second + conf.MaxRetriesCount = 6 + defer func() { + conf.MaxElapsedTime = 0 + conf.MaxRetriesCount = 0 + }() + + expectedBackoffs := []time.Duration{ + testInitialInterval, + testInitialInterval * testMultiplier, + testInitialInterval * 4.0, + testMaxInterval, + testMaxInterval, + testMaxInterval, + backoff.Stop, + backoff.Stop, + backoff.Stop, + } + + factory := NewExponentialBackoffFactory(conf) + + b := factory.NewBackOff() + + for i := range expectedBackoffs { + So(b.NextBackOff(), ShouldEqual, expectedBackoffs[i]) + } + }) + + Convey("MaxElapsedTime passed before MaxRetriesCount performed", func() { + conf.MaxElapsedTime = time.Second + conf.MaxRetriesCount = 6 + defer func() { + conf.MaxElapsedTime = 0 + conf.MaxRetriesCount = 0 + }() + + expectedBackoffs := []time.Duration{ + testInitialInterval, + backoff.Stop, + backoff.Stop, + backoff.Stop, + } + + once := sync.Once{} + + factory := NewExponentialBackoffFactory(conf) + + b := factory.NewBackOff() + + for i := range expectedBackoffs { + So(b.NextBackOff(), ShouldEqual, expectedBackoffs[i]) + once.Do(func() { + time.Sleep(conf.MaxElapsedTime) + }) + } + }) + }) + }) +} diff --git a/metric_source/retries/config.go b/metric_source/retries/config.go new file mode 100644 index 000000000..f32cd0d44 --- /dev/null +++ b/metric_source/retries/config.go @@ -0,0 +1,50 @@ +package retries + +import ( + "errors" + "time" +) + +// Config for exponential backoff retries. +type Config struct { + // InitialInterval between requests. 
+	InitialInterval time.Duration
+	// RandomizationFactor is used in exponential backoff to add some randomization
+	// when calculating the next interval between requests.
+	// It is applied as a multiplier:
+	// RandomizedInterval = RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
+	RandomizationFactor float64
+	// Multiplier is the factor by which each new RetryInterval is multiplied.
+	Multiplier float64
+	// MaxInterval is the cap for RetryInterval. Note that it doesn't cap the RandomizedInterval.
+	MaxInterval time.Duration
+	// MaxElapsedTime caps the time passed since the first try. Once more time than MaxElapsedTime has passed, retrying stops.
+	MaxElapsedTime time.Duration
+	// MaxRetriesCount is the number of allowed retries, so at most MaxRetriesCount retries will be performed.
+	MaxRetriesCount uint64
+}
+
+var (
+	errNoInitialInterval                  = errors.New("initial_interval must be specified and can't be 0")
+	errNoMaxInterval                      = errors.New("max_interval must be specified and can't be 0")
+	errNoMaxElapsedTimeAndMaxRetriesCount = errors.New("at least one of max_elapsed_time, max_retries_count must be specified")
+)
+
+// Validate checks that retries Config has all necessary fields.
+// Every violated requirement is reported, joined into one error; nil means the config is complete.
+func (conf Config) Validate() error {
+	var violations []error
+
+	// require records err when the requirement does not hold.
+	require := func(satisfied bool, err error) {
+		if !satisfied {
+			violations = append(violations, err)
+		}
+	}
+
+	require(conf.InitialInterval != 0, errNoInitialInterval)
+	require(conf.MaxInterval != 0, errNoMaxInterval)
+	require(conf.MaxElapsedTime != 0 || conf.MaxRetriesCount != 0, errNoMaxElapsedTimeAndMaxRetriesCount)
+
+	return errors.Join(violations...)
+}
diff --git a/metric_source/retries/config_test.go b/metric_source/retries/config_test.go
new file mode 100644
index 000000000..a20e75a12
--- /dev/null
+++ b/metric_source/retries/config_test.go
@@ -0,0 +1,78 @@
+package retries
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	. 
"github.com/smartystreets/goconvey/convey" +) + +func TestConfig_Validate(t *testing.T) { + Convey("Test validating retries config", t, func() { + type testcase struct { + caseDesc string + conf Config + expectedErr error + } + + var ( + testRetriesCount uint64 = 10 + testMaxElapsedTIme = time.Second * 10 + ) + + cases := []testcase{ + { + caseDesc: "with empty config", + conf: Config{}, + expectedErr: errors.Join(errNoInitialInterval, errNoMaxInterval, errNoMaxElapsedTimeAndMaxRetriesCount), + }, + { + caseDesc: "with only InitialInterval set", + conf: Config{ + InitialInterval: testInitialInterval, + }, + expectedErr: errors.Join(errNoMaxInterval, errNoMaxElapsedTimeAndMaxRetriesCount), + }, + { + caseDesc: "with only MaxInterval set", + conf: Config{ + MaxInterval: testMaxInterval, + }, + expectedErr: errors.Join(errNoInitialInterval, errNoMaxElapsedTimeAndMaxRetriesCount), + }, + { + caseDesc: "with only MaxRetriesCount set", + conf: Config{ + MaxRetriesCount: testRetriesCount, + }, + expectedErr: errors.Join(errNoInitialInterval, errNoMaxInterval), + }, + { + caseDesc: "with only MaxElapsedTime set", + conf: Config{ + MaxElapsedTime: testMaxElapsedTIme, + }, + expectedErr: errors.Join(errNoInitialInterval, errNoMaxInterval), + }, + { + caseDesc: "with valid config", + conf: Config{ + InitialInterval: testInitialInterval, + MaxInterval: testMaxInterval, + MaxElapsedTime: testMaxElapsedTIme, + }, + expectedErr: nil, + }, + } + + for i := range cases { + Convey(fmt.Sprintf("Case %d: %s", i+1, cases[i].caseDesc), func() { + err := cases[i].conf.Validate() + + So(err, ShouldResemble, cases[i].expectedErr) + }) + } + }) +} diff --git a/metric_source/retries/retrier.go b/metric_source/retries/retrier.go new file mode 100644 index 000000000..24a8b38c7 --- /dev/null +++ b/metric_source/retries/retrier.go @@ -0,0 +1,23 @@ +package retries + +import ( + "github.com/cenkalti/backoff/v4" +) + +// Retrier retries the given operation with given backoff. 
+type Retrier[T any] interface {
+	// Retry calls op until it succeeds, op returns backoff.PermanentError or backoffPolicy returns backoff.Stop.
+	Retry(op RetryableOperation[T], backoffPolicy backoff.BackOff) (T, error)
+}
+
+type standardRetrier[T any] struct{}
+
+// NewStandardRetrier returns the standard Retrier, which delegates retrying to backoff.RetryWithData.
+func NewStandardRetrier[T any]() Retrier[T] {
+	return standardRetrier[T]{}
+}
+
+// Retry calls op until it succeeds, op returns backoff.PermanentError or backoffPolicy returns backoff.Stop.
+func (r standardRetrier[T]) Retry(op RetryableOperation[T], backoffPolicy backoff.BackOff) (T, error) {
+	return backoff.RetryWithData[T](op.DoRetryableOperation, backoffPolicy)
+}
diff --git a/metric_source/retries/retrier_test.go b/metric_source/retries/retrier_test.go
new file mode 100644
index 000000000..360771d86
--- /dev/null
+++ b/metric_source/retries/retrier_test.go
@@ -0,0 +1,167 @@
+package retries
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/cenkalti/backoff/v4"
+
+	. 
"github.com/smartystreets/goconvey/convey" +) + +func TestStandardRetrier(t *testing.T) { + var ( + maxRetriesCount uint64 = 6 + testErr = errors.New("some test err") + errInsidePermanent = errors.New("test err inside permanent") + permanentTestErr = backoff.Permanent(errInsidePermanent) + ) + + conf := Config{ + InitialInterval: testInitialInterval, + RandomizationFactor: testRandomizationFactor, + Multiplier: testMultiplier, + MaxInterval: testMaxInterval, + MaxRetriesCount: maxRetriesCount, + } + + retrier := NewStandardRetrier[int]() + + Convey("Test retrier", t, func() { + Convey("with successful RetryableOperation", func() { + retPairs := []retPair[int]{ + { + returnValue: 25, + err: nil, + }, + { + returnValue: 26, + err: nil, + }, + } + expectedCalls := 1 + + stub := newStubRetryableOperation[int](retPairs) + + backoffPolicy := NewExponentialBackoffFactory(conf).NewBackOff() + + gotRes, gotErr := retrier.Retry(stub, backoffPolicy) + + So(gotRes, ShouldEqual, retPairs[0].returnValue) + So(gotErr, ShouldBeNil) + So(stub.calls, ShouldEqual, expectedCalls) + }) + + Convey("with successful RetryableOperation after some retries", func() { + retPairs := []retPair[int]{ + { + returnValue: 25, + err: testErr, + }, + { + returnValue: 10, + err: testErr, + }, + { + returnValue: 42, + err: nil, + }, + { + returnValue: 41, + err: nil, + }, + } + expectedCalls := 3 + + stub := newStubRetryableOperation[int](retPairs) + + backoffPolicy := NewExponentialBackoffFactory(conf).NewBackOff() + + gotRes, gotErr := retrier.Retry(stub, backoffPolicy) + + So(gotRes, ShouldEqual, retPairs[2].returnValue) + So(gotErr, ShouldBeNil) + So(stub.calls, ShouldEqual, expectedCalls) + }) + + Convey("with permanent error from RetryableOperation after some retries", func() { + retPairs := []retPair[int]{ + { + returnValue: 25, + err: testErr, + }, + { + returnValue: 10, + err: permanentTestErr, + }, + { + returnValue: 42, + err: nil, + }, + { + returnValue: 41, + err: nil, + }, + } + 
expectedCalls := 2
+
+			stub := newStubRetryableOperation[int](retPairs)
+
+			backoffPolicy := NewExponentialBackoffFactory(conf).NewBackOff()
+
+			gotRes, gotErr := retrier.Retry(stub, backoffPolicy)
+
+			So(gotRes, ShouldEqual, retPairs[1].returnValue)
+			So(gotErr, ShouldResemble, errInsidePermanent)
+			So(stub.calls, ShouldEqual, expectedCalls)
+		})
+
+		Convey("with RetryableOperation failed on each retry", func() {
+			expectedCalls := conf.MaxRetriesCount + 1
+
+			stub := newStubRetryableOperation[int](nil)
+
+			backoffPolicy := NewExponentialBackoffFactory(conf).NewBackOff()
+
+			gotRes, gotErr := retrier.Retry(stub, backoffPolicy)
+
+			So(gotRes, ShouldEqual, 0)
+			So(gotErr, ShouldResemble, errStubValuesEnded)
+			So(stub.calls, ShouldEqual, expectedCalls)
+		})
+	})
+}
+
+// retPair is one scripted outcome (value and error) of a stubbed operation call.
+type retPair[T any] struct {
+	returnValue T
+	err         error
+}
+
+// stubRetryableOperation replays retPairs in order and counts how many times it was invoked.
+type stubRetryableOperation[T any] struct {
+	retPairs []retPair[T]
+	idx      int
+	calls    int
+}
+
+func newStubRetryableOperation[T any](pairs []retPair[T]) *stubRetryableOperation[T] {
+	return &stubRetryableOperation[T]{retPairs: pairs}
+}
+
+var errStubValuesEnded = errors.New("prepared return values and errors for stub ended")
+
+// DoRetryableOperation returns the next scripted pair, or errStubValuesEnded once the pairs run out.
+func (stub *stubRetryableOperation[T]) DoRetryableOperation() (T, error) {
+	stub.calls++
+
+	if stub.idx >= len(stub.retPairs) {
+		var zero T
+		return zero, errStubValuesEnded
+	}
+
+	next := stub.retPairs[stub.idx]
+	stub.idx++
+
+	return next.returnValue, next.err
+}
diff --git a/metric_source/retries/retryable_operation.go b/metric_source/retries/retryable_operation.go
new file mode 100644
index 000000000..dfab3d17e
--- /dev/null
+++ b/metric_source/retries/retryable_operation.go
@@ -0,0 +1,8 @@
+package retries
+
+// RetryableOperation is an action that can be retried after some time interval.
+// If DoRetryableOperation fails with an error that must not be retried, wrap that error with backoff.Permanent.
+type RetryableOperation[T any] interface {
+	// DoRetryableOperation performs a single attempt and returns its result or the error that ended it.
+	DoRetryableOperation() (T, error)
+}