diff --git a/README.md b/README.md index 146430cc..bb4134c9 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ If the file is not found, gowrap will look for the template [here](https://githu List of available templates: - [circuitbreaker](https://github.com/hexdigest/gowrap/tree/master/templates/circuitbreaker) stops executing methods of the wrapped interface after the specified number of consecutive errors and resumes execution after the specified delay + - [concurrencylimit](https://github.com/hexdigest/gowrap/tree/master/templates/concurrencylimit) limits amount of simultaneous calls - [fallback](https://github.com/hexdigest/gowrap/tree/master/templates/fallback) takes several implementations of the source interface and concurrently runs each implementation if the previous attempt didn't return the result in a specified period of time, it returns the first non-error result - [log](https://github.com/hexdigest/gowrap/tree/master/templates/log) instruments the source interface with logging using standard logger from the "log" package - [logrus](https://github.com/hexdigest/gowrap/tree/master/templates/logrus) instruments the source interface with logging using popular [sirupsen/logrus](https://github.com/sirupsen/logrus) logger diff --git a/templates/concurrencylimit b/templates/concurrencylimit new file mode 100644 index 00000000..5aca36ae --- /dev/null +++ b/templates/concurrencylimit @@ -0,0 +1,46 @@ +{{ $decorator := (or .Vars.DecoratorName (printf "%sWithConcurrencyLimit" .Interface.Name)) }} + +import ( + "time" +) + +// {{$decorator}} implements {{.Interface.Type}} +type {{$decorator}} struct { + _base {{.Interface.Type}} + _burst chan int +} + +// New{{$decorator}} instruments an implementation of the {{.Interface.Type}} with concurrency limiting +func New{{$decorator}}(base {{.Interface.Type}}, concurrentCalls int) *{{$decorator}} { + d := &{{$decorator}}{ + _base: base, + _burst: make(chan int, concurrentCalls), + } + + return d +} + +{{range $method := .Interface.Methods}} + // {{$method.Name}} implements {{$.Interface.Type}} + func (_d *{{$decorator}}) {{$method.Declaration}} { + + {{- if (and $method.AcceptsContext $method.ReturnsError)}} + select { + case <-ctx.Done(): + err = ctx.Err() + return + case _d._burst<-1: + defer func() { + <-_d._burst + }() + } + {{else}} + _d._burst<-1 + defer func() { + <-_d._burst + }() + {{end}} + + {{ $method.Pass "_d._base." }} + } +{{end}} diff --git a/templates_tests/interface_with_concurrency_limit.go b/templates_tests/interface_with_concurrency_limit.go new file mode 100644 index 00000000..d2415e1a --- /dev/null +++ b/templates_tests/interface_with_concurrency_limit.go @@ -0,0 +1,72 @@ +package templatestests + +import "context" + +// DO NOT EDIT! +// This code is generated with http://github.com/hexdigest/gowrap tool +// using ../templates/concurrencylimit template + +//go:generate gowrap gen -p github.com/hexdigest/gowrap/templates_tests -i TestInterface -t ../templates/concurrencylimit -o interface_with_concurrency_limit.go + +// TestInterfaceWithConcurrencyLimit implements TestInterface +type TestInterfaceWithConcurrencyLimit struct { + _base TestInterface + _burst chan int +} + +// NewTestInterfaceWithConcurrencyLimit instruments an implementation of the TestInterface with concurrency limiting +func NewTestInterfaceWithConcurrencyLimit(base TestInterface, concurrentCalls int) *TestInterfaceWithConcurrencyLimit { + d := &TestInterfaceWithConcurrencyLimit{ + _base: base, + _burst: make(chan int, concurrentCalls), + } + + return d +} + +// Channels implements TestInterface +func (_d *TestInterfaceWithConcurrencyLimit) Channels(chA chan bool, chB chan<- bool, chanC <-chan bool) { + _d._burst <- 1 + defer func() { + <-_d._burst + }() + + _d._base.Channels(chA, chB, chanC) + return +} + +// F implements TestInterface +func (_d *TestInterfaceWithConcurrencyLimit) F(ctx context.Context, a1 string, a2 ...string) (result1 string, result2 string, err error) { + select { + case <-ctx.Done(): + err = ctx.Err() + return + case _d._burst <- 1: + defer func() { + <-_d._burst + }() + } + + return _d._base.F(ctx, a1, a2...) +} + +// NoError implements TestInterface +func (_d *TestInterfaceWithConcurrencyLimit) NoError(s1 string) (s2 string) { + _d._burst <- 1 + defer func() { + <-_d._burst + }() + + return _d._base.NoError(s1) +} + +// NoParamsOrResults implements TestInterface +func (_d *TestInterfaceWithConcurrencyLimit) NoParamsOrResults() { + _d._burst <- 1 + defer func() { + <-_d._burst + }() + + _d._base.NoParamsOrResults() + return +} diff --git a/templates_tests/interface_with_concurrency_limit_test.go b/templates_tests/interface_with_concurrency_limit_test.go new file mode 100644 index 00000000..fb1125c2 --- /dev/null +++ b/templates_tests/interface_with_concurrency_limit_test.go @@ -0,0 +1,46 @@ +package templatestests + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTestInterfaceWithConcurrencyLimit_F(t *testing.T) { + impl := &testImpl{r1: "1", r2: "2", delay: 100 * time.Millisecond} + + wrapped := NewTestInterfaceWithConcurrencyLimit(impl, 3) + + for i := 0; i < 10; i++ { + go func() { + r1, r2, err := wrapped.F(context.Background(), "a1") + assert.NoError(t, err) + assert.Equal(t, "1", r1) + assert.Equal(t, "2", r2) + + }() + } + + <-time.After(10 * time.Millisecond) + + counter := atomic.LoadUint64(&impl.callCounter) + assert.EqualValues(t, 3, counter) // the first burst + + <-time.After(100 * time.Millisecond) + + counter = atomic.LoadUint64(&impl.callCounter) + assert.EqualValues(t, 6, counter) // the second burst + + <-time.After(100 * time.Millisecond) + + counter = atomic.LoadUint64(&impl.callCounter) + assert.EqualValues(t, 9, counter) // the third burst + + <-time.After(100 * time.Millisecond) + + counter = atomic.LoadUint64(&impl.callCounter) + assert.EqualValues(t, 10, counter) // the 10th call +}