diff --git a/.golangci.yml b/.golangci.yml
index 5a6b9e62..34e82136 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -7,6 +7,8 @@ linters-settings:
govet:
enable:
- composites
+ disable:
+ - printf
dupl:
threshold: 120
goconst:
diff --git a/plugin/action/throttle/README.md b/plugin/action/throttle/README.md
index cb66dc94..1d500415 100755
--- a/plugin/action/throttle/README.md
+++ b/plugin/action/throttle/README.md
@@ -65,10 +65,11 @@ Time interval to check event throughput.
**`rules`** *`[]RuleConfig`*
Rules to override the `default_limit` for different groups of events. It's a list of objects.
-Each object has the `limit` and `conditions` fields.
+Each object has the `limit`, `limit_kind` and `conditions` fields as well as an optional `limit_distribution` field.
* `limit` – the value which will override the `default_limit`, if `conditions` are met.
-* `limit_kind` – the type of a limit: `count` - number of messages, `size` - total size from all messages
+* `limit_kind` – the type of limit: `count` - number of messages, `size` - total size of all messages
* `conditions` – the map of `event field name => event field value`. The conditions are checked using the `AND` operator.
+* `limit_distribution` – see `LimitDistributionConfig` for details; an illustrative rule entry is shown below.
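+
+A sketch of a single rule (the `conditions` field name and all values here are hypothetical):
+```yaml
+rules:
+  - limit: 5000
+    limit_kind: count
+    conditions:
+      k8s_ns: kube-system
+    limit_distribution:
+      field: log.level
+      ratios:
+        - ratio: 0.9
+          values: ['error', 'warn']
+```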
@@ -78,6 +79,41 @@ Time interval after which unused limiters are removed.
+**`limit_distribution`** *`LimitDistributionConfig`*
+
+It allows distributing the `default_limit` between events by condition.
+
+`LimitDistributionConfig` params:
+* `field` - the event field on which the distribution will be based.
+* `ratios` - the list of objects. Each object has:
+ * `ratio` - distribution ratio, value must be in range [0.0;1.0].
+ * `values` - the list of strings which contains all `field` values that fall into this distribution.
+* `metric_labels` - list of metric labels.
+
+> Notes:
+> 1. The sum of ratios must be in the range [0.0;1.0].
+> 2. If the sum of ratios is less than 1, a **default distribution** with ratio **1-sum** is added;
+> otherwise the **default distribution** isn't used.
+> All events whose `field` value doesn't fall into any of the distributions:
+> * fall into the default distribution, if it exists
+> * are throttled, otherwise
+
+`LimitDistributionConfig` example:
+```yaml
+field: log.level
+ratios:
+ - ratio: 0.5
+ values: ['error']
+ - ratio: 0.3
+ values: ['warn', 'info']
+```
+For this config and `default_limit=100`:
+* events with `log.level=error` will have a limit of 50
+* events with `log.level=warn` or `log.level=info` will have a limit of 30
+* all other events will have a limit of 20
+
+
+
**`endpoint`** *`string`*
@@ -140,5 +176,30 @@ If not set limiter values are considered as non-json data.
+**`limiter_distribution_field`** *`string`*
+
+Defines the field with the limit distribution inside the JSON object stored in the value
+(e.g. if set to "distribution", the value must be of the form `{"distribution":{},...}`).
+Distribution object example:
+```json
+{
+ "field": "log.level",
+ "ratios": [
+ {
+ "ratio": 0.5,
+ "values": ["error"]
+ },
+ {
+ "ratio": 0.3,
+ "values": ["warn", "info"]
+ }
+ ],
+ "enabled": true
+}
+```
+> If `limiter_value_field` or `limiter_distribution_field` is not set, the distribution will not be stored.
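+
+For example, assuming `limiter_value_field: limit` and `limiter_distribution_field: distribution`,
+a limiter key could be seeded manually like this (the key name and all values are hypothetical):
+```
+SET my_pipeline_k8s_pod_pod-1_limit '{"limit":100,"distribution":{"field":"log.level","ratios":[{"ratio":0.5,"values":["error"]}],"enabled":true}}'
+```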
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/action/throttle/buckets.go b/plugin/action/throttle/buckets.go
new file mode 100644
index 00000000..13b21a7c
--- /dev/null
+++ b/plugin/action/throttle/buckets.go
@@ -0,0 +1,225 @@
+package throttle
+
+import (
+ "time"
+)
+
+type buckets interface {
+ get(index, distrIndex int) int64
+ getAll(index int, buf []int64) []int64
+ set(index, distrIndex int, value int64)
+ reset(index int)
+ add(index, distrIndex int, value int64)
+ isEmpty(index int) bool
+	// rebuild rebuilds the buckets and returns the actual bucket id
+ rebuild(currentTs, ts time.Time) int
+
+	// actualizeIndex checks for a probable shift of the buckets and returns the actual bucket index and whether it is still actual
+ actualizeIndex(maxID, index int) (int, bool)
+ getCount() int
+ getInterval() time.Duration
+ getMinID() int
+
+ // for testing only
+ isSimple() bool
+}
+
+func newBuckets(count, distributionSize int, interval time.Duration) buckets {
+ if distributionSize == 1 {
+ return newSimpleBuckets(count, interval)
+ }
+ return newDistributedBuckets(count, distributionSize, interval)
+}
+
+type bucketsMeta struct {
+ count int
+ interval time.Duration
+ minID int // min bucket id
+ maxID int // max bucket id
+}
+
+func newBucketsMeta(count int, interval time.Duration) bucketsMeta {
+ return bucketsMeta{
+ count: count,
+ interval: interval,
+ }
+}
+
+func (m bucketsMeta) getCount() int {
+ return m.count
+}
+
+func (m bucketsMeta) getInterval() time.Duration {
+ return m.interval
+}
+
+func (m bucketsMeta) getMinID() int {
+ return m.minID
+}
+
+func (m bucketsMeta) actualizeIndex(maxID, index int) (int, bool) {
+ if m.maxID == maxID {
+ return index, true
+ }
+
+	// buckets were rebuilt during some operation, calculate the actual index
+ shift := m.maxID - maxID
+ actualIndex := index - shift
+ return actualIndex, actualIndex > 0
+}
+
+// timeToBucketID converts time to bucketID
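+// For example (illustrative): with interval=time.Second, the timestamp
+// 2024-02-12T10:20:30Z (unix 1707733230) maps to bucket id 1707733230,
+// and with interval=time.Minute to bucket id 28462220.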
+func (m bucketsMeta) timeToBucketID(t time.Time) int {
+ return int(t.UnixNano() / m.interval.Nanoseconds())
+}
+
+type simpleBuckets struct {
+ bucketsMeta
+ b []int64
+}
+
+func newSimpleBuckets(count int, interval time.Duration) *simpleBuckets {
+ return &simpleBuckets{
+ bucketsMeta: newBucketsMeta(count, interval),
+ b: make([]int64, count),
+ }
+}
+
+func (b *simpleBuckets) get(index, _ int) int64 {
+ return b.b[index]
+}
+
+func (b *simpleBuckets) getAll(index int, buf []int64) []int64 {
+ return append(buf, b.b[index])
+}
+
+func (b *simpleBuckets) set(index, _ int, value int64) {
+ b.b[index] = value
+}
+
+func (b *simpleBuckets) reset(index int) {
+ b.b[index] = 0
+}
+
+func (b *simpleBuckets) add(index, _ int, value int64) {
+ b.b[index] += value
+}
+
+func (b *simpleBuckets) isEmpty(index int) bool {
+ return b.b[index] == 0
+}
+
+func (b *simpleBuckets) rebuild(currentTs, ts time.Time) int {
+ resetFn := func(count int) {
+ b.b = append(b.b[count:], b.b[:count]...)
+ for i := 0; i < count; i++ {
+ b.reset(b.getCount() - 1 - i)
+ }
+ }
+
+ return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
+}
+
+func (b *simpleBuckets) isSimple() bool {
+ return true
+}
+
+type distributedBuckets struct {
+ bucketsMeta
+ b []distributedBucket
+}
+
+func newDistributedBuckets(count, distributionSize int, interval time.Duration) *distributedBuckets {
+ db := &distributedBuckets{
+ bucketsMeta: newBucketsMeta(count, interval),
+ b: make([]distributedBucket, count),
+ }
+ for i := 0; i < count; i++ {
+ db.b[i] = newDistributedBucket(distributionSize)
+ }
+ return db
+}
+
+func (b *distributedBuckets) get(index, distrIndex int) int64 {
+ return b.b[index][distrIndex]
+}
+
+func (b *distributedBuckets) getAll(index int, buf []int64) []int64 {
+ return b.b[index].copyTo(buf)
+}
+
+func (b *distributedBuckets) set(index, distrIndex int, value int64) {
+ b.b[index][distrIndex] = value
+}
+
+func (b *distributedBuckets) reset(index int) {
+ for i := range b.b[index] {
+ b.b[index][i] = 0
+ }
+}
+
+func (b *distributedBuckets) add(index, distrIndex int, value int64) {
+ b.b[index][distrIndex] += value
+}
+
+func (b *distributedBuckets) isEmpty(index int) bool {
+ for _, v := range b.b[index] {
+ if v > 0 {
+ return false
+ }
+ }
+ return true
+}
+
+func (b *distributedBuckets) rebuild(currentTs, ts time.Time) int {
+ resetFn := func(count int) {
+ b.b = append(b.b[count:], b.b[:count]...)
+ for i := 0; i < count; i++ {
+ b.reset(b.getCount() - 1 - i)
+ }
+ }
+
+ return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
+}
+
+func (b *distributedBuckets) isSimple() bool {
+ return false
+}
+
+type distributedBucket []int64
+
+func newDistributedBucket(size int) distributedBucket {
+ return make(distributedBucket, size)
+}
+
+func (s distributedBucket) copyTo(buf distributedBucket) distributedBucket {
+ return append(buf, s...)
+}
+
+func rebuildBuckets(meta *bucketsMeta, resetFn func(int), currentTs, ts time.Time) int {
+ currentID := meta.timeToBucketID(currentTs)
+ if meta.minID == 0 {
+		// min id wasn't set yet. It MUST be derived from currentTs, because the ts from an event can be invalid (e.g. from year 1970 or 2077)
+ meta.maxID = currentID
+ meta.minID = meta.maxID - meta.count + 1
+ }
+ maxID := meta.minID + meta.count - 1
+
+	// currentID exceeds maxID, actualize buckets
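+	// (illustrative: with count=3, minID=100 (maxID=102) and currentID=104,
+	// dif=2, so the two oldest buckets are reset and the window becomes
+	// minID=102, maxID=104)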
+ if currentID > maxID {
+ dif := currentID - maxID
+ n := min(dif, meta.count)
+ // reset old buckets
+ resetFn(n)
+ // update ids
+ meta.minID += dif
+ meta.maxID = currentID
+ }
+
+ id := meta.timeToBucketID(ts)
+	// events from the past or future go to the latest bucket
+ if id < meta.minID || id > meta.maxID {
+ id = meta.maxID
+ }
+ return id
+}
diff --git a/plugin/action/throttle/buckets_test.go b/plugin/action/throttle/buckets_test.go
new file mode 100644
index 00000000..ce7fd363
--- /dev/null
+++ b/plugin/action/throttle/buckets_test.go
@@ -0,0 +1,213 @@
+package throttle
+
+import (
+ "slices"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestMetaActualizeIndex(t *testing.T) {
+ type testCase struct {
+ curMaxID int
+ newMaxID int
+ index int
+ }
+
+ tests := []struct {
+ name string
+ tc testCase
+
+ wantIndex int
+ wantActual bool
+ }{
+ {
+ name: "same_max_id",
+ tc: testCase{
+ curMaxID: 10,
+ newMaxID: 10,
+ index: 5,
+ },
+ wantIndex: 5,
+ wantActual: true,
+ },
+ {
+ name: "not_same_max_id_actual",
+ tc: testCase{
+ curMaxID: 12,
+ newMaxID: 10,
+ index: 5,
+ },
+ wantIndex: 3,
+ wantActual: true,
+ },
+ {
+ name: "not_same_max_id_not_actual",
+ tc: testCase{
+ curMaxID: 30,
+ newMaxID: 10,
+ index: 5,
+ },
+ wantIndex: -15,
+ wantActual: false,
+ },
+ }
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ meta := bucketsMeta{
+ maxID: tt.tc.curMaxID,
+ }
+ index, actual := meta.actualizeIndex(tt.tc.newMaxID, tt.tc.index)
+ require.Equal(t, tt.wantIndex, index, "wrong index")
+ require.Equal(t, tt.wantActual, actual, "wrong actuality")
+ })
+ }
+}
+
+func TestRebuildBuckets(t *testing.T) {
+ const interval = time.Second
+
+	// same as the current bucketsMeta.timeToBucketID func
+ timeToID := func(t time.Time) int {
+ return int(t.UnixNano() / interval.Nanoseconds())
+ }
+
+ ts, _ := time.Parse(time.RFC3339, "2024-02-12T10:20:30Z")
+
+ type testCase struct {
+ b []int64
+ meta *bucketsMeta
+ currentTs time.Time
+ ts time.Time
+ }
+ type wantData struct {
+ id int
+ buckets []int64
+ minID int
+ maxID int
+ }
+
+ tests := []struct {
+ name string
+ tc testCase
+ want wantData
+ }{
+ {
+ name: "zero_min_id",
+ tc: testCase{
+ b: []int64{1, 2, 3},
+ meta: &bucketsMeta{
+ count: 3,
+ interval: interval,
+ minID: 0,
+ },
+ currentTs: ts,
+ },
+ want: wantData{
+ id: -1,
+ buckets: []int64{1, 2, 3},
+ minID: timeToID(ts) - 2, // 2 = count-1
+ maxID: timeToID(ts),
+ },
+ },
+ {
+ name: "current_id_not_greater_max_id",
+ tc: testCase{
+ b: []int64{1, 2, 3},
+ meta: &bucketsMeta{
+ count: 3,
+ interval: interval,
+ minID: timeToID(ts),
+ },
+ currentTs: ts,
+ },
+ want: wantData{
+ id: -1,
+ buckets: []int64{1, 2, 3},
+ minID: timeToID(ts),
+ maxID: -1,
+ },
+ },
+ {
+ name: "current_id_greater_max_id",
+ tc: testCase{
+ b: []int64{1, 2, 3},
+ meta: &bucketsMeta{
+ count: 3,
+ interval: interval,
+ minID: timeToID(ts),
+ },
+ currentTs: ts.Add(4 * time.Second),
+ },
+ want: wantData{
+ id: -1,
+ buckets: []int64{3, 0, 0},
+ minID: timeToID(ts.Add(2 * time.Second)),
+ maxID: timeToID(ts.Add(4 * time.Second)),
+ },
+ },
+ {
+ name: "ts_id_between_minmax",
+ tc: testCase{
+ meta: &bucketsMeta{
+ interval: interval,
+ minID: timeToID(ts),
+ maxID: timeToID(ts.Add(3 * time.Second)),
+ },
+ ts: ts.Add(time.Second),
+ },
+ want: wantData{
+ id: timeToID(ts.Add(time.Second)),
+ minID: -1,
+ maxID: -1,
+ },
+ },
+ {
+ name: "ts_id_not_between_minmax",
+ tc: testCase{
+ meta: &bucketsMeta{
+ interval: interval,
+ minID: timeToID(ts),
+ maxID: timeToID(ts.Add(3 * time.Second)),
+ },
+ ts: ts.Add(5 * time.Second),
+ },
+ want: wantData{
+ id: timeToID(ts.Add(3 * time.Second)), // same as maxID
+ minID: -1,
+ maxID: -1,
+ },
+ },
+ }
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ resetFn := func(count int) {
+ tt.tc.b = append(tt.tc.b[count:], tt.tc.b[:count]...)
+ for i := 0; i < count; i++ {
+ tt.tc.b[len(tt.tc.b)-1-i] = 0
+ }
+ }
+ id := rebuildBuckets(tt.tc.meta, resetFn, tt.tc.currentTs, tt.tc.ts)
+
+ if tt.want.id != -1 {
+ require.Equal(t, tt.want.id, id, "wrong ID")
+ }
+ if len(tt.want.buckets) > 0 {
+ require.Equal(t, true, slices.Equal(tt.want.buckets, tt.tc.b), "wrong buckets")
+ }
+ if tt.want.minID != -1 {
+ require.Equal(t, tt.want.minID, tt.tc.meta.minID, "wrong buckets min ID")
+ }
+ if tt.want.maxID != -1 {
+ require.Equal(t, tt.want.maxID, tt.tc.meta.maxID, "wrong buckets max ID")
+ }
+ })
+ }
+}
diff --git a/plugin/action/throttle/distribution.go b/plugin/action/throttle/distribution.go
new file mode 100644
index 00000000..64cd0bf2
--- /dev/null
+++ b/plugin/action/throttle/distribution.go
@@ -0,0 +1,140 @@
+package throttle
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/metric"
+)
+
+type limitDistributionMetrics struct {
+ CustomLabels []string
+ EventsCount metric.HeldCounterVec
+ EventsSize metric.HeldCounterVec
+}
+
+type limitDistributionRatio struct {
+ Ratio float64 `json:"ratio"`
+ Values []string `json:"values"`
+}
+
+type limitDistributionCfg struct {
+ Field string `json:"field"`
+ Ratios []limitDistributionRatio `json:"ratios"`
+ Enabled bool `json:"enabled"`
+}
+
+func (c *limitDistributionCfg) marshalJson() []byte {
+ v, _ := json.Marshal(c)
+ return v
+}
+
+func (c *limitDistributionCfg) isEmpty() bool {
+ return c.Field == "" || len(c.Ratios) == 0
+}
+
+func parseLimitDistribution(c limitDistributionCfg, totalLimit int64) (limitDistributions, error) {
+ if c.Field == "" {
+ return limitDistributions{}, nil
+ }
+ if len(c.Ratios) == 0 {
+ return limitDistributions{}, errors.New("empty 'ratios'")
+ }
+
+ ld := limitDistributions{
+ field: cfg.ParseFieldSelector(c.Field),
+ distributions: make([]complexDistribution, len(c.Ratios)),
+ idxByKey: map[string]int{},
+ enabled: c.Enabled,
+ }
+
+ var ratioSum float64
+ for i, r := range c.Ratios {
+ if r.Ratio < 0 || r.Ratio > 1 {
+ return ld, errors.New("'ratio' value must be in range [0.0;1.0]")
+ }
+ if len(r.Values) == 0 {
+ return ld, fmt.Errorf("empty 'values' in ratio #%d", i)
+ }
+
+ ratioSum += r.Ratio
+ for _, v := range r.Values {
+ if _, ok := ld.idxByKey[v]; ok {
+ return ld, fmt.Errorf("value '%s' is duplicated in 'ratios' list", v)
+ }
+ ld.idxByKey[v] = i
+ }
+
+ ld.distributions[i] = complexDistribution{
+ ratio: r.Ratio,
+ limit: int64(math.Round(r.Ratio * float64(totalLimit))),
+ }
+ }
+
+ dif := 1 - ratioSum
+ if dif < 0 {
+ return ld, errors.New("sum of ratios must be less than or equal to 1")
+ }
+
+ defRatio := math.Round(dif*100) / 100
+ ld.defDistribution = complexDistribution{
+ ratio: defRatio,
+ limit: int64(math.Round(defRatio * float64(totalLimit))),
+ }
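+	// e.g. (illustrative): totalLimit=100 with ratios 0.5 and 0.3 yields
+	// distribution limits 50 and 30, plus a default distribution with
+	// ratio 0.2 and limit 20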
+
+ return ld, nil
+}
+
+type complexDistribution struct {
+ ratio float64 // between [0.0;1.0]
+ limit int64 // distributed limit = total limit * ratio
+}
+
+// limitDistributions is not thread-safe
+type limitDistributions struct {
+ field []string // event field, based on the values of which limits are distributed
+	idxByKey        map[string]int        // maps a field value to its index in the distributions slice
+ distributions []complexDistribution
+ defDistribution complexDistribution // default distribution if there is no field value in idxByKey map
+ enabled bool
+}
+
+func (ld *limitDistributions) isEnabled() bool {
+ return ld.enabled
+}
+
+func (ld *limitDistributions) size() int {
+ return len(ld.distributions)
+}
+
+// getLimit returns (index, distribution limit) by key, or (-1, default distribution limit) if the key is absent
+func (ld *limitDistributions) getLimit(key string) (int, int64) {
+ if idx, ok := ld.idxByKey[key]; ok {
+ return idx, ld.distributions[idx].limit
+ }
+ return -1, ld.defDistribution.limit
+}
+
+func (ld *limitDistributions) copy() limitDistributions {
+ fieldCopy := make([]string, len(ld.field))
+ copy(fieldCopy, ld.field)
+
+ distributionsCopy := make([]complexDistribution, len(ld.distributions))
+ copy(distributionsCopy, ld.distributions)
+
+ idxByKeyCopy := make(map[string]int, len(ld.idxByKey))
+ for k, v := range ld.idxByKey {
+ idxByKeyCopy[k] = v
+ }
+
+ return limitDistributions{
+ field: fieldCopy,
+ distributions: distributionsCopy,
+ idxByKey: idxByKeyCopy,
+ defDistribution: ld.defDistribution,
+ enabled: ld.enabled,
+ }
+}
diff --git a/plugin/action/throttle/distribution_test.go b/plugin/action/throttle/distribution_test.go
new file mode 100644
index 00000000..d876722f
--- /dev/null
+++ b/plugin/action/throttle/distribution_test.go
@@ -0,0 +1,163 @@
+package throttle
+
+import (
+ "fmt"
+ "slices"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func Test_parseLimitDistribution(t *testing.T) {
+ field := "log.level"
+ fieldSlice := []string{"log", "level"}
+
+ tests := []struct {
+ name string
+ cfg limitDistributionCfg
+ totalLimit int64
+
+ want limitDistributions
+ wantErr bool
+ }{
+ {
+ name: "empty_cfg",
+ cfg: limitDistributionCfg{},
+ want: limitDistributions{},
+ },
+ {
+ name: "valid_cfg",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.5, Values: []string{"error"}},
+ {Ratio: 0.35, Values: []string{"warn", "info"}},
+ {Ratio: 0.15, Values: []string{"debug"}},
+ },
+ },
+ totalLimit: 100,
+ want: limitDistributions{
+ field: fieldSlice,
+ distributions: []complexDistribution{
+ {ratio: 0.5, limit: 50},
+ {ratio: 0.35, limit: 35},
+ {ratio: 0.15, limit: 15},
+ },
+ idxByKey: map[string]int{
+ "error": 0,
+ "warn": 1, "info": 1,
+ "debug": 2,
+ },
+ },
+ },
+ {
+ name: "valid_cfg_with_def",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.5, Values: []string{"error"}},
+ {Ratio: 0.3, Values: []string{"warn", "info"}},
+ {Ratio: 0.16, Values: []string{"debug"}},
+ },
+ },
+ totalLimit: 100,
+ want: limitDistributions{
+ field: fieldSlice,
+ distributions: []complexDistribution{
+ {ratio: 0.5, limit: 50},
+ {ratio: 0.3, limit: 30},
+ {ratio: 0.16, limit: 16},
+ },
+ idxByKey: map[string]int{
+ "error": 0,
+ "warn": 1, "info": 1,
+ "debug": 2,
+ },
+ defDistribution: complexDistribution{
+ ratio: 0.04,
+ limit: 4,
+ },
+ },
+ },
+ {
+ name: "err_empty_ratios",
+ cfg: limitDistributionCfg{
+ Field: field,
+ },
+ wantErr: true,
+ },
+ {
+ name: "err_invalid_ratio1",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: -0.5},
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "err_invalid_ratio2",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 1.4},
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "err_empty_ratio_values",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.66},
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "err_duplicate_ratio_values",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.66, Values: []string{"error"}},
+ {Ratio: 0.1, Values: []string{"debug", "error"}},
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "err_invalid_ratio_sum",
+ cfg: limitDistributionCfg{
+ Field: field,
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.5, Values: []string{"error"}},
+ {Ratio: 0.51, Values: []string{"warn", "info"}},
+ },
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ ld, err := parseLimitDistribution(tt.cfg, tt.totalLimit)
+
+ require.Equal(t, tt.wantErr, err != nil, "wrong error")
+ if tt.wantErr {
+ return
+ }
+
+ require.Equal(t, tt.want.field, ld.field, "wrong field")
+			require.Equal(t, tt.want.defDistribution, ld.defDistribution, "wrong defDistribution")
+			require.Equal(t, true, slices.Equal(tt.want.distributions, ld.distributions), "wrong distributions")
+ require.Equal(t, len(tt.want.idxByKey), len(ld.idxByKey), "wrong idxByKey size")
+ for k, v := range ld.idxByKey {
+ require.Equal(t, tt.want.idxByKey[k], v, fmt.Sprintf("wrong value in idxByKey with key %q", k))
+ }
+ })
+ }
+}
diff --git a/plugin/action/throttle/in_memory_limiter.go b/plugin/action/throttle/in_memory_limiter.go
index dc7fde45..46e5dddc 100644
--- a/plugin/action/throttle/in_memory_limiter.go
+++ b/plugin/action/throttle/in_memory_limiter.go
@@ -10,99 +10,193 @@ import (
)
type inMemoryLimiter struct {
- limit complexLimit // threshold and type of an inMemoryLimiter
- bucketCount int
- buckets []int64
- interval time.Duration // bucket interval
- minID int // minimum bucket id
- maxID int // max bucket id
- mu sync.Mutex
+ limit complexLimit
+ buckets buckets
+ mu sync.Mutex
// nowFn is passed to create limiters and required for test purposes
nowFn func() time.Time
-}
-// NewInMemoryLimiter returns limiter instance.
-func NewInMemoryLimiter(interval time.Duration, bucketCount int, limit complexLimit, nowFn func() time.Time) *inMemoryLimiter {
- return &inMemoryLimiter{
- interval: interval,
- bucketCount: bucketCount,
- limit: limit,
+ // metrics
+ metricLabelsBuf []string
+ limitDistrMetrics *limitDistributionMetrics
+}
- buckets: make([]int64, bucketCount),
+// newInMemoryLimiter returns a limiter instance.
+func newInMemoryLimiter(
+ cfg *limiterConfig,
+ limit *complexLimit,
+ limitDistrMetrics *limitDistributionMetrics,
+ nowFn func() time.Time,
+) *inMemoryLimiter {
+ distSize := limit.distributions.size()
+
+ l := &inMemoryLimiter{
+ limit: complexLimit{
+ value: limit.value,
+ kind: limit.kind,
+ },
+ buckets: newBuckets(
+ cfg.bucketsCount,
+ distSize+1, // +1 because of default distribution
+ cfg.bucketInterval,
+ ),
nowFn: nowFn,
+
+ metricLabelsBuf: make([]string, 0, len(limitDistrMetrics.CustomLabels)+1),
+ limitDistrMetrics: limitDistrMetrics,
}
-}
-func (l *inMemoryLimiter) sync() {
+ // need a copy due to possible runtime changes (sync with redis)
+ if distSize > 0 {
+ l.limit.distributions = limit.distributions.copy()
+ }
+ return l
}
+func (l *inMemoryLimiter) sync() {}
+
func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
+ limit := atomic.LoadInt64(&l.limit.value)
+
// limit value fast check without races
- if atomic.LoadInt64(&l.limit.value) < 0 {
+ if limit < 0 {
return true
}
- l.mu.Lock()
- defer l.mu.Unlock()
+ l.lock()
+ defer l.unlock()
+
+ // If the limit is given with distribution, then distributed buckets are used
+ distrIdx := 0
+ distrFieldVal := ""
+ if l.limit.distributions.isEnabled() {
+ distrFieldVal = event.Root.Dig(l.limit.distributions.field...).AsString()
+ distrIdx, limit = l.limit.distributions.getLimit(distrFieldVal)
+
+ // The distribution index in the bucket matches the distribution value index in distributions,
+	// but is shifted by 1 because the default distribution occupies index 0.
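+	// (illustrative: a value matching the first ratio gets distrIdx=0 from
+	// getLimit and lands in bucket slot 1, while an unmatched value gets
+	// distrIdx=-1 and lands in slot 0, the default distribution slot)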
+ distrIdx++
+ }
id := l.rebuildBuckets(ts)
- index := id - l.minID
+ index := id - l.buckets.getMinID()
switch l.limit.kind {
+ case "", limitKindCount:
+ l.buckets.add(index, distrIdx, 1)
+ case limitKindSize:
+ l.buckets.add(index, distrIdx, int64(event.Size))
default:
logger.Fatalf("unknown type of the inMemoryLimiter: %q", l.limit.kind)
- case "", "count":
- l.buckets[index]++
- case "size":
- l.buckets[index] += int64(event.Size)
}
- return l.buckets[index] <= l.limit.value
+ isAllowed := l.buckets.get(index, distrIdx) <= limit
+ if !isAllowed && l.limit.distributions.isEnabled() {
+ l.metricLabelsBuf = l.metricLabelsBuf[:0]
+
+ l.metricLabelsBuf = append(l.metricLabelsBuf, distrFieldVal)
+ for _, lbl := range l.limitDistrMetrics.CustomLabels {
+ val := "not_set"
+ node := event.Root.Dig(lbl)
+ if node != nil {
+ val = node.AsString()
+ }
+ l.metricLabelsBuf = append(l.metricLabelsBuf, val)
+ }
+
+ switch l.limit.kind {
+ case "", limitKindCount:
+ l.limitDistrMetrics.EventsCount.WithLabelValues(l.metricLabelsBuf...).Inc()
+ case limitKindSize:
+ l.limitDistrMetrics.EventsSize.WithLabelValues(l.metricLabelsBuf...).Add(float64(event.Size))
+ }
+ }
+
+ return isAllowed
}
-// rebuildBuckets will rebuild buckets for given ts and returns actual bucket id
-// Not thread safe - use external lock!
-func (l *inMemoryLimiter) rebuildBuckets(ts time.Time) int {
- currentTs := l.nowFn()
- currentID := l.timeToBucketID(currentTs)
- if l.minID == 0 {
- // min id weren't set yet. It MUST be extracted from currentTs, because ts from event can be invalid (e.g. from 1970 or 2077 year)
- l.maxID = l.timeToBucketID(currentTs)
- l.minID = l.maxID - l.bucketCount + 1
+func (l *inMemoryLimiter) lock() {
+ l.mu.Lock()
+}
+
+func (l *inMemoryLimiter) unlock() {
+ l.mu.Unlock()
+}
+
+func (l *inMemoryLimiter) updateLimit(limit int64) {
+ atomic.StoreInt64(&l.limit.value, limit)
+}
+
+func (l *inMemoryLimiter) updateDistribution(distribution limitDistributionCfg) error {
+ if distribution.isEmpty() && l.limit.distributions.size() == 0 {
+ return nil
}
- maxID := l.minID + len(l.buckets) - 1
-
- // currentBucket exceed maxID. Create actual buckets
- if currentID > maxID {
- n := currentID - maxID
- // add new buckets
- for i := 0; i < n; i++ {
- l.buckets = append(l.buckets, 0)
- }
- // remove old buckets
- l.buckets = l.buckets[n:]
- // update min buckets
- l.minID += n
- l.maxID = currentID
+ ld, err := parseLimitDistribution(distribution, atomic.LoadInt64(&l.limit.value))
+ if err != nil {
+ return err
}
- id := l.timeToBucketID(ts)
- // events from past or future goes to the latest bucket
- if id < l.minID || id > l.maxID {
- id = l.maxID
+ l.lock()
+ defer l.unlock()
+
+	// recreate buckets only when switching between simple and distributed modes
+ if l.limit.distributions.size() == 0 && ld.size() > 0 || l.limit.distributions.size() > 0 && ld.size() == 0 {
+ l.buckets = newBuckets(
+ l.buckets.getCount(),
+ ld.size()+1, // +1 because of default distribution
+ l.buckets.getInterval(),
+ )
}
- return id
+
+ l.limit.distributions = ld
+ return nil
+}
+
+func (l *inMemoryLimiter) getBucket(bucketIdx int, buf []int64) []int64 {
+ return l.buckets.getAll(bucketIdx, buf)
+}
+
+// Not thread safe - use lock&unlock methods!
+func (l *inMemoryLimiter) updateBucket(bucketIdx, distrIdx int, value int64) {
+ l.buckets.set(bucketIdx, distrIdx, value)
+}
+
+// Not thread safe - use lock&unlock methods!
+func (l *inMemoryLimiter) resetBucket(bucketIdx int) {
+ l.buckets.reset(bucketIdx)
+}
+
+func (l *inMemoryLimiter) isBucketEmpty(bucketIdx int) bool {
+ return l.buckets.isEmpty(bucketIdx)
}
-// timeToBucketID converts time to bucketID.
-func (l *inMemoryLimiter) timeToBucketID(t time.Time) int {
- return int(t.UnixNano() / l.interval.Nanoseconds())
+// rebuildBuckets rebuilds the buckets for the given ts and returns the actual bucket id
+// Not thread safe - use lock&unlock methods!
+func (l *inMemoryLimiter) rebuildBuckets(ts time.Time) int {
+ return l.buckets.rebuild(l.nowFn(), ts)
+}
+
+// actualizeBucketIdx checks for a probable shift of the buckets and returns the actual bucket index and whether it is still actual
+func (l *inMemoryLimiter) actualizeBucketIdx(maxID, bucketIdx int) (int, bool) {
+ return l.buckets.actualizeIndex(maxID, bucketIdx)
+}
+
+func (l *inMemoryLimiter) bucketsCount() int {
+ return l.buckets.getCount()
+}
+
+func (l *inMemoryLimiter) bucketsInterval() time.Duration {
+ return l.buckets.getInterval()
+}
+
+func (l *inMemoryLimiter) bucketsMinID() int {
+ return l.buckets.getMinID()
}
func (l *inMemoryLimiter) setNowFn(fn func() time.Time) {
- l.mu.Lock()
+ l.lock()
l.nowFn = fn
- l.mu.Unlock()
+ l.unlock()
}
diff --git a/plugin/action/throttle/limiters_map.go b/plugin/action/throttle/limiters_map.go
index 4d8cfa9e..85da3b92 100644
--- a/plugin/action/throttle/limiters_map.go
+++ b/plugin/action/throttle/limiters_map.go
@@ -52,14 +52,15 @@ func newLimiterWithGen(lim limiter, gen int64) *limiterWithGen {
// limiterConfig configuration for creation of new limiters.
type limiterConfig struct {
- ctx context.Context
- backend string
- redisClient redisClient
- pipeline string
- throttleField string
- bucketInterval time.Duration
- bucketsCount int
- limiterValueField string
+ ctx context.Context
+ backend string
+ redisClient redisClient
+ pipeline string
+ throttleField string
+ bucketInterval time.Duration
+ bucketsCount int
+ limiterValueField string
+ limiterDistributionField string
}
// limitersMapConfig configuration of limiters map.
@@ -71,7 +72,9 @@ type limitersMapConfig struct {
limiterCfg *limiterConfig
- mapSizeMetric prometheus.Gauge
+ // metrics
+ mapSizeMetric prometheus.Gauge
+ limitDistrMetrics *limitDistributionMetrics
}
// limitersMap is auxiliary type for storing the map of strings to limiters with additional info for cleanup
@@ -91,7 +94,9 @@ type limitersMap struct {
limiterCfg *limiterConfig
- mapSizeMetric prometheus.Gauge
+ // metrics
+ mapSizeMetric prometheus.Gauge
+ limitDistrMetrics *limitDistributionMetrics
}
func newLimitersMap(lmCfg limitersMapConfig, redisOpts *redis.Options) *limitersMap {
@@ -108,7 +113,8 @@ func newLimitersMap(lmCfg limitersMapConfig, redisOpts *redis.Options) *limiters
limiterCfg: lmCfg.limiterCfg,
- mapSizeMetric: lmCfg.mapSizeMetric,
+ mapSizeMetric: lmCfg.mapSizeMetric,
+ limitDistrMetrics: lmCfg.limitDistrMetrics,
}
if redisOpts != nil {
lm.limiterCfg.redisClient = redis.NewClient(redisOpts)
@@ -218,25 +224,20 @@ func (l *limitersMap) maintenance(ctx context.Context) {
}
}
-// getNewLimiter creates new limiter based on limiter configuration.
-func (l *limitersMap) getNewLimiter(throttleKey, keyLimitOverride string, rule *rule) limiter {
+// newLimiter creates a new limiter based on the limiter configuration.
+func (l *limitersMap) newLimiter(throttleKey, keyLimitOverride string, rule *rule) limiter {
switch l.limiterCfg.backend {
case redisBackend:
- return NewRedisLimiter(
- l.limiterCfg.ctx,
- l.limiterCfg.redisClient,
- l.limiterCfg.pipeline,
- l.limiterCfg.throttleField,
- throttleKey,
- l.limiterCfg.bucketInterval,
- l.limiterCfg.bucketsCount,
- rule.limit,
- keyLimitOverride,
- l.limiterCfg.limiterValueField,
+ return newRedisLimiter(
+ l.limiterCfg,
+ throttleKey, keyLimitOverride,
+ &rule.limit,
+ rule.distributionCfg,
+ l.limitDistrMetrics,
l.nowFn,
)
case inMemoryBackend:
- return NewInMemoryLimiter(l.limiterCfg.bucketInterval, l.limiterCfg.bucketsCount, rule.limit, l.nowFn)
+ return newInMemoryLimiter(l.limiterCfg, &rule.limit, l.limitDistrMetrics, l.nowFn)
default:
l.logger.Panicf("unknown limiter backend: %s", l.limiterCfg.backend)
}
@@ -264,16 +265,15 @@ func (l *limitersMap) getOrAdd(throttleKey, keyLimitOverride string, limiterBuf
key := string(limiterBuf)
// we could already write it between `l.mu.RUnlock()` and `l.mu.Lock()`, so we need to check again
l.mu.Lock()
+ defer l.mu.Unlock()
lim, has = l.lims[key]
if has {
lim.gen.Store(l.curGen)
- l.mu.Unlock()
return lim, limiterBuf
}
- newLim := l.getNewLimiter(throttleKey, keyLimitOverride, rule)
+ newLim := l.newLimiter(throttleKey, keyLimitOverride, rule)
lim = newLimiterWithGen(newLim, l.curGen)
l.lims[key] = lim
- l.mu.Unlock()
return lim, limiterBuf
}
diff --git a/plugin/action/throttle/redis_limiter.go b/plugin/action/throttle/redis_limiter.go
index d80e35c5..853d8dfd 100644
--- a/plugin/action/throttle/redis_limiter.go
+++ b/plugin/action/throttle/redis_limiter.go
@@ -2,12 +2,10 @@ package throttle
import (
"bytes"
- "context"
"encoding/json"
"fmt"
"strconv"
"strings"
- "sync/atomic"
"time"
"github.com/ozontech/file.d/logger"
@@ -21,61 +19,63 @@ const (
type redisLimiter struct {
redis redisClient
-	// <pipeline_name>_<throttle_field_name>_<throttle_field_value>_
-	// bucket counter prefix + bucket id forms key in redis: <key_prefix>_<bucket_id>
+	// bucket counter prefix, forms key in redis: <key_prefix>_<bucket_id>_<distribution_idx>
keyPrefix bytes.Buffer
-
-	// <pipeline_name>_<throttle_field_name>_<throttle_field_value>_limit
- // limit key in redis
+	// limit key in redis. If not overridden, has the format <pipeline_name>_<throttle_field_name>_<throttle_field_value>_limit
keyLimit string
// contains values which will be used for incrementing remote bucket counter
// buckets will be flushed after every sync to contain only increment value
incrementLimiter *inMemoryLimiter
-
// contains global values synced from redis
totalLimiter *inMemoryLimiter
// contains indexes of buckets in incrementLimiter for sync
keyIdxsForSync []int
-
// contains bucket IDs for keys in keyIdxsForSync
bucketIdsForSync []int
+ // contains bucket values for keys in keyIdxsForSync
+ bucketValuesForSync [][]int64
+ // contains distribution indexes of distributed bucket for updateLimiterValues
+ distributionIdxsForUpdate []int
+ // contains distribution values of distributed bucket for updateLimiterValues
+ distributionValuesForUpdate []int64
// json field with limit value
valField string
+ // json field with distribution value
+ distributionField string
// limit default value to set if limit key does not exist in redis
defaultVal string
}
-// NewRedisLimiter return instance of redis limiter.
-func NewRedisLimiter(
- ctx context.Context,
- redis redisClient,
- pipelineName, throttleFieldName, throttleFieldValue string,
- bucketInterval time.Duration,
- bucketCount int,
- limit complexLimit,
- keyLimitOverride string,
- valField string,
+// newRedisLimiter returns an instance of the redis limiter.
+func newRedisLimiter(
+ cfg *limiterConfig,
+ throttleFieldValue, keyLimitOverride string,
+ limit *complexLimit,
+ distributionCfg []byte,
+ limitDistrMetrics *limitDistributionMetrics,
nowFn func() time.Time,
) *redisLimiter {
rl := &redisLimiter{
- redis: redis,
- incrementLimiter: NewInMemoryLimiter(bucketInterval, bucketCount, limit, nowFn),
- totalLimiter: NewInMemoryLimiter(bucketInterval, bucketCount, limit, nowFn),
- valField: valField,
+ redis: cfg.redisClient,
+ incrementLimiter: newInMemoryLimiter(cfg, limit, limitDistrMetrics, nowFn),
+ totalLimiter: newInMemoryLimiter(cfg, limit, limitDistrMetrics, nowFn),
+ valField: cfg.limiterValueField,
+ distributionField: cfg.limiterDistributionField,
}
- rl.keyIdxsForSync = make([]int, 0, bucketCount)
- rl.bucketIdsForSync = make([]int, 0, bucketCount)
+ rl.keyIdxsForSync = make([]int, 0, cfg.bucketsCount)
+ rl.bucketIdsForSync = make([]int, 0, cfg.bucketsCount)
+ rl.bucketValuesForSync = make([][]int64, cfg.bucketsCount)
rl.keyPrefix = bytes.Buffer{}
- // full name of keyPrefix will be pipelineName_throttleFieldName_throttleFieldValue_limit. `limit` added afterwards
- rl.keyPrefix.WriteString(pipelineName)
+ // keyPrefix will be pipelineName_throttleFieldName_throttleFieldValue_
+ rl.keyPrefix.WriteString(cfg.pipeline)
rl.keyPrefix.WriteString("_")
- rl.keyPrefix.WriteString(throttleFieldName)
+ rl.keyPrefix.WriteString(cfg.throttleField)
rl.keyPrefix.WriteString("_")
rl.keyPrefix.WriteString(throttleFieldValue)
rl.keyPrefix.WriteString("_")
@@ -84,12 +84,20 @@ func NewRedisLimiter(
} else {
rl.keyLimit = keyLimitOverride
}
- if valField == "" {
+ if rl.valField == "" {
rl.defaultVal = strconv.FormatInt(limit.value, 10)
} else {
// no err check since valField is string
- valKey, _ := json.Marshal(valField)
- rl.defaultVal = fmt.Sprintf("{%s:%v}", valKey, limit.value)
+ valKey, _ := json.Marshal(rl.valField)
+ if limit.distributions.size() > 0 && rl.distributionField != "" {
+ distrKey, _ := json.Marshal(rl.distributionField)
+ rl.defaultVal = fmt.Sprintf("{%s:%v,%s:%s}",
+ valKey, limit.value,
+ distrKey, distributionCfg,
+ )
+ } else {
+ rl.defaultVal = fmt.Sprintf("{%s:%v}", valKey, limit.value)
+ }
}
return rl
@@ -108,8 +116,8 @@ func (l *redisLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
func (l *redisLimiter) sync() {
// required to prevent concurrent update of buckets via throttle plugin
- l.incrementLimiter.mu.Lock()
- l.totalLimiter.mu.Lock()
+ l.incrementLimiter.lock()
+ l.totalLimiter.lock()
n := time.Now()
@@ -117,28 +125,30 @@ func (l *redisLimiter) sync() {
maxID := l.incrementLimiter.rebuildBuckets(n)
_ = l.totalLimiter.rebuildBuckets(n)
- minID := l.totalLimiter.minID
- count := l.incrementLimiter.bucketCount
+ minID := l.totalLimiter.bucketsMinID()
+ count := l.incrementLimiter.bucketsCount()
l.keyIdxsForSync = l.keyIdxsForSync[:0]
l.bucketIdsForSync = l.bucketIdsForSync[:0]
for i := 0; i < count; i++ {
+ l.bucketValuesForSync[i] = l.bucketValuesForSync[i][:0]
+
// no new events passed
- if l.incrementLimiter.buckets[i] == 0 {
+ if l.incrementLimiter.isBucketEmpty(i) {
continue
}
l.keyIdxsForSync = append(l.keyIdxsForSync, i)
l.bucketIdsForSync = append(l.bucketIdsForSync, minID+i)
+ l.bucketValuesForSync[i] = l.incrementLimiter.getBucket(i, l.bucketValuesForSync[i])
}
- l.totalLimiter.mu.Unlock()
- l.incrementLimiter.mu.Unlock()
+ l.totalLimiter.unlock()
+ l.incrementLimiter.unlock()
- if len(l.bucketIdsForSync) == 0 {
- return
+ if len(l.bucketIdsForSync) > 0 {
+ l.syncLocalGlobalLimiters(maxID)
}
- l.syncLocalGlobalLimiters(maxID)
if err := l.updateKeyLimit(); err != nil {
logger.Errorf("failed to update key limit: %v", err)
}
@@ -146,77 +156,99 @@ func (l *redisLimiter) sync() {
func (l *redisLimiter) syncLocalGlobalLimiters(maxID int) {
prefix := l.keyPrefix.String()
-
builder := new(strings.Builder)
+ tlBucketsInterval := l.totalLimiter.bucketsInterval()
for i, ID := range l.bucketIdsForSync {
builder.Reset()
builder.WriteString(prefix)
+ builder.WriteString(strconv.Itoa(ID))
- bucketIdx := l.keyIdxsForSync[i]
-
- stringID := strconv.Itoa(ID)
- builder.WriteString(stringID)
+		// <key_prefix>_<bucket_id>
key := builder.String()
+ bucketIdx := l.keyIdxsForSync[i]
- intCmd := l.redis.IncrBy(key, l.incrementLimiter.buckets[bucketIdx])
- val, err := intCmd.Result()
- if err != nil {
- logger.Errorf("can't watch global limit for %s: %s", key, err.Error())
- continue
+ l.distributionIdxsForUpdate = l.distributionIdxsForUpdate[:0]
+ l.distributionValuesForUpdate = l.distributionValuesForUpdate[:0]
+ for distrIdx := 0; distrIdx < len(l.bucketValuesForSync[bucketIdx]); distrIdx++ {
+ builder.Reset()
+ builder.WriteString(key)
+ builder.WriteString("_")
+ builder.WriteString(strconv.Itoa(distrIdx))
+
+			// <key_prefix>_<bucket_id>_<distribution_idx>
+ subKey := builder.String()
+
+ intCmd := l.redis.IncrBy(subKey, l.bucketValuesForSync[bucketIdx][distrIdx])
+ val, err := intCmd.Result()
+ if err != nil {
+ logger.Errorf("can't watch global limit for %s: %s", key, err.Error())
+ continue
+ }
+ l.distributionIdxsForUpdate = append(l.distributionIdxsForUpdate, distrIdx)
+ l.distributionValuesForUpdate = append(l.distributionValuesForUpdate, val)
+
+			// for the oldest bucket set lifetime equal to 1 bucket duration, for the newest equal to (bucket count * bucket duration)
+ l.redis.Expire(subKey, tlBucketsInterval+tlBucketsInterval*time.Duration(bucketIdx))
}
- l.updateLimiterValues(maxID, bucketIdx, val)
-
- // for oldest bucket set lifetime equal to 1 bucket duration, for newest equal to ((bucket count + 1) * bucket duration)
- l.redis.Expire(key, l.totalLimiter.interval+l.totalLimiter.interval*time.Duration(bucketIdx))
+ l.updateLimiterValues(maxID, bucketIdx)
}
}
-// updateLimiterValues checks probable shift of buckets and updates buckets values.
-func (l *redisLimiter) updateLimiterValues(maxID, bucketIdx int, totalLimiterVal int64) {
- l.incrementLimiter.mu.Lock()
- if l.incrementLimiter.maxID == maxID {
- l.incrementLimiter.buckets[bucketIdx] = 0
- } else {
- // buckets were rebuild during request to redis
- shift := l.incrementLimiter.maxID - maxID
- currBucketIdx := bucketIdx - shift
-
- // currBucketIdx < 0 means it become too old and must be ignored
- if currBucketIdx > 0 {
- l.incrementLimiter.buckets[currBucketIdx] = 0
+// updateLimiterValues updates bucket values
+func (l *redisLimiter) updateLimiterValues(maxID, bucketIdx int) {
+ updateLim := func(lim *inMemoryLimiter, values []int64) {
+ lim.lock()
+		// if the index isn't actual, the bucket became too old and must be ignored
+ if actualBucketIdx, actual := lim.actualizeBucketIdx(maxID, bucketIdx); actual {
+ if values == nil {
+ lim.resetBucket(actualBucketIdx)
+ } else {
+ for i, distrIdx := range l.distributionIdxsForUpdate {
+ lim.updateBucket(actualBucketIdx, distrIdx, values[i])
+ }
+ }
}
+ lim.unlock()
}
- l.incrementLimiter.mu.Unlock()
-
- l.totalLimiter.mu.Lock()
- if l.totalLimiter.maxID == maxID {
- l.totalLimiter.buckets[bucketIdx] = totalLimiterVal
- } else {
- // buckets were rebuild during request to redis
- shift := l.totalLimiter.maxID - maxID
- currBucketIdx := bucketIdx - shift
- // currBucketIdx < 0 means it become too old and must be ignored
- if currBucketIdx > 0 {
- l.totalLimiter.buckets[currBucketIdx] = totalLimiterVal
- }
- }
- l.totalLimiter.mu.Unlock()
+ // reset increment limiter
+ updateLim(l.incrementLimiter, nil)
+ // update total limiter
+ updateLim(l.totalLimiter, l.distributionValuesForUpdate)
}
-func getLimitValFromJson(data []byte, valField string) (int64, error) {
+func decodeKeyLimitValue(data []byte, valField, distrField string) (int64, limitDistributionCfg, error) {
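+	// Accepted inputs look like (illustrative, with valField="limit" and
+	// distrField="distribution"): {"limit":100}, {"limit":"100"} or
+	// {"limit":100,"distribution":{"field":"level","ratios":[...]}};
+	// quoted numbers are accepted because quotes are trimmed before parsing,
+	// and a missing distribution field is not an error.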
+ var limit int64
+ var distr limitDistributionCfg
+ var err error
var m map[string]json.RawMessage
reader := bytes.NewReader(data)
- if err := json.NewDecoder(reader).Decode(&m); err != nil {
- return 0, fmt.Errorf("failed to unmarshal map: %w", err)
+ if err = json.NewDecoder(reader).Decode(&m); err != nil {
+ return limit, distr, fmt.Errorf("failed to unmarshal map: %w", err)
}
+
limitVal, has := m[valField]
if !has {
- return 0, fmt.Errorf("no %q key in map", valField)
+ return limit, distr, fmt.Errorf("no %q key in map", valField)
+ }
+
+ if limit, err = json.Number(bytes.Trim(limitVal, `"`)).Int64(); err != nil {
+ return limit, distr, err
+ }
+
+ if distrField != "" {
+ distrVal, has := m[distrField]
+ if !has {
+ return limit, distr, nil
+ }
+ if err := json.Unmarshal(distrVal, &distr); err != nil {
+ return limit, distr, err
+ }
}
- return json.Number(bytes.Trim(limitVal, `"`)).Int64()
+
+ return limit, distr, nil
}
// updateKeyLimit reads key limit from redis and updates current limit.
@@ -224,6 +256,7 @@ func (l *redisLimiter) updateKeyLimit() error {
var b bool
var err error
var limitVal int64
+ var distrVal limitDistributionCfg
// try to set global limit to default
if b, err = l.redis.SetNX(l.keyLimit, l.defaultVal, 0).Result(); err != nil {
return fmt.Errorf("failed to set redis value by key %q: %w", l.keyLimit, err)
@@ -237,17 +270,23 @@ func (l *redisLimiter) updateKeyLimit() error {
if jsonData, err = v.Bytes(); err != nil {
return fmt.Errorf("failed to convert redis value to bytes: %w", err)
}
- if limitVal, err = getLimitValFromJson(jsonData, l.valField); err != nil {
- return fmt.Errorf("failed to get limit value from redis json: %w", err)
+ if limitVal, distrVal, err = decodeKeyLimitValue(jsonData, l.valField, l.distributionField); err != nil {
+ return fmt.Errorf("failed to decode redis json value: %w", err)
}
} else {
if limitVal, err = v.Int64(); err != nil {
return fmt.Errorf("failed to convert redis value to int64: %w", err)
}
}
- // atomic store to prevent races with limit value fast check
- atomic.StoreInt64(&l.totalLimiter.limit.value, limitVal)
- atomic.StoreInt64(&l.incrementLimiter.limit.value, limitVal)
+ l.totalLimiter.updateLimit(limitVal)
+ l.incrementLimiter.updateLimit(limitVal)
+
+ if err = l.totalLimiter.updateDistribution(distrVal); err != nil {
+ return fmt.Errorf("failed to update limiter distribution: %w", err)
+ }
+ if err = l.incrementLimiter.updateDistribution(distrVal); err != nil {
+ return fmt.Errorf("failed to update limiter distribution: %w", err)
+ }
return nil
}
diff --git a/plugin/action/throttle/redis_limiter_test.go b/plugin/action/throttle/redis_limiter_test.go
index 0b90f4ee..5dc67387 100644
--- a/plugin/action/throttle/redis_limiter_test.go
+++ b/plugin/action/throttle/redis_limiter_test.go
@@ -2,138 +2,18 @@ package throttle
import (
"context"
+ "fmt"
"strings"
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis"
- "github.com/stretchr/testify/assert"
+ "github.com/ozontech/file.d/metric"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
-func TestUpdateLimiterValuesSameMaxID(t *testing.T) {
- maxID := 10
- bucketCount := 10
- lastBucketIdx := bucketCount - 1
- totalLimitFromRedis := 444444
-
- incLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- incLimiter.buckets[lastBucketIdx] = 1111111
-
- totalLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- redisLim := &redisLimiter{
- incrementLimiter: incLimiter,
- totalLimiter: totalLimiter,
- }
-
- redisLim.updateLimiterValues(maxID, lastBucketIdx, int64(totalLimitFromRedis))
-
- assert.Equal(t, maxID, redisLim.incrementLimiter.maxID, "id must be same")
- assert.Equal(t, maxID, redisLim.totalLimiter.maxID, "id must be same")
- assert.Equal(t, int64(0), redisLim.incrementLimiter.buckets[lastBucketIdx], "limit should be discarded")
- assert.Equal(t, int64(totalLimitFromRedis), redisLim.totalLimiter.buckets[lastBucketIdx], "limit didn't updated")
-}
-
-func TestUpdateLimiterValuesLastIDShifted(t *testing.T) {
- maxID := 10
- bucketCount := 10
- lastBucketIdx := bucketCount - 1
- totalLimitBeforeSync := 1111
- totalLimitFromRedis := 2222
-
- incLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- incLimiter.buckets[lastBucketIdx] = 1111111
-
- totalLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- totalLimiter.buckets[lastBucketIdx] = int64(totalLimitBeforeSync)
- redisLim := &redisLimiter{
- incrementLimiter: incLimiter,
- totalLimiter: totalLimiter,
- }
-
- assert.Equal(t, int64(totalLimitBeforeSync), totalLimiter.buckets[lastBucketIdx])
- // maxID was updated
- updateMaxID := 13
- incLimiter.maxID = updateMaxID
- totalLimiter.maxID = updateMaxID
-
- shift := make([]int64, 3)
-
- posOfLastBucketAfterShifting := bucketCount - (updateMaxID - maxID) - 1
- incrementLimiterBuckets := incLimiter.buckets[(updateMaxID - maxID):]
- incrementLimiterBuckets = append(incrementLimiterBuckets, shift...)
- redisLim.incrementLimiter.buckets = incrementLimiterBuckets
-
- totalLimiterBuckets := totalLimiter.buckets[(updateMaxID - maxID):]
- totalLimiterBuckets = append(totalLimiterBuckets, shift...)
- redisLim.totalLimiter.buckets = totalLimiterBuckets
-
- redisLim.updateLimiterValues(maxID, lastBucketIdx, int64(totalLimitFromRedis))
-
- assert.Equal(t, int64(0), redisLim.incrementLimiter.buckets[posOfLastBucketAfterShifting], "limit should be discarded")
- assert.Equal(t, int64(totalLimitFromRedis), redisLim.totalLimiter.buckets[posOfLastBucketAfterShifting], "limit didn't updated")
-}
-
-func TestUpdateLimiterValuesLastIDOutOfRange(t *testing.T) {
- maxID := 10
- bucketCount := 10
- lastBucketIdx := bucketCount - 1
- totalLimitBeforeSync := 1111
- totalLimitFromRedis := 2222
-
- incLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- incLimiter.buckets[lastBucketIdx] = 1111111
-
- totalLimiter := &inMemoryLimiter{
- maxID: maxID,
- bucketCount: bucketCount,
- buckets: make([]int64, bucketCount),
- }
- totalLimiter.buckets[lastBucketIdx] = int64(totalLimitBeforeSync)
- redisLim := &redisLimiter{
- incrementLimiter: incLimiter,
- totalLimiter: totalLimiter,
- }
-
- assert.Equal(t, int64(totalLimitBeforeSync), totalLimiter.buckets[lastBucketIdx])
- // maxID was shifted out of range
- updateMaxID := 1000
- incLimiter.maxID = updateMaxID
- totalLimiter.maxID = updateMaxID
-
- redisLim.incrementLimiter.buckets = make([]int64, bucketCount)
-
- redisLim.totalLimiter.buckets = make([]int64, bucketCount)
-
- redisLim.updateLimiterValues(maxID, lastBucketIdx, int64(totalLimitFromRedis))
-
- for i := range redisLim.incrementLimiter.buckets {
- assert.Equal(t, int64(0), redisLim.incrementLimiter.buckets[i], "update vals from redis should be ignored")
- assert.Equal(t, int64(0), redisLim.totalLimiter.buckets[i], "update vals from redis should be ignored")
- }
-}
-
func Test_updateKeyLimit(t *testing.T) {
ctx := context.Background()
pipelineName := "test_pipeline"
@@ -141,10 +21,34 @@ func Test_updateKeyLimit(t *testing.T) {
throttleFieldValue1 := "pod1"
throttleFieldValue2 := "pod2"
throttleFieldValue3 := "pod3"
- defaultLimit := complexLimit{
+ defaultLimit := &complexLimit{
value: 1,
- kind: "count",
+ kind: limitKindCount,
}
+ defaultDistribution := limitDistributionCfg{
+ Field: "level",
+ Ratios: []limitDistributionRatio{
+ {
+ Ratio: 0.7,
+ Values: []string{"error"},
+ },
+ {
+ Ratio: 0.3,
+ Values: []string{"warn", "info"},
+ },
+ },
+ Enabled: true,
+ }
+ defaultDistributionJson := defaultDistribution.marshalJson()
+ ld, _ := parseLimitDistribution(defaultDistribution, 10)
+ defaultLimitWithDistribution := &complexLimit{
+ value: 10,
+ kind: limitKindCount,
+ distributions: ld,
+ }
+ pod1LimitKey := strings.Join([]string{
+ pipelineName, throttleFieldName, throttleFieldValue1, keySuffix,
+ }, "_")
pod2LimitKey := strings.Join([]string{
pipelineName, throttleFieldName, throttleFieldValue2, keySuffix,
}, "_")
@@ -160,8 +64,10 @@ func Test_updateKeyLimit(t *testing.T) {
require.NoError(t, s.Set(pod3LimitKey, `{"custom_limit_field":103}`))
require.NoError(t, s.Set("custom_limit_key2", `{"custom_limit_field":104}`))
require.NoError(t, s.Set("custom_field_string_val", `{"custom_limit_field":"105"}`))
- require.NoError(t, s.Set("custom_field_invalid_type", `{"custom_limit_field":{"invalid":"invalid"}}`))
- require.NoError(t, s.Set("custom_field_error", `no_custom_field`))
+ require.NoError(t, s.Set("custom_limit_key3", `{"custom_limit_field":1000,"custom_distr_field":{"field":"new-field","ratios":[{"ratio":0.4,"values":["val1","val2"]},{"ratio":0.5,"values":["val3"]}],"enabled":false}}`))
+ require.NoError(t, s.Set("custom_limit_field_invalid_type", `{"custom_limit_field":{"invalid":"invalid"}}`))
+ require.NoError(t, s.Set("custom_distr_field_invalid_type", `{"custom_limit_field":107,"custom_distr_field":"test"}`))
+ require.NoError(t, s.Set("custom_limit_field_not_exists", `no_custom_field`))
require.NoError(t, s.Set("parse_int_error", `not_int`))
client := redis.NewClient(
@@ -176,7 +82,6 @@ func Test_updateKeyLimit(t *testing.T) {
MaxRetryBackoff: 0,
},
)
-
invalidClient := redis.NewClient(
&redis.Options{
Network: "tcp",
@@ -192,141 +97,324 @@ func Test_updateKeyLimit(t *testing.T) {
type args struct {
client redisClient
+ defaultLimit *complexLimit
throttleFieldValue string
keyLimitOverride string
valField string
+ distrField string
+ }
+ type limitersData struct {
+ limit int64
+ distributions limitDistributions
+ wantSimpleBuckets bool
+ }
+ type redisData struct {
+ key string
+ value string
}
tests := []struct {
- name string
- args args
- wantLimit int64
- wantErr bool
+ name string
+ args args
+
+ wantLimiters *limitersData
+ wantRedis *redisData
+ wantErr bool
}{
{
name: "set_default_limit",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "",
valField: "",
},
- wantLimit: 1,
- wantErr: false,
+ wantRedis: &redisData{
+ key: pod1LimitKey,
+ value: "1",
+ },
+ },
+ {
+ name: "set_default_limit_custom_field",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimit,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "default_limit",
+ valField: "custom_limit_field",
+ },
+ wantRedis: &redisData{
+ key: "default_limit",
+ value: `{"custom_limit_field":1}`,
+ },
+ },
+ {
+ name: "set_default_distribution",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimitWithDistribution,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "default_distr1",
+ valField: "custom_limit_field",
+ distrField: "custom_distr_field",
+ },
+ wantRedis: &redisData{
+ key: "default_distr1",
+ value: fmt.Sprintf(`{"custom_limit_field":10,"custom_distr_field":%s}`, defaultDistributionJson),
+ },
+ },
+ {
+ name: "set_default_without_distributions",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimit,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "default_distr2",
+ valField: "custom_limit_field",
+ distrField: "custom_distr_field",
+ },
+ wantRedis: &redisData{
+ key: "default_distr2",
+ value: `{"custom_limit_field":1}`,
+ },
+ },
+ {
+ name: "set_default_without_distr_field",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimitWithDistribution,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "default_distr3",
+ valField: "custom_limit_field",
+ distrField: "",
+ },
+ wantRedis: &redisData{
+ key: "default_distr3",
+ value: `{"custom_limit_field":10}`,
+ },
},
{
name: "get_limit_from_default_key",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue2,
keyLimitOverride: "",
valField: "",
},
- wantLimit: 101,
- wantErr: false,
+ wantLimiters: &limitersData{
+ limit: 101,
+ wantSimpleBuckets: true,
+ },
},
{
name: "get_limit_from_custom_key",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "custom_limit_key",
valField: "",
},
- wantLimit: 102,
- wantErr: false,
+ wantLimiters: &limitersData{
+ limit: 102,
+ wantSimpleBuckets: true,
+ },
},
{
name: "get_limit_from_default_key_custom_field",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue3,
keyLimitOverride: "",
valField: "custom_limit_field",
},
- wantLimit: 103,
- wantErr: false,
+ wantLimiters: &limitersData{
+ limit: 103,
+ wantSimpleBuckets: true,
+ },
},
{
name: "get_limit_from_custom_key_custom_field",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "custom_limit_key2",
valField: "custom_limit_field",
},
- wantLimit: 104,
- wantErr: false,
+ wantLimiters: &limitersData{
+ limit: 104,
+ wantSimpleBuckets: true,
+ },
},
{
name: "get_limit_from_custom_key_custom_field_string_value",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "custom_field_string_val",
valField: "custom_limit_field",
},
- wantLimit: 105,
- wantErr: false,
+ wantLimiters: &limitersData{
+ limit: 105,
+ wantSimpleBuckets: true,
+ },
},
{
- name: "get_limit_from_custom_field_invalid_val_type",
+ name: "get_limit_and_distribution",
args: args{
client: client,
+ defaultLimit: defaultLimitWithDistribution,
throttleFieldValue: throttleFieldValue1,
- keyLimitOverride: "custom_field_invalid_type",
+ keyLimitOverride: "custom_limit_key3",
valField: "custom_limit_field",
+ distrField: "custom_distr_field",
+ },
+ wantLimiters: &limitersData{
+ limit: 1000,
+ distributions: limitDistributions{
+ field: []string{"new-field"},
+ idxByKey: map[string]int{
+ "val1": 0,
+ "val2": 0,
+ "val3": 1,
+ },
+ distributions: []complexDistribution{
+ {ratio: 0.4, limit: 400},
+ {ratio: 0.5, limit: 500},
+ },
+ defDistribution: complexDistribution{ratio: 0.1, limit: 100},
+ enabled: false,
+ },
+ wantSimpleBuckets: false,
},
- wantLimit: 0,
- wantErr: true,
},
{
- name: "get_limit_from_custom_field_error",
+ name: "recreate_buckets_simple_to_distributed",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
- keyLimitOverride: "custom_field_error",
+ keyLimitOverride: "custom_limit_key3",
+ valField: "custom_limit_field",
+ distrField: "custom_distr_field",
+ },
+ wantLimiters: &limitersData{
+ limit: 1000,
+ distributions: limitDistributions{
+ field: []string{"new-field"},
+ idxByKey: map[string]int{
+ "val1": 0,
+ "val2": 0,
+ "val3": 1,
+ },
+ distributions: []complexDistribution{
+ {ratio: 0.4, limit: 400},
+ {ratio: 0.5, limit: 500},
+ },
+ defDistribution: complexDistribution{ratio: 0.1, limit: 100},
+ enabled: false,
+ },
+ wantSimpleBuckets: false,
+ },
+ },
+ {
+ name: "recreate_buckets_distributed_to_simple",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimitWithDistribution,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "custom_limit_key2",
valField: "custom_limit_field",
+ distrField: "custom_distr_field",
+ },
+ wantLimiters: &limitersData{
+ limit: 104,
+ wantSimpleBuckets: true,
},
- wantLimit: 0,
- wantErr: true,
+ },
+ {
+ name: "get_limit_from_custom_limit_field_invalid_type",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimit,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "custom_limit_field_invalid_type",
+ valField: "custom_limit_field",
+ },
+ wantErr: true,
+ },
+ {
+ name: "get_distribution_from_custom_distr_field_invalid_type",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimit,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "custom_distr_field_invalid_type",
+ valField: "custom_distr_field",
+ },
+ wantErr: true,
+ },
+ {
+ name: "get_limit_from_custom_limit_field_not_exists",
+ args: args{
+ client: client,
+ defaultLimit: defaultLimit,
+ throttleFieldValue: throttleFieldValue1,
+ keyLimitOverride: "custom_limit_field_not_exists",
+ valField: "custom_limit_field",
+ },
+ wantErr: true,
},
{
name: "parse_int_error",
args: args{
client: client,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "parse_int_error",
},
- wantLimit: 0,
- wantErr: true,
+ wantErr: true,
},
{
name: "set_limit_error",
args: args{
client: invalidClient,
+ defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "custom_field_error",
valField: "custom_limit_field",
},
- wantLimit: 0,
- wantErr: true,
+ wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- lim := NewRedisLimiter(
- ctx,
- tt.args.client,
- pipelineName,
- throttleFieldName,
+ ctl := metric.NewCtl("test", prometheus.NewRegistry())
+ lim := newRedisLimiter(
+ &limiterConfig{
+ ctx: ctx,
+ redisClient: tt.args.client,
+ pipeline: pipelineName,
+ throttleField: throttleFieldName,
+ bucketInterval: time.Second,
+ bucketsCount: 1,
+ limiterValueField: tt.args.valField,
+ limiterDistributionField: tt.args.distrField,
+ },
tt.args.throttleFieldValue,
- time.Second,
- 1,
- defaultLimit,
tt.args.keyLimitOverride,
- tt.args.valField,
+ tt.args.defaultLimit,
+ defaultDistributionJson,
+ &limitDistributionMetrics{
+ EventsCount: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_count", "")),
+ EventsSize: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_size", "")),
+ },
time.Now,
)
err := lim.updateKeyLimit()
@@ -334,12 +422,27 @@ func Test_updateKeyLimit(t *testing.T) {
if err != nil {
errMsg = err.Error()
}
+
require.Equal(t, tt.wantErr, err != nil, errMsg)
if tt.wantErr {
return
}
- require.Equal(t, tt.wantLimit, lim.incrementLimiter.limit.value)
- require.Equal(t, tt.wantLimit, lim.totalLimiter.limit.value)
+
+ if tt.wantLimiters != nil {
+ require.Equal(t, tt.wantLimiters.limit, lim.incrementLimiter.limit.value)
+ require.Equal(t, tt.wantLimiters.limit, lim.totalLimiter.limit.value)
+
+ require.Equal(t, tt.wantLimiters.distributions, lim.incrementLimiter.limit.distributions)
+ require.Equal(t, tt.wantLimiters.distributions, lim.totalLimiter.limit.distributions)
+
+ require.Equal(t, tt.wantLimiters.wantSimpleBuckets, lim.incrementLimiter.buckets.isSimple())
+ require.Equal(t, tt.wantLimiters.wantSimpleBuckets, lim.totalLimiter.buckets.isSimple())
+ }
+ if tt.wantRedis != nil {
+ val, err := s.Get(tt.wantRedis.key)
+ require.NoError(t, err)
+ require.Equal(t, tt.wantRedis.value, val)
+ }
})
}
t.Cleanup(func() {
@@ -347,25 +450,27 @@ func Test_updateKeyLimit(t *testing.T) {
})
}
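
For orientation, here is a standalone sketch of how a `limitDistributions` value could resolve a per-event limit, built only from the fixture shapes used above (`idxByKey`, `distributions`, `defDistribution`); the type layout and the method are assumptions for illustration, not the package's actual lookup code.

```go
package throttle

// Types mirrored from the test fixtures above; names are assumptions.
type complexDistribution struct {
	ratio float64 // share of the total limit
	limit int64   // ratio * total limit, precomputed
}

type limitDistributions struct {
	field           []string       // event field path the distribution is based on
	idxByKey        map[string]int // field value -> index in distributions
	distributions   []complexDistribution
	defDistribution complexDistribution // for values that match no distribution
	enabled         bool
}

// limitFor resolves the limit for a concrete field value: known values get
// their group's limit, everything else falls into the default distribution.
func (ld limitDistributions) limitFor(value string) int64 {
	if idx, ok := ld.idxByKey[value]; ok {
		return ld.distributions[idx].limit
	}
	return ld.defDistribution.limit
}
```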
-func Test_getLimitValFromJson(t *testing.T) {
+func Test_decodeKeyLimitValue(t *testing.T) {
type args struct {
- data []byte
- valField string
+ data []byte
+ valField string
+ distrField string
}
tests := []struct {
- name string
- args args
- want int64
- wantErr bool
+ name string
+ args args
+
+ wantLimit int64
+ wantDistr limitDistributionCfg
+ wantErr bool
}{
{
- name: "ok",
+ name: "ok_only_limit",
args: args{
data: []byte(`{"limit_key":"3000"}`),
valField: "limit_key",
},
- want: 3000,
- wantErr: false,
+ wantLimit: 3000,
},
{
name: "ok_with_object",
@@ -373,34 +478,65 @@ func Test_getLimitValFromJson(t *testing.T) {
data: []byte(`{"limit_key":"3000","some_obj":{"field":"key"}}`),
valField: "limit_key",
},
- want: 3000,
- wantErr: false,
+ wantLimit: 3000,
},
{
- name: "unmarshal_error",
+ name: "ok_limit_and_distribution",
+ args: args{
+ data: []byte(`{"limit_key":"3000","distr_key":{"field":"my-field","ratios":[{"ratio":0.4,"values":["val1","val2"]},{"ratio":0.6,"values":["val3"]}],"enabled":true}}`),
+ valField: "limit_key",
+ distrField: "distr_key",
+ },
+ wantLimit: 3000,
+ wantDistr: limitDistributionCfg{
+ Field: "my-field",
+ Ratios: []limitDistributionRatio{
+ {Ratio: 0.4, Values: []string{"val1", "val2"}},
+ {Ratio: 0.6, Values: []string{"val3"}},
+ },
+ Enabled: true,
+ },
+ },
+ {
+ name: "decode_error",
args: args{
data: []byte(`"3000"`),
valField: "limit_key",
},
- want: 0,
wantErr: true,
},
{
- name: "key_error",
+ name: "limit_key_not_exists",
args: args{
data: []byte(`{"not_limit_key":"3000"}`),
valField: "limit_key",
},
- want: 0,
wantErr: true,
},
{
- name: "parse_int64_error",
+ name: "limit_format_error",
args: args{
data: []byte(`{"limit_key":"not_int"}`),
valField: "limit_key",
},
- want: 0,
+ wantErr: true,
+ },
+ {
+ name: "distribution_key_not_exists",
+ args: args{
+ data: []byte(`{"limit_key":"3000","not_distr_key":"test"}`),
+ valField: "limit_key",
+ distrField: "distr_key",
+ },
+ wantLimit: 3000,
+ },
+ {
+ name: "distribution_format_error",
+ args: args{
+ data: []byte(`{"limit_key":"3000","distr_key":"test"}`),
+ valField: "limit_key",
+ distrField: "distr_key",
+ },
wantErr: true,
},
}
@@ -408,9 +544,15 @@ func Test_getLimitValFromJson(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- got, err := getLimitValFromJson(tt.args.data, tt.args.valField)
+ gotLimit, gotDistr, err := decodeKeyLimitValue(tt.args.data, tt.args.valField, tt.args.distrField)
+
require.Equal(t, tt.wantErr, err != nil)
- require.Equal(t, tt.want, got)
+ if tt.wantErr {
+ return
+ }
+
+ require.Equal(t, tt.wantLimit, gotLimit)
+ require.Equal(t, tt.wantDistr, gotDistr)
})
}
}
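
The table above pins down the contract of `decodeKeyLimitValue` fairly precisely. Below is a hedged sketch of that contract using only `encoding/json`; the real implementation may rely on a different decoder and error wording, and the mirrored types are inferred from the `ok_limit_and_distribution` case.

```go
package throttle

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Mirrors the distribution config implied by the test cases above.
type limitDistributionRatio struct {
	Ratio  float64  `json:"ratio"`
	Values []string `json:"values"`
}

type limitDistributionCfg struct {
	Field   string                   `json:"field"`
	Ratios  []limitDistributionRatio `json:"ratios"`
	Enabled bool                     `json:"enabled"`
}

// decodeKeyLimitValueSketch extracts the limit (and optionally the
// distribution) from a JSON value, reproducing the cases of the table above.
func decodeKeyLimitValueSketch(data []byte, valField, distrField string) (int64, limitDistributionCfg, error) {
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(data, &obj); err != nil {
		return 0, limitDistributionCfg{}, err // "decode_error": top level is not an object
	}

	rawLimit, ok := obj[valField]
	if !ok {
		return 0, limitDistributionCfg{}, fmt.Errorf("no %q key in the value", valField) // "limit_key_not_exists"
	}
	var limitStr string
	if err := json.Unmarshal(rawLimit, &limitStr); err != nil {
		return 0, limitDistributionCfg{}, err
	}
	limit, err := strconv.ParseInt(limitStr, 10, 64)
	if err != nil {
		return 0, limitDistributionCfg{}, err // "limit_format_error"
	}

	var distr limitDistributionCfg
	if distrField != "" {
		// A missing distribution key is not an error ("distribution_key_not_exists").
		if rawDistr, ok := obj[distrField]; ok {
			if err := json.Unmarshal(rawDistr, &distr); err != nil {
				return 0, limitDistributionCfg{}, err // "distribution_format_error"
			}
		}
	}
	return limit, distr, nil
}
```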
diff --git a/plugin/action/throttle/rule.go b/plugin/action/throttle/rule.go
index d98d668c..ce502c1e 100644
--- a/plugin/action/throttle/rule.go
+++ b/plugin/action/throttle/rule.go
@@ -7,19 +7,21 @@ import (
)
type complexLimit struct {
- value int64
- kind string
+ value int64
+ kind string
+ distributions limitDistributions
}
type rule struct {
- fields []string // sorted list of used keys is used for combining limiter key.
- values []string // values to check against. order is the same as for keys.
- limit complexLimit
- byteIdxPart []byte
+ fields []string // sorted list of condition keys; used for combining the limiter key.
+ values []string // values to check against; order is the same as for fields.
+ limit complexLimit
+ distributionCfg []byte // json-encoded limit distribution cfg
+ byteIdxPart []byte
}
-// NewRule returns new Limit instance.
-func NewRule(conditions map[string]string, limit complexLimit, ruleNum int) *rule {
+// newRule returns a new rule instance.
+func newRule(conditions map[string]string, limit complexLimit, distributionCfg []byte, ruleNum int) *rule { // nolint: gocritic // hugeParam is ok here
var (
keys = make([]string, 0, len(conditions))
values = make([]string, len(conditions))
@@ -37,10 +39,11 @@ func NewRule(conditions map[string]string, limit complexLimit, ruleNum int) *rul
byteIdxPart := []byte{byte('a' + ruleNum), ':'}
return &rule{
- fields: keys,
- values: values,
- limit: limit,
- byteIdxPart: byteIdxPart,
+ fields: keys,
+ values: values,
+ limit: limit,
+ byteIdxPart: byteIdxPart,
+ distributionCfg: distributionCfg,
}
}
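
A minimal usage sketch for the updated constructor, assuming the signature above; the literal values, the helper name, and the `nil` distribution config are illustrative only.

```go
package throttle

// buildRulesSketch mirrors how the plugin assembles its rule list:
// per-rule limits first, then a catch-all default rule with empty conditions.
func buildRulesSketch() []*rule {
	r := newRule(
		map[string]string{"k8s_ns": "ns_1"},            // conditions, ANDed on match
		complexLimit{value: 500, kind: limitKindCount}, // zero-value distributions: plain count limit
		nil, // distributionCfg: json-encoded distribution, absent for this rule
		0,   // ruleNum 0 -> byteIdxPart "a:"
	)
	def := newRule(map[string]string{}, complexLimit{value: 100, kind: limitKindCount}, nil, 1)
	return []*rule{r, def}
}
```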
diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go
index 381eadc8..5b475583 100644
--- a/plugin/action/throttle/throttle.go
+++ b/plugin/action/throttle/throttle.go
@@ -16,15 +16,18 @@ import (
"github.com/ozontech/file.d/pipeline"
)
-var (
- defaultThrottleKey = "default"
+const defaultThrottleKey = "default"
+var (
// limiters should be shared across pipeline, so let's have a map by namespace and limiter name
limiters = map[string]*limitersMap{}
limitersMu = &sync.RWMutex{}
)
const (
+ limitKindCount = "count"
+ limitKindSize = "size"
+
redisBackend = "redis"
inMemoryBackend = "memory"
)
@@ -47,6 +50,7 @@ type Plugin struct {
// plugin metrics
limitersMapSizeMetric prometheus.Gauge
+ limitDistrMetrics *limitDistributionMetrics
}
// ! config-params
@@ -109,10 +113,11 @@ type Config struct {
// > @3@4@5@6
// >
// > Rules to override the `default_limit` for different group of event. It's a list of objects.
- // > Each object has the `limit` and `conditions` fields.
+ // > Each object has the `limit`, `limit_kind` and `conditions` fields as well as an optional `limit_distribution` field.
// > * `limit` – the value which will override the `default_limit`, if `conditions` are met.
+ // > * `limit_kind` – the type of limit: `count` - number of messages, `size` - total size of all messages
+ // > * `limit_kind` – the type of limit: `count` - number of messages, `size` - total size from all messages
// > * `conditions` – the map of `event field name => event field value`. The conditions are checked using `AND` operator.
+ // > * `limit_distribution` – see `LimitDistributionConfig` for details.
Rules []RuleConfig `json:"rules" default:"" slice:"true"` // *
// > @3@4@5@6
@@ -120,6 +125,40 @@ type Config struct {
// > Time interval after which unused limiters are removed.
LimiterExpiration cfg.Duration `json:"limiter_expiration" parse:"duration" default:"30m"` // *
LimiterExpiration_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > It allows distributing the `default_limit` between events by condition.
+ // >
+ // > `LimitDistributionConfig` params:
+ // > * `field` - the event field on which the distribution will be based.
+ // > * `ratios` - the list of objects. Each object has:
+ // > * `ratio` - distribution ratio, value must be in range [0.0;1.0].
+ // > * `values` - the list of strings which contains all `field` values that fall into this distribution.
+ // > * `metric_labels` - list of metric labels.
+ // >
+ // >> Notes:
+ // >> 1. The sum of ratios must be in range [0.0;1.0].
+ // >> 2. If the sum of ratios is less than 1, a **default distribution** with ratio **1-sum** is added;
+ // >> otherwise the **default distribution** isn't used.
+ // >> Events whose `field` value doesn't fall into any of the distributions:
+ // >> * fall into the default distribution, if it exists
+ // >> * are throttled otherwise
+ // >
+ // > `LimitDistributionConfig` example:
+ // > ```yaml
+ // > field: log.level
+ // > ratios:
+ // > - ratio: 0.5
+ // > values: ['error']
+ // > - ratio: 0.3
+ // > values: ['warn', 'info']
+ // > ```
+ // > For this config with `default_limit=100`:
+ // > * events with `log.level=error` will have a limit of 50
+ // > * events with `log.level=warn` or `log.level=info` will have a limit of 30
+ // > * all other events will have a limit of 20
+ LimitDistribution LimitDistributionConfig `json:"limit_distribution" child:"true"` // *
}
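
The 50/30/20 split in the example above follows directly from the ratios. A small self-contained helper reproduces the arithmetic; the rounding behavior is inferred from the documented numbers, not taken from the plugin source.

```go
package main

import (
	"fmt"
	"math"
)

// distributedLimits splits totalLimit by the given ratios; the remainder
// (1 - sum of ratios) goes to the default distribution.
func distributedLimits(ratios []float64, totalLimit int64) (limits []int64, defLimit int64) {
	sum := 0.0
	for _, r := range ratios {
		sum += r
		limits = append(limits, int64(math.Round(r*float64(totalLimit))))
	}
	defLimit = int64(math.Round((1 - sum) * float64(totalLimit)))
	return limits, defLimit
}

func main() {
	limits, def := distributedLimits([]float64{0.5, 0.3}, 100)
	fmt.Println(limits, def) // [50 30] 20 -- matches the doc example
}
```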
type RedisBackendConfig struct {
@@ -180,12 +219,60 @@ type RedisBackendConfig struct {
// > (e.g. if set to "limit", values must be of kind `{"limit":"",...}`).
// > If not set limiter values are considered as non-json data.
LimiterValueField string `json:"limiter_value_field" default:""` // *
+
+ // > @3@4@5@6
+ // >
+ // > Defines field with limit distribution inside json object stored in value
+ // > (e.g. if set to "distribution", value must be of kind `{"distribution":{},...}`).
+ // > Distribution object example:
+ // > ```json
+ // > {
+ // > "field": "log.level",
+ // > "ratios": [
+ // > {
+ // > "ratio": 0.5,
+ // > "values": ["error"]
+ // > },
+ // > {
+ // > "ratio": 0.3,
+ // > "values": ["warn", "info"]
+ // > }
+ // > ],
+ // > "enabled": true
+ // > }
+ // > ```
+ // >> If `limiter_value_field` and `limiter_distribution_field` are not set, the distribution will not be stored.
+ LimiterDistributionField string `json:"limiter_distribution_field" default:""` // *
}
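
For illustration, with `limiter_value_field: "limit"` and `limiter_distribution_field: "distribution"` the limiter key would hold a combined JSON value along these lines; the shape is inferred from the doc comments above, not captured from plugin output.

```go
package throttle

// Hypothetical combined limiter value for limiter_value_field: "limit"
// and limiter_distribution_field: "distribution".
const exampleLimiterValue = `{
  "limit": "1000",
  "distribution": {
    "field": "log.level",
    "ratios": [
      {"ratio": 0.5, "values": ["error"]},
      {"ratio": 0.3, "values": ["warn", "info"]}
    ],
    "enabled": true
  }
}`
```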
type RuleConfig struct {
- Limit int64 `json:"limit"`
- LimitKind string `json:"limit_kind" default:"count" options:"count|size"`
- Conditions map[string]string `json:"conditions"`
+ Limit int64 `json:"limit"`
+ LimitKind string `json:"limit_kind" default:"count" options:"count|size"`
+ Conditions map[string]string `json:"conditions"`
+ LimitDistribution LimitDistributionConfig `json:"limit_distribution" child:"true"`
+}
+
+type ComplexRatio struct {
+ Ratio float64 `json:"ratio" required:"true"`
+ Values []string `json:"values" required:"true"`
+}
+
+type LimitDistributionConfig struct {
+ Field cfg.FieldSelector `json:"field" default:""`
+ Ratios []ComplexRatio `json:"ratios" slice:"true"`
+ MetricLabels []string `json:"metric_labels" slice:"true"`
+}
+
+func (c LimitDistributionConfig) toInternal() limitDistributionCfg {
+ internal := limitDistributionCfg{
+ Field: string(c.Field),
+ Ratios: make([]limitDistributionRatio, 0, len(c.Ratios)),
+ Enabled: true,
+ }
+ for _, v := range c.Ratios {
+ internal.Ratios = append(internal.Ratios, limitDistributionRatio(v))
+ }
+ return internal
}
func init() {
@@ -201,9 +288,16 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
- p.registerMetrics(params.MetricCtl)
-
p.logger = params.Logger
+
+ distrCfg := p.config.LimitDistribution.toInternal()
+ ld, err := parseLimitDistribution(distrCfg, p.config.DefaultLimit)
+ if err != nil {
+ p.logger.Fatal("can't parse limit_distribution", zap.Error(err))
+ }
+
+ p.registerMetrics(params.MetricCtl, p.config.LimitDistribution.MetricLabels)
+
p.pipeline = params.PipelineName
ctx, cancel := context.WithCancel(context.Background())
p.ctx = ctx
@@ -241,15 +335,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
isStrict: params.PipelineSettings.IsStrict,
logger: p.logger,
limiterCfg: &limiterConfig{
- ctx: p.ctx,
- backend: p.config.LimiterBackend,
- pipeline: p.pipeline,
- throttleField: string(p.config.ThrottleField),
- bucketInterval: p.config.BucketInterval_,
- bucketsCount: p.config.BucketsCount,
- limiterValueField: p.config.RedisBackendCfg.LimiterValueField,
+ ctx: p.ctx,
+ backend: p.config.LimiterBackend,
+ pipeline: p.pipeline,
+ throttleField: string(p.config.ThrottleField),
+ bucketInterval: p.config.BucketInterval_,
+ bucketsCount: p.config.BucketsCount,
+ limiterValueField: p.config.RedisBackendCfg.LimiterValueField,
+ limiterDistributionField: p.config.RedisBackendCfg.LimiterDistributionField,
},
- mapSizeMetric: p.limitersMapSizeMetric,
+ mapSizeMetric: p.limitersMapSizeMetric,
+ limitDistrMetrics: p.limitDistrMetrics,
}
limiters[p.pipeline] = newLimitersMap(lmCfg, redisOpts)
if p.config.LimiterBackend == redisBackend {
@@ -265,17 +361,49 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
limitersMu.Unlock()
for i, r := range p.config.Rules {
- p.rules = append(p.rules, NewRule(r.Conditions, complexLimit{r.Limit, r.LimitKind}, i))
+ ruleDistrCfg := r.LimitDistribution.toInternal()
+ ldRule, err := parseLimitDistribution(ruleDistrCfg, r.Limit)
+ if err != nil {
+ p.logger.Fatal("can't parse rule's limit_distribution", zap.Error(err), zap.Int("rule", i))
+ }
+
+ p.rules = append(p.rules, newRule(
+ r.Conditions,
+ complexLimit{r.Limit, r.LimitKind, ldRule},
+ ruleDistrCfg.marshalJson(),
+ i,
+ ))
}
- p.rules = append(p.rules, NewRule(map[string]string{}, complexLimit{p.config.DefaultLimit, p.config.LimitKind}, len(p.config.Rules)))
+ p.rules = append(p.rules, newRule(
+ map[string]string{},
+ complexLimit{p.config.DefaultLimit, p.config.LimitKind, ld},
+ distrCfg.marshalJson(),
+ len(p.config.Rules),
+ ))
}
-func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
+func (p *Plugin) registerMetrics(ctl *metric.Ctl, limitDistrMetricsLabels []string) {
p.limitersMapSizeMetric = ctl.RegisterGauge(
"throttle_limiter_map_size",
"Size of internal map of throttle limiters",
)
+
+ labels := []string{"distribution_value"}
+ labels = append(labels, limitDistrMetricsLabels...)
+ p.limitDistrMetrics = &limitDistributionMetrics{
+ CustomLabels: limitDistrMetricsLabels,
+ EventsCount: metric.NewHeldCounterVec(ctl.RegisterCounterVec(
+ "throttle_distributed_events_count_total",
+ "total count of events that have been throttled using limit distribution",
+ labels...,
+ )),
+ EventsSize: metric.NewHeldCounterVec(ctl.RegisterCounterVec(
+ "throttle_distributed_events_size_total",
+ "total size of events that have been throttled using limit distribution",
+ labels...,
+ )),
+ }
}
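
A plain-prometheus sketch of the resulting label scheme; file.d's held-counter wrapper is elided here, and `namespace` stands in for a hypothetical custom label from `metric_labels`.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	cv := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "throttle_distributed_events_count_total",
		Help: "total count of events that have been throttled using limit distribution",
	}, []string{"distribution_value", "namespace"}) // "distribution_value" always comes first
	prometheus.MustRegister(cv)
	cv.WithLabelValues("error", "ns_1").Inc() // one throttled event in the "error" distribution
}
```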
// Stop ends plugin activity.
@@ -318,12 +446,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool {
keyLimitOverride = strings.Clone(event.Root.Dig(p.config.RedisBackendCfg.LimiterKeyField_...).AsString())
}
- for _, rule := range p.rules {
- if !rule.isMatch(event) {
+ for _, r := range p.rules {
+ if !r.isMatch(event) {
continue
}
var lim limiter
- lim, p.limiterBuf = p.limitersMap.getOrAdd(throttleKey, keyLimitOverride, p.limiterBuf, rule)
+ lim, p.limiterBuf = p.limitersMap.getOrAdd(throttleKey, keyLimitOverride, p.limiterBuf, r)
return lim.isAllowed(event, ts)
}
diff --git a/plugin/action/throttle/throttle_test.go b/plugin/action/throttle/throttle_test.go
index 18b123cd..69f6d439 100644
--- a/plugin/action/throttle/throttle_test.go
+++ b/plugin/action/throttle/throttle_test.go
@@ -3,6 +3,7 @@ package throttle
import (
"fmt"
"math/rand"
+ "strings"
"sync"
"testing"
"time"
@@ -170,15 +171,15 @@ func TestSizeThrottle(t *testing.T) {
config := &Config{
Rules: []RuleConfig{
- {Limit: int64(limitA), LimitKind: "size", Conditions: map[string]string{"k8s_ns": "ns_1"}},
- {Limit: int64(limitB), LimitKind: "size", Conditions: map[string]string{"k8s_ns": "ns_2"}},
+ {Limit: int64(limitA), LimitKind: limitKindSize, Conditions: map[string]string{"k8s_ns": "ns_1"}},
+ {Limit: int64(limitB), LimitKind: limitKindSize, Conditions: map[string]string{"k8s_ns": "ns_2"}},
},
BucketsCount: buckets,
BucketInterval: "100ms",
ThrottleField: "k8s_pod",
TimeField: "",
DefaultLimit: int64(defaultLimit),
- LimitKind: "size",
+ LimitKind: limitKindSize,
}
test.NewConfig(config, nil)
@@ -205,7 +206,7 @@ func TestMixedThrottle(t *testing.T) {
config := &Config{
Rules: []RuleConfig{
{Limit: int64(limitA), Conditions: map[string]string{"k8s_ns": "ns_1"}},
- {Limit: int64(limitB), LimitKind: "size", Conditions: map[string]string{"k8s_ns": "ns_2"}},
+ {Limit: int64(limitB), LimitKind: limitKindSize, Conditions: map[string]string{"k8s_ns": "ns_2"}},
},
BucketsCount: buckets,
BucketInterval: "100ms",
@@ -235,7 +236,7 @@ func TestRedisThrottle(t *testing.T) {
config := &Config{
Rules: []RuleConfig{
- {Limit: int64(defaultLimit), LimitKind: "count"},
+ {Limit: int64(defaultLimit), LimitKind: limitKindCount},
},
BucketsCount: 1,
BucketInterval: "2s",
@@ -295,7 +296,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) {
config := &Config{
Rules: []RuleConfig{
- {Limit: int64(defaultLimit), LimitKind: "count"},
+ {Limit: int64(defaultLimit), LimitKind: limitKindCount},
},
BucketsCount: 1,
BucketInterval: "2m",
@@ -390,7 +391,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) {
eventsTotal := 3
config := &Config{
Rules: []RuleConfig{
- {Limit: int64(defaultLimit), LimitKind: "count"},
+ {Limit: int64(defaultLimit), LimitKind: limitKindCount},
},
BucketsCount: 1,
BucketInterval: "2s",
@@ -453,7 +454,7 @@ func TestThrottleLimiterExpiration(t *testing.T) {
eventsTotal := 3
config := &Config{
Rules: []RuleConfig{
- {Limit: int64(defaultLimit), LimitKind: "count"},
+ {Limit: int64(defaultLimit), LimitKind: limitKindCount},
},
BucketsCount: 1,
BucketInterval: "100ms",
@@ -548,3 +549,95 @@ func TestThrottleRedisFallbackToInMemory(t *testing.T) {
throttleMapsCleanup()
})
}
+
+func TestThrottleWithDistribution(t *testing.T) {
+ defaultLimit := 12
+ config := &Config{
+ ThrottleField: "k8s_pod",
+ DefaultLimit: int64(defaultLimit),
+ BucketsCount: 1,
+ BucketInterval: "1s",
+ LimitDistribution: LimitDistributionConfig{
+ Field: "level",
+ Ratios: []ComplexRatio{
+ {Ratio: 0.5, Values: []string{"error"}},
+ {Ratio: 0.3, Values: []string{"warn", "info"}},
+ },
+ },
+ }
+ test.NewConfig(config, nil)
+
+ p, input, output := test.NewPipelineMock(
+ test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false),
+ "name",
+ )
+
+ wg := &sync.WaitGroup{}
+ wg.Add(defaultLimit)
+
+ outEvents := map[string]int{}
+ output.SetOutFn(func(e *pipeline.Event) {
+ level := strings.Clone(e.Root.Dig("level").AsString())
+ outEvents[level]++
+ wg.Done()
+ })
+ wantOutEvents := map[string]int{
+ "error": 6,
+ "info": 3,
+ "warn": 1,
+ "debug": 1,
+ "": 1,
+ }
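+ // With default_limit=12 and ratios 0.5/0.3, the per-distribution limits are:
+ // error: round(0.5*12)=6; warn+info: round(0.3*12)=4, shared by the group;
+ // default: round(0.2*12)=2. Hence 6 + (3+1) + (1+1) = 12 events pass.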
+
+ events := []string{
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":""}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
+ `{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
+ }
+
+ nowTs := time.Now().Format(time.RFC3339Nano)
+ for i := 0; i < len(events); i++ {
+ json := fmt.Sprintf(events[i], nowTs)
+ input.In(0, "test", 0, []byte(json))
+ }
+
+ wgWaitWithTimeout := func(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return false
+ case <-time.After(timeout):
+ return true
+ }
+ }
+
+ timeout := wgWaitWithTimeout(wg, 5*time.Second)
+ p.Stop()
+
+ require.False(t, timeout, "timeout expired")
+
+ require.Equal(t, len(wantOutEvents), len(outEvents), "wrong outEvents size")
+ for k, v := range outEvents {
+ require.Equal(t, wantOutEvents[k], v, fmt.Sprintf("wrong value in outEvents with key %q", k))
+ }
+
+ t.Cleanup(func() {
+ throttleMapsCleanup()
+ })
+}