From f4f6984f1eeb2b68bb81aa7d7864a18dcc145f41 Mon Sep 17 00:00:00 2001 From: Michael Benford Date: Sun, 2 Jan 2022 12:40:51 -0300 Subject: [PATCH] Add support for type parameters (generics) --- .tool-versions | 2 +- README.md | 44 +++++++-------- bench_test.go | 12 ++-- buffer.go | 34 +++++++----- buffer_test.go | 143 ++++++++++++++++++++++++------------------------ flusher.go | 8 +-- go.mod | 15 ++++- go.sum | 1 - options.go | 13 ----- options_test.go | 14 +---- 10 files changed, 139 insertions(+), 147 deletions(-) diff --git a/.tool-versions b/.tool-versions index 6973eed..4933a41 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -golang 1.14 +golang 1.18 diff --git a/README.md b/README.md index 28e108e..52add33 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ ## Installation - go get github.com/globocom/go-buffer + go get github.com/globocom/go-buffer/v3 ## Examples @@ -30,19 +30,19 @@ package main import ( "time" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) func main() { - buff := buffer.New( - // buffer can hold up to 5 items - buffer.WithSize(5), + buff := buffer.New[string]( // call this function when the buffer needs flushing - buffer.WithFlusher(func(items []interface{}) { + func(items []string) { for _, item := range items { - println(item.(string)) + println(string) } - }), + }, + // buffer can hold up to 5 items + buffer.WithSize(5), ) // ensure the buffer defer buff.Close() @@ -68,22 +68,22 @@ package main import ( "time" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) func main() { - buff := buffer.New( + buff := buffer.New[string]( + // call this function when the buffer needs flushing + func(items []string) { + for _, item := range items { + println(item) + } + }, // buffer can hold up to 5 items buffer.WithSize(5), // buffer will be flushed every second, regardless of // how many items were pushed buffer.WithFlushInterval(time.Second), - // call this function when the buffer needs flushing - buffer.WithFlusher(func(items []interface{}) { - for _, item := range items { - println(item.(string)) - } - }), ) defer buff.Close() @@ -110,15 +110,15 @@ import ( ) func main() { - buff := buffer.New( - // buffer can hold up to 5 items - buffer.WithSize(5), + buff := buffer.New[string]( // call this function when the buffer needs flushing - buffer.WithFlusher(func(items []interface{}) { + func(items []string) { for _, item := range items { - println(item.(string)) + println(item) } - }), + }, + // buffer can hold up to 5 items + buffer.WithSize(5), ) defer buff.Close() diff --git a/bench_test.go b/bench_test.go index 9c07000..e0198b1 100644 --- a/bench_test.go +++ b/bench_test.go @@ -3,16 +3,16 @@ package buffer_test import ( "testing" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) func BenchmarkBuffer(b *testing.B) { - noop := buffer.FlusherFunc(func([]interface{}) {}) + noop := buffer.FlusherFunc[int](func([]int) {}) b.Run("push only", func(b *testing.B) { - sut := buffer.New( + sut := buffer.New[int]( + noop, buffer.WithSize(uint(b.N)+1), - buffer.WithFlusher(noop), ) defer sut.Close() @@ -25,9 +25,9 @@ func BenchmarkBuffer(b *testing.B) { }) b.Run("push and flush", func(b *testing.B) { - sut := buffer.New( + sut := buffer.New[int]( + noop, buffer.WithSize(1), - buffer.WithFlusher(noop), ) defer sut.Close() diff --git a/buffer.go b/buffer.go index 8fae11e..cd62ff9 100644 --- a/buffer.go +++ b/buffer.go @@ -15,9 +15,10 @@ var ( type ( // Buffer represents a data buffer that is asynchronously flushed, either manually or automatically. - Buffer struct { + Buffer[T any] struct { io.Closer - dataCh chan interface{} + flusher Flusher[T] + dataCh chan T flushCh chan struct{} closeCh chan struct{} doneCh chan struct{} @@ -27,9 +28,9 @@ type ( // Push appends an item to the end of the buffer. // -// It returns an ErrTimeout if if cannot be performed in a timely fashion, and +// It returns an ErrTimeout if it cannot be performed in a timely fashion, and // an ErrClosed if the buffer has been closed. -func (buffer *Buffer) Push(item interface{}) error { +func (buffer *Buffer[T]) Push(item T) error { if buffer.closed() { return ErrClosed } @@ -46,7 +47,7 @@ func (buffer *Buffer) Push(item interface{}) error { // // It returns an ErrTimeout if if cannot be performed in a timely fashion, and // an ErrClosed if the buffer has been closed. -func (buffer *Buffer) Flush() error { +func (buffer *Buffer[T]) Flush() error { if buffer.closed() { return ErrClosed } @@ -67,7 +68,7 @@ func (buffer *Buffer) Flush() error { // An ErrTimeout can either mean that a flush could not be triggered, or it can // mean that a flush was triggered but it has not finished yet. In any case it is // safe to call Close again. -func (buffer *Buffer) Close() error { +func (buffer *Buffer[T]) Close() error { if buffer.closed() { return ErrClosed } @@ -90,7 +91,7 @@ func (buffer *Buffer) Close() error { } } -func (buffer Buffer) closed() bool { +func (buffer *Buffer[T]) closed() bool { select { case <-buffer.doneCh: return true @@ -99,9 +100,9 @@ func (buffer Buffer) closed() bool { } } -func (buffer *Buffer) consume() { +func (buffer *Buffer[T]) consume() { count := 0 - items := make([]interface{}, buffer.options.Size) + items := make([]T, buffer.options.Size) mustFlush := false ticker, stopTicker := newTicker(buffer.options.FlushInterval) @@ -123,10 +124,10 @@ func (buffer *Buffer) consume() { if mustFlush { stopTicker() - buffer.options.Flusher.Write(items[:count]) + buffer.flusher.Write(items[:count]) count = 0 - items = make([]interface{}, buffer.options.Size) + items = make([]T, buffer.options.Size) mustFlush = false ticker, stopTicker = newTicker(buffer.options.FlushInterval) } @@ -146,9 +147,14 @@ func newTicker(interval time.Duration) (<-chan time.Time, func()) { } // New creates a new buffer instance with the provided options. -func New(opts ...Option) *Buffer { - buffer := &Buffer{ - dataCh: make(chan interface{}), +func New[T any](flusher Flusher[T], opts ...Option) *Buffer[T] { + if flusher == nil { + panic("invalid flusher") + } + + buffer := &Buffer[T]{ + flusher: flusher, + dataCh: make(chan T), flushCh: make(chan struct{}), closeCh: make(chan struct{}), doneCh: make(chan struct{}), diff --git a/buffer_test.go b/buffer_test.go index 869bbbc..4c47645 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -6,7 +6,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) var _ = Describe("Buffer", func() { @@ -19,38 +19,39 @@ var _ = Describe("Buffer", func() { Context("Constructor", func() { It("creates a new Buffer instance", func() { // act - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(10), - buffer.WithFlusher(flusher), ) // assert Expect(sut).NotTo(BeNil()) }) + It("panics when provided an invalid flusher", func() { + Expect(func() { + buffer.New[string]( + nil, + buffer.WithSize(1), + ) + }).To(Panic()) + }) + Context("invalid options", func() { It("panics when provided an invalid size", func() { Expect(func() { - buffer.New( + buffer.New[string]( + flusher, buffer.WithSize(0), ) }).To(Panic()) }) - It("panics when provided an invalid flusher", func() { - Expect(func() { - buffer.New( - buffer.WithSize(1), - buffer.WithFlusher(nil), - ) - }).To(Panic()) - }) - It("panics when provided an invalid flush interval", func() { Expect(func() { - buffer.New( + buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushInterval(-1), ) }).To(Panic()) @@ -58,9 +59,9 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid push timeout", func() { Expect(func() { - buffer.New( + buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithPushTimeout(-1), ) }).To(Panic()) @@ -68,9 +69,9 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid flush timeout", func() { Expect(func() { - buffer.New( + buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushTimeout(-1), ) }).To(Panic()) @@ -78,9 +79,9 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid close timeout", func() { Expect(func() { - buffer.New( + buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(-1), ) }).To(Panic()) @@ -91,15 +92,15 @@ var _ = Describe("Buffer", func() { Context("Pushing", func() { It("pushes items into the buffer when Push is called", func() { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) // act - err1 := sut.Push(1) - err2 := sut.Push(2) - err3 := sut.Push(3) + err1 := sut.Push("a") + err2 := sut.Push("b") + err3 := sut.Push("c") // assert Expect(err1).To(Succeed()) @@ -110,16 +111,16 @@ var _ = Describe("Buffer", func() { It("fails when Push cannot execute in a timely fashion", func() { // arrange flusher.Func = func() { select {} } - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(2), - buffer.WithFlusher(flusher), buffer.WithPushTimeout(time.Second), ) // act - err1 := sut.Push(1) - err2 := sut.Push(2) - err3 := sut.Push(3) + err1 := sut.Push("a") + err2 := sut.Push("b") + err3 := sut.Push("c") // assert Expect(err1).To(Succeed()) @@ -129,14 +130,14 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(2), - buffer.WithFlusher(flusher), ) _ = sut.Close() // act - err := sut.Push(1) + err := sut.Push("a") // assert Expect(err).To(MatchError(buffer.ErrClosed)) @@ -146,21 +147,21 @@ var _ = Describe("Buffer", func() { Context("Flushing", func() { It("flushes the buffer when it fills up", func(done Done) { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(5), - buffer.WithFlusher(flusher), ) // act - _ = sut.Push(1) - _ = sut.Push(2) - _ = sut.Push(3) - _ = sut.Push(4) - _ = sut.Push(5) + _ = sut.Push("a") + _ = sut.Push("b") + _ = sut.Push("c") + _ = sut.Push("d") + _ = sut.Push("e") // assert result := <-flusher.Done - Expect(result.Items).To(ConsistOf(1, 2, 3, 4, 5)) + Expect(result.Items).To(ConsistOf("a", "b", "c", "d", "e")) close(done) }) @@ -168,30 +169,30 @@ var _ = Describe("Buffer", func() { // arrange interval := 3 * time.Second start := time.Now() - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(5), - buffer.WithFlusher(flusher), buffer.WithFlushInterval(interval), ) // act - _ = sut.Push(1) + _ = sut.Push("a") // assert result := <-flusher.Done - Expect(result.Items).To(ConsistOf(1)) + Expect(result.Items).To(ConsistOf("a")) Expect(result.Time).To(BeTemporally("~", start, interval+time.Second)) close(done) }, 5) It("flushes the buffer when Flush is called", func(done Done) { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) - _ = sut.Push(1) - _ = sut.Push(2) + _ = sut.Push("a") + _ = sut.Push("b") // act err := sut.Flush() @@ -199,19 +200,19 @@ var _ = Describe("Buffer", func() { // assert result := <-flusher.Done Expect(err).To(Succeed()) - Expect(result.Items).To(ConsistOf(1, 2)) + Expect(result.Items).To(ConsistOf("a", "b")) close(done) }) It("fails when Flush cannot execute in a timely fashion", func() { // arrange flusher.Func = func() { time.Sleep(3 * time.Second) } - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err := sut.Flush() @@ -222,9 +223,9 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(2), - buffer.WithFlusher(flusher), ) _ = sut.Close() @@ -239,12 +240,12 @@ var _ = Describe("Buffer", func() { Context("Closing", func() { It("flushes the buffer and closes it when Close is called", func(done Done) { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) - _ = sut.Push(1) - _ = sut.Push(2) + _ = sut.Push("a") + _ = sut.Push("b") // act err := sut.Close() @@ -252,7 +253,7 @@ var _ = Describe("Buffer", func() { // assert result := <-flusher.Done Expect(err).To(Succeed()) - Expect(result.Items).To(ConsistOf(1, 2)) + Expect(result.Items).To(ConsistOf("a", "b")) close(done) }) @@ -260,12 +261,12 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err := sut.Close() @@ -278,9 +279,9 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) _ = sut.Close() @@ -296,12 +297,12 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( + sut := buffer.New[string]( + flusher, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err1 := sut.Close() @@ -323,11 +324,11 @@ type ( WriteCall struct { Time time.Time - Items []interface{} + Items []string } ) -func (flusher *MockFlusher) Write(items []interface{}) { +func (flusher *MockFlusher) Write(items []string) { call := &WriteCall{ Time: time.Now(), Items: items, diff --git a/flusher.go b/flusher.go index e1078cd..4845d10 100644 --- a/flusher.go +++ b/flusher.go @@ -2,14 +2,14 @@ package buffer type ( // Flusher represents a destination of buffered data. - Flusher interface { - Write(items []interface{}) + Flusher[T any] interface { + Write(items []T) } // FlusherFunc represents a flush function. - FlusherFunc func(items []interface{}) + FlusherFunc[T any] func(items []T) ) -func (fn FlusherFunc) Write(items []interface{}) { +func (fn FlusherFunc[T]) Write(items []T) { fn(items) } diff --git a/go.mod b/go.mod index 8dd618c..f92a045 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,19 @@ -module github.com/globocom/go-buffer/v2 +module github.com/globocom/go-buffer/v3 -go 1.13 +go 1.18 require ( github.com/onsi/ginkgo v1.13.0 github.com/onsi/gomega v1.10.1 ) + +require ( + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/nxadm/tail v1.4.4 // indirect + golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 // indirect + golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect + golang.org/x/text v0.3.2 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/yaml.v2 v2.3.0 // indirect +) diff --git a/go.sum b/go.sum index 5276e40..e63d6ae 100644 --- a/go.sum +++ b/go.sum @@ -11,7 +11,6 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= diff --git a/options.go b/options.go index dda28d2..794b64e 100644 --- a/options.go +++ b/options.go @@ -8,7 +8,6 @@ import ( const ( invalidSize = "size cannot be zero" - invalidFlusher = "flusher cannot be nil" invalidInterval = "interval must be greater than zero (%s)" invalidTimeout = "timeout cannot be negative (%s)" ) @@ -17,7 +16,6 @@ type ( // Configuration options. Options struct { Size uint - Flusher Flusher FlushInterval time.Duration PushTimeout time.Duration FlushTimeout time.Duration @@ -35,13 +33,6 @@ func WithSize(size uint) Option { } } -// WithFlusher sets the flusher that should be used to write out the buffer. -func WithFlusher(flusher Flusher) Option { - return func(options *Options) { - options.Flusher = flusher - } -} - // WithFlushInterval sets the interval between automatic flushes. func WithFlushInterval(interval time.Duration) Option { return func(options *Options) { @@ -74,9 +65,6 @@ func validateOptions(options *Options) error { if options.Size == 0 { return errors.New(invalidSize) } - if options.Flusher == nil { - return errors.New(invalidFlusher) - } if options.FlushInterval < 0 { return fmt.Errorf(invalidInterval, "FlushInterval") } @@ -96,7 +84,6 @@ func validateOptions(options *Options) error { func resolveOptions(opts ...Option) *Options { options := &Options{ Size: 0, - Flusher: nil, FlushInterval: 0, PushTimeout: time.Second, FlushTimeout: time.Second, diff --git a/options_test.go b/options_test.go index bff0846..18a52d6 100644 --- a/options_test.go +++ b/options_test.go @@ -6,7 +6,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) var _ = Describe("Options", func() { @@ -21,18 +21,6 @@ var _ = Describe("Options", func() { Expect(opts.Size).To(BeIdenticalTo(uint(10))) }) - It("sets up flusher", func() { - // arrange - opts := &buffer.Options{} - flusher := func(items []interface{}) {} - - // act - buffer.WithFlusher(buffer.FlusherFunc(flusher))(opts) - - // assert - Expect(opts.Flusher).NotTo(BeNil()) - }) - It("sets up flush interval", func() { // arrange opts := &buffer.Options{}