-
Notifications
You must be signed in to change notification settings - Fork 6
/
buffer.go
160 lines (140 loc) · 3.46 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package buffer
import (
"errors"
"io"
"time"
)
// ErrTimeout is returned when an operation could not complete within its
// configured timeout.
var ErrTimeout = errors.New("operation timed-out")

// ErrClosed is returned when an operation is attempted on a buffer that has
// already been closed.
var ErrClosed = errors.New("buffer is closed")
// Buffer represents a data buffer that is asynchronously flushed, either
// manually or automatically.
type Buffer struct {
	// Closer is embedded as-is. Buffer declares its own Close method, which
	// takes precedence over the one promoted from this field.
	io.Closer

	dataCh  chan interface{} // carries items handed over by Push
	flushCh chan struct{}    // carries manual flush requests sent by Flush
	closeCh chan struct{}    // carries the shutdown request sent by Close
	doneCh  chan struct{}    // closed by consume once it has stopped
	options *Options         // settings resolved in New
}
// Push appends an item to the end of the buffer.
//
// The item is handed to the consumer goroutine over an unbuffered channel, so
// Push blocks until the consumer accepts it or PushTimeout elapses.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
	if buffer.closed() {
		return ErrClosed
	}

	// An explicit timer (rather than time.After) lets us release it as soon
	// as the push completes instead of leaving it pending until it expires.
	timer := time.NewTimer(buffer.options.PushTimeout)
	defer timer.Stop()

	select {
	case buffer.dataCh <- item:
		return nil
	case <-buffer.doneCh:
		// The consumer stopped while we were waiting: nobody will ever
		// receive the item, so report the closure instead of timing out.
		return ErrClosed
	case <-timer.C:
		return ErrTimeout
	}
}
// Flush outputs the buffer to a permanent destination.
//
// Flush only hands the request over to the consumer goroutine; it returns as
// soon as the request has been accepted, not when the write has finished.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
	if buffer.closed() {
		return ErrClosed
	}

	// An explicit timer (rather than time.After) lets us release it as soon
	// as the request is accepted instead of leaving it pending until expiry.
	timer := time.NewTimer(buffer.options.FlushTimeout)
	defer timer.Stop()

	select {
	case buffer.flushCh <- struct{}{}:
		return nil
	case <-buffer.doneCh:
		// The consumer stopped while we were waiting: the request can no
		// longer be served, so report the closure instead of timing out.
		return ErrClosed
	case <-timer.C:
		return ErrTimeout
	}
}
// Close flushes the buffer and prevents it from being further used.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has already been closed.
//
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again: a repeated call returns nil if the consumer
// finishes while it is waiting.
func (buffer *Buffer) Close() error {
	if buffer.closed() {
		return ErrClosed
	}

	// Phase 1: ask the consumer to stop.
	signalTimer := time.NewTimer(buffer.options.CloseTimeout)
	defer signalTimer.Stop()

	select {
	case buffer.closeCh <- struct{}{}:
		// Request accepted; wait for completion below.
	case <-buffer.doneCh:
		// The consumer finished on its own account, e.g. an earlier Close
		// timed out while the final flush was still running. It no longer
		// receives from closeCh, so this is the only way to observe success.
		return nil
	case <-signalTimer.C:
		return ErrTimeout
	}

	// Phase 2: wait for the consumer to finish its final flush.
	doneTimer := time.NewTimer(buffer.options.CloseTimeout)
	defer doneTimer.Stop()

	select {
	case <-buffer.doneCh:
		// dataCh, flushCh and closeCh are deliberately left open: Push, Flush
		// or another Close may still be blocked sending on them, and a send
		// on a closed channel panics. doneCh alone marks the buffer closed.
		return nil
	case <-doneTimer.C:
		return ErrTimeout
	}
}
// closed reports, without blocking, whether doneCh has been closed, i.e.
// whether the consumer goroutine has stopped.
//
// It uses a pointer receiver like every other Buffer method, so the struct is
// not copied on each call.
func (buffer *Buffer) closed() bool {
	select {
	case <-buffer.doneCh:
		return true
	default:
		return false
	}
}
// consume is the buffer's worker loop, started by New in its own goroutine.
//
// It collects pushed items into a slice of options.Size elements and hands
// the collected items to options.Flusher when the slice is full, when the
// flush ticker fires, on a manual flush request, or on close (the last three
// only if at least one item is pending). After every flush a fresh slice and
// a fresh ticker are created. Once a close request has been handled, the
// ticker is stopped and doneCh is closed.
func (buffer *Buffer) consume() {
	var (
		pending        = make([]interface{}, buffer.options.Size)
		n              int
		tick, stopTick = newTicker(buffer.options.FlushInterval)
	)

	for running := true; running; {
		flushNow := false

		select {
		case item := <-buffer.dataCh:
			pending[n] = item
			n++
			flushNow = n >= len(pending)
		case <-tick:
			flushNow = n > 0
		case <-buffer.flushCh:
			flushNow = n > 0
		case <-buffer.closeCh:
			running = false
			flushNow = n > 0
		}

		if !flushNow {
			continue
		}

		// Hand over exactly the filled prefix, then start over with a new
		// slice so the flusher's view is never overwritten by later pushes.
		stopTick()
		buffer.options.Flusher.Write(pending[:n])
		pending = make([]interface{}, buffer.options.Size)
		n = 0
		tick, stopTick = newTicker(buffer.options.FlushInterval)
	}

	stopTick()
	close(buffer.doneCh)
}
// newTicker returns the channel of a ticker firing every interval, together
// with the function that stops it.
//
// A non-positive interval disables ticking: the returned channel is nil (a
// receive on it blocks forever, so the matching select case never fires) and
// the stop function is a no-op. Negative values are treated like zero because
// time.NewTicker panics on any non-positive interval.
func newTicker(interval time.Duration) (<-chan time.Time, func()) {
	if interval <= 0 {
		return nil, func() {}
	}

	ticker := time.NewTicker(interval)
	return ticker.C, ticker.Stop
}
// New builds a Buffer configured by the given options and starts its consumer
// goroutine before returning it.
//
// All internal channels are unbuffered, so Push, Flush and Close synchronise
// directly with the consumer.
func New(opts ...Option) *Buffer {
	b := new(Buffer)
	b.dataCh = make(chan interface{})
	b.flushCh = make(chan struct{})
	b.closeCh = make(chan struct{})
	b.doneCh = make(chan struct{})
	b.options = resolveOptions(opts...)

	go b.consume()

	return b
}