-
Notifications
You must be signed in to change notification settings - Fork 0
/
forwarder_kafka_cache.go
87 lines (77 loc) · 1.39 KB
/
forwarder_kafka_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"errors"
"time"
)
var errCacheNotReady = errors.New("cache not yet ready")
// cache implements a thread-safe token cache for our Kafka forwarder.
type cache struct {
in chan any
out chan []any
length chan int
done chan empty
age chan time.Time
conf *kafkaConfig
}
func newCache() *cache {
return &cache{
in: make(chan any),
out: make(chan []any),
length: make(chan int),
done: make(chan empty),
age: make(chan time.Time),
}
}
func (c *cache) start() {
go func() {
var (
age time.Time
elems = []any{}
)
for {
select {
case e := <-c.in:
if len(elems) == 0 {
age = time.Now()
}
elems = append(elems, e)
case c.out <- elems:
elems = []any{}
case c.length <- len(elems):
case c.age <- age:
case <-c.done:
return
}
}
}()
}
func (c *cache) stop() {
close(c.done)
}
func (c *cache) len() int {
return <-c.length
}
func (c *cache) submit(e any) {
c.in <- e
}
func (c *cache) retrieve() ([]any, error) {
if c.isReady() {
return <-c.out, nil
}
return nil, errCacheNotReady
}
func (c *cache) isReady() bool {
age := <-c.age
if age.IsZero() {
return false
}
// We cache tokens until the cache gets too large or too old -- whichever
// comes first.
if c.len() > c.conf.batchSize {
return true
}
if time.Now().Add(-c.conf.batchPeriod).After(age) {
return true
}
return false
}