Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle limit distribution #587

Merged
merged 28 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ linters-settings:
govet:
enable:
- composites
disable:
- printf
dupl:
threshold: 120
goconst:
Expand Down
65 changes: 63 additions & 2 deletions plugin/action/throttle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ Time interval to check event throughput.
**`rules`** *`[]RuleConfig`*

Rules to override the `default_limit` for different groups of events. It's a list of objects.
Each object has the `limit` and `conditions` fields.
Each object has the `limit`, `limit_kind` and `conditions` fields as well as an optional `limit_distribution` field.
* `limit` – the value which will override the `default_limit`, if `conditions` are met.
* `limit_kind` – the type of a limit: `count` - number of messages, `size` - total size from all messages
* `limit_kind` – the type of limit: `count` - number of messages, `size` - total size from all messages
* `conditions` – the map of `event field name => event field value`. The conditions are checked using `AND` operator.
* `limit_distribution` – see `LimitDistributionConfig` for details.

<br>

Expand All @@ -78,6 +79,41 @@ Time interval after which unused limiters are removed.

<br>

**`limit_distribution`** *`LimitDistributionConfig`*

It allows distributing the `default_limit` between events by condition.

`LimitDistributionConfig` params:
* `field` - the event field on which the distribution will be based.
* `ratios` - the list of objects. Each object has:
* `ratio` - distribution ratio, value must be in range [0.0;1.0].
* `values` - the list of strings which contains all `field` values that fall into this distribution.
* `metric_labels` - list of metric labels.

> Notes:
> 1. Sum of ratios must be in range [0.0;1.0].
> 2. If the sum of ratios is less than 1, a **default distribution** with ratio **1-sum** is added,
> otherwise the **default distribution** isn't used.
> All events for which the value in the `field` doesn't fall into any of the distributions:
> * fall into default distribution, if it exists
> * throttled, otherwise

`LimitDistributionConfig` example:
```yaml
field: log.level
ratios:
- ratio: 0.5
values: ['error']
- ratio: 0.3
values: ['warn', 'info']
```
For this config and the `default_limit=100`:
* events with `log.level=error` will have a limit of 50
* events with `log.level=warn` or `log.level=info` will have a limit of 30
* all other events will have a limit of 20

<br>

**`endpoint`** *`string`*


Expand Down Expand Up @@ -140,5 +176,30 @@ If not set limiter values are considered as non-json data.

<br>

**`limiter_distribution_field`** *`string`*

Defines the field containing the limit distribution inside the json object stored in the value
(e.g. if set to "distribution", value must be of kind `{"distribution":{<object>},...}`).
Distribution object example:
```json
{
"field": "log.level",
"ratios": [
{
"ratio": 0.5,
"values": ["error"]
},
{
"ratio": 0.3,
"values": ["warn", "info"]
}
],
"enabled": true
}
```
> If `limiter_value_field` and `limiter_distribution_field` are not set, the distribution will not be stored.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
225 changes: 225 additions & 0 deletions plugin/action/throttle/buckets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package throttle

import (
"time"
)

// buckets is a ring of time-sliced counters used by the throttle limiter.
// The "simple" implementation keeps one counter per bucket; the "distributed"
// one keeps a counter per distribution slot in each bucket.
type buckets interface {
	get(index, distrIndex int) int64
	getAll(index int, buf []int64) []int64
	set(index, distrIndex int, value int64)
	reset(index int)
	add(index, distrIndex int, value int64)
	isEmpty(index int) bool
	// rebuild will rebuild buckets and returns actual bucket id
	rebuild(currentTs, ts time.Time) int

	// actualizeIndex checks probable shift of buckets and returns actual bucket index and actuality
	actualizeIndex(maxID, index int) (int, bool)
	getCount() int
	getInterval() time.Duration
	getMinID() int

	// for testing only
	isSimple() bool
}

// newBuckets picks the buckets implementation: a distribution of size 1 needs
// no per-slot counters, so the cheaper simple ring is used for that case.
func newBuckets(count, distributionSize int, interval time.Duration) buckets {
	if distributionSize != 1 {
		return newDistributedBuckets(count, distributionSize, interval)
	}
	return newSimpleBuckets(count, interval)
}

// bucketsMeta holds state shared by both bucket implementations: the ring
// size, the per-bucket time interval and the sliding window of bucket ids.
type bucketsMeta struct {
	count    int
	interval time.Duration
	minID    int // min bucket id
	maxID    int // max bucket id
}

// newBucketsMeta builds bucket metadata; minID/maxID stay zero until the
// first rebuild initializes the id window from the current timestamp.
func newBucketsMeta(count int, interval time.Duration) bucketsMeta {
	m := bucketsMeta{}
	m.count = count
	m.interval = interval
	return m
}

// getCount returns the number of buckets in the ring.
func (m bucketsMeta) getCount() int {
	return m.count
}

// getInterval returns the time span covered by a single bucket.
func (m bucketsMeta) getInterval() time.Duration {
	return m.interval
}

// getMinID returns the id of the oldest bucket in the current window.
func (m bucketsMeta) getMinID() int {
	return m.minID
}

// actualizeIndex maps an index captured when the newest bucket id was maxID
// onto the current window. It returns the shifted index and whether it still
// points at a live bucket.
// NOTE(review): a shifted index of exactly 0 is reported as stale
// (actual > 0, not >= 0) — confirm this is intentional.
func (m bucketsMeta) actualizeIndex(maxID, index int) (int, bool) {
	if maxID == m.maxID {
		// No rebuild happened since the index was taken.
		return index, true
	}

	// Buckets were rebuilt in the meantime; shift the index by the id delta.
	actual := index - (m.maxID - maxID)
	return actual, actual > 0
}

// timeToBucketID maps an instant to its bucket id: epoch nanoseconds divided
// by the bucket interval (time.Duration is an int64 nanosecond count).
func (m bucketsMeta) timeToBucketID(t time.Time) int {
	return int(t.UnixNano() / int64(m.interval))
}

// simpleBuckets is the non-distributed implementation: a plain ring of
// int64 counters, one per bucket. distrIndex arguments are ignored.
type simpleBuckets struct {
	bucketsMeta
	b []int64
}

// newSimpleBuckets creates a ring of count zeroed counters.
func newSimpleBuckets(count int, interval time.Duration) *simpleBuckets {
	sb := simpleBuckets{
		bucketsMeta: newBucketsMeta(count, interval),
		b:           make([]int64, count),
	}
	return &sb
}

// get returns the counter of bucket index; the distribution index is ignored.
func (b *simpleBuckets) get(index, _ int) int64 {
	return b.b[index]
}

// getAll appends the bucket's single counter to buf and returns the result.
func (b *simpleBuckets) getAll(index int, buf []int64) []int64 {
	return append(buf, b.b[index])
}

// set overwrites the counter of bucket index; the distribution index is ignored.
func (b *simpleBuckets) set(index, _ int, value int64) {
	b.b[index] = value
}

// reset zeroes the counter of bucket index.
func (b *simpleBuckets) reset(index int) {
	b.b[index] = 0
}

// add increments the counter of bucket index by value; the distribution index is ignored.
func (b *simpleBuckets) add(index, _ int, value int64) {
	b.b[index] += value
}

// isEmpty reports whether the counter of bucket index is zero.
func (b *simpleBuckets) isEmpty(index int) bool {
	return b.b[index] == 0
}

// rebuild shifts the bucket window forward according to currentTs and returns
// the bucket id for ts (see rebuildBuckets for the windowing rules).
func (b *simpleBuckets) rebuild(currentTs, ts time.Time) int {
	resetFn := func(count int) {
		// Shift the window left by count buckets and zero the freed tail.
		// The previous append(b.b[count:], b.b[:count]...) rotation
		// reallocated the backing array on every shift; since the
		// rotated-in values were immediately reset anyway, copy+clear is
		// behaviorally identical and allocation-free. This is safe here
		// because the elements are plain int64 values (unlike the
		// distributed variant, where buckets are slices and copying
		// headers would alias them).
		n := copy(b.b, b.b[count:])
		clear(b.b[n:])
	}

	return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
}

// isSimple reports that this is the non-distributed implementation
// (used for testing only, see the buckets interface).
func (b *simpleBuckets) isSimple() bool {
	return true
}

// distributedBuckets is the distributed implementation: each bucket holds
// one counter per distribution slot.
type distributedBuckets struct {
	bucketsMeta
	b []distributedBucket
}

// newDistributedBuckets creates a ring of count buckets, each with
// distributionSize zeroed counters.
func newDistributedBuckets(count, distributionSize int, interval time.Duration) *distributedBuckets {
	bucketList := make([]distributedBucket, count)
	for i := range bucketList {
		bucketList[i] = newDistributedBucket(distributionSize)
	}
	return &distributedBuckets{
		bucketsMeta: newBucketsMeta(count, interval),
		b:           bucketList,
	}
}

// get returns the counter of the given distribution slot in bucket index.
func (b *distributedBuckets) get(index, distrIndex int) int64 {
	return b.b[index][distrIndex]
}

// getAll appends all distribution counters of bucket index to buf and returns the result.
func (b *distributedBuckets) getAll(index int, buf []int64) []int64 {
	return b.b[index].copyTo(buf)
}

// set overwrites the counter of the given distribution slot in bucket index.
func (b *distributedBuckets) set(index, distrIndex int, value int64) {
	b.b[index][distrIndex] = value
}

// reset zeroes every distribution slot of bucket index.
func (b *distributedBuckets) reset(index int) {
	for i := range b.b[index] {
		b.b[index][i] = 0
	}
}

// add increments the counter of the given distribution slot in bucket index by value.
func (b *distributedBuckets) add(index, distrIndex int, value int64) {
	b.b[index][distrIndex] += value
}

// isEmpty reports whether every distribution slot of bucket index is empty.
// NOTE(review): negative values are treated as empty (v > 0, not v != 0) —
// confirm counters can never go negative.
func (b *distributedBuckets) isEmpty(index int) bool {
	for _, v := range b.b[index] {
		if v > 0 {
			return false
		}
	}
	return true
}

// rebuild shifts the bucket window forward according to currentTs and returns
// the bucket id for ts (see rebuildBuckets for the windowing rules).
func (b *distributedBuckets) rebuild(currentTs, ts time.Time) int {
	resetFn := func(count int) {
		// Rotate the ring left by count buckets. The append moves the
		// bucket slice headers so each bucket remains referenced exactly
		// once; a plain copy would duplicate headers and alias the same
		// backing arrays, corrupting counters when the tail is reset below.
		b.b = append(b.b[count:], b.b[:count]...)
		// Zero the contents of the count recycled buckets at the tail.
		for i := 0; i < count; i++ {
			b.reset(b.getCount() - 1 - i)
		}
	}

	return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
}

// isSimple reports that this is the distributed implementation
// (used for testing only, see the buckets interface).
func (b *distributedBuckets) isSimple() bool {
	return false
}

// distributedBucket is a single time bucket: one counter per distribution slot.
type distributedBucket []int64

// newDistributedBucket allocates a zeroed bucket with size distribution slots.
func newDistributedBucket(size int) distributedBucket {
	return make(distributedBucket, size)
}

// copyTo appends the bucket's counters to buf and returns the result.
func (s distributedBucket) copyTo(buf distributedBucket) distributedBucket {
	return append(buf, s...)
}

// rebuildBuckets lazily initializes the bucket id window, shifts it forward
// when currentTs has moved past the newest bucket (recycling the oldest
// buckets via resetFn) and returns the bucket id for the event timestamp ts.
// Events falling outside the window, whether too old or too new, are mapped
// to the newest bucket.
func rebuildBuckets(meta *bucketsMeta, resetFn func(int), currentTs, ts time.Time) int {
	currentID := meta.timeToBucketID(currentTs)

	// The window isn't initialized yet. It MUST be derived from currentTs,
	// because ts comes from the event and can be invalid (e.g. year 1970 or 2077).
	if meta.minID == 0 {
		meta.maxID = currentID
		meta.minID = currentID - meta.count + 1
	}
	maxID := meta.minID + meta.count - 1

	// currentID moved past the newest bucket: recycle at most count buckets
	// and advance the window.
	if shift := currentID - maxID; shift > 0 {
		resetFn(min(shift, meta.count))
		meta.minID += shift
		meta.maxID = currentID
	}

	// In-window events keep their own bucket; everything else goes to the newest.
	if id := meta.timeToBucketID(ts); id >= meta.minID && id <= meta.maxID {
		return id
	}
	return meta.maxID
}
Loading
Loading