Skip to content

Commit

Permalink
Add support for type parameters (generics)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbenford committed Mar 16, 2022
1 parent 612b799 commit f4f6984
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
golang 1.14
golang 1.18
44 changes: 22 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

## Installation

go get github.com/globocom/go-buffer
go get github.com/globocom/go-buffer/v3

## Examples

Expand All @@ -30,19 +30,19 @@ package main
import (
"time"

"github.com/globocom/go-buffer/v2"
"github.com/globocom/go-buffer/v3"
)

func main() {
buff := buffer.New(
// buffer can hold up to 5 items
buffer.WithSize(5),
buff := buffer.New[string](
// call this function when the buffer needs flushing
buffer.WithFlusher(func(items []interface{}) {
func(items []string) {
for _, item := range items {
println(item.(string))
println(string)
}
}),
},
// buffer can hold up to 5 items
buffer.WithSize(5),
)
// ensure the buffer
defer buff.Close()
Expand All @@ -68,22 +68,22 @@ package main
import (
"time"

"github.com/globocom/go-buffer/v2"
"github.com/globocom/go-buffer/v3"
)

func main() {
buff := buffer.New(
buff := buffer.New[string](
// call this function when the buffer needs flushing
func(items []string) {
for _, item := range items {
println(item)
}
},
// buffer can hold up to 5 items
buffer.WithSize(5),
// buffer will be flushed every second, regardless of
// how many items were pushed
buffer.WithFlushInterval(time.Second),
// call this function when the buffer needs flushing
buffer.WithFlusher(func(items []interface{}) {
for _, item := range items {
println(item.(string))
}
}),
)
defer buff.Close()

Expand All @@ -110,15 +110,15 @@ import (
)

func main() {
buff := buffer.New(
// buffer can hold up to 5 items
buffer.WithSize(5),
buff := buffer.New[string](
// call this function when the buffer needs flushing
buffer.WithFlusher(func(items []interface{}) {
func(items []string) {
for _, item := range items {
println(item.(string))
println(item)
}
}),
},
// buffer can hold up to 5 items
buffer.WithSize(5),
)
defer buff.Close()

Expand Down
12 changes: 6 additions & 6 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package buffer_test
import (
"testing"

"github.com/globocom/go-buffer/v2"
"github.com/globocom/go-buffer/v3"
)

func BenchmarkBuffer(b *testing.B) {
noop := buffer.FlusherFunc(func([]interface{}) {})
noop := buffer.FlusherFunc[int](func([]int) {})

b.Run("push only", func(b *testing.B) {
sut := buffer.New(
sut := buffer.New[int](
noop,
buffer.WithSize(uint(b.N)+1),
buffer.WithFlusher(noop),
)
defer sut.Close()

Expand All @@ -25,9 +25,9 @@ func BenchmarkBuffer(b *testing.B) {
})

b.Run("push and flush", func(b *testing.B) {
sut := buffer.New(
sut := buffer.New[int](
noop,
buffer.WithSize(1),
buffer.WithFlusher(noop),
)
defer sut.Close()

Expand Down
34 changes: 20 additions & 14 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ var (

type (
// Buffer represents a data buffer that is asynchronously flushed, either manually or automatically.
Buffer struct {
Buffer[T any] struct {
io.Closer
dataCh chan interface{}
flusher Flusher[T]
dataCh chan T
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
Expand All @@ -27,9 +28,9 @@ type (

// Push appends an item to the end of the buffer.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
func (buffer *Buffer[T]) Push(item T) error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -46,7 +47,7 @@ func (buffer *Buffer) Push(item interface{}) error {
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
func (buffer *Buffer[T]) Flush() error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -67,7 +68,7 @@ func (buffer *Buffer) Flush() error {
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer) Close() error {
func (buffer *Buffer[T]) Close() error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -90,7 +91,7 @@ func (buffer *Buffer) Close() error {
}
}

func (buffer Buffer) closed() bool {
func (buffer *Buffer[T]) closed() bool {
select {
case <-buffer.doneCh:
return true
Expand All @@ -99,9 +100,9 @@ func (buffer Buffer) closed() bool {
}
}

func (buffer *Buffer) consume() {
func (buffer *Buffer[T]) consume() {
count := 0
items := make([]interface{}, buffer.options.Size)
items := make([]T, buffer.options.Size)
mustFlush := false
ticker, stopTicker := newTicker(buffer.options.FlushInterval)

Expand All @@ -123,10 +124,10 @@ func (buffer *Buffer) consume() {

if mustFlush {
stopTicker()
buffer.options.Flusher.Write(items[:count])
buffer.flusher.Write(items[:count])

count = 0
items = make([]interface{}, buffer.options.Size)
items = make([]T, buffer.options.Size)
mustFlush = false
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
}
Expand All @@ -146,9 +147,14 @@ func newTicker(interval time.Duration) (<-chan time.Time, func()) {
}

// New creates a new buffer instance with the provided options.
func New(opts ...Option) *Buffer {
buffer := &Buffer{
dataCh: make(chan interface{}),
func New[T any](flusher Flusher[T], opts ...Option) *Buffer[T] {
if flusher == nil {
panic("invalid flusher")
}

buffer := &Buffer[T]{
flusher: flusher,
dataCh: make(chan T),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
Expand Down
Loading

0 comments on commit f4f6984

Please sign in to comment.