Skip to content

Commit

Permalink
Merge pull request #173 from RichardKnop/feature/fix-send-group-concu…
Browse files Browse the repository at this point in the history
…rrency

Fix pool logic to handle concurrency in SendGroup call.
  • Loading branch information
RichardKnop authored Sep 5, 2017
2 parents 57cfb50 + 6a971bd commit 0247ab8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
7 changes: 4 additions & 3 deletions integration-tests/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func (a ascendingInt64s) Less(i, j int) bool { return a[i] < a[j] }

func testAll(server *machinery.Server, t *testing.T) {
testSendTask(server, t)
testSendGroup(server, t)
testSendGroup(server, t, 0) // with unlimited concurrency
testSendGroup(server, t, 2) // with limited concurrency (2 parallel tasks at the most)
testSendChord(server, t)
testSendChain(server, t)
testReturnJustError(server, t)
Expand Down Expand Up @@ -57,11 +58,11 @@ func testSendTask(server *machinery.Server, t *testing.T) {
}
}

func testSendGroup(server *machinery.Server, t *testing.T) {
func testSendGroup(server *machinery.Server, t *testing.T, sendConcurrency int) {
t1, t2, t3 := newAddTask(1, 1), newAddTask(2, 2), newAddTask(5, 6)

group := tasks.NewGroup(t1, t2, t3)
asyncResults, err := server.SendGroup(group, 10)
asyncResults, err := server.SendGroup(group, sendConcurrency)
if err != nil {
t.Error(err)
}
Expand Down
16 changes: 11 additions & 5 deletions v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*bac

pool := make(chan struct{}, sendConcurrency)
go func() {
pool <- struct{}{}
for i := 0; i < sendConcurrency; i++ {
pool <- struct{}{}
}
}()

for i, signature := range group.Tasks {
Expand All @@ -198,13 +200,17 @@ func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*bac
defer wg.Done()

// Publish task
if err := server.broker.Publish(s); err != nil {
errorsChan <- fmt.Errorf("Publish message error: %s", err)

err := server.broker.Publish(s)

if sendConcurrency > 0 {
pool <- struct{}{}
return
}

pool <- struct{}{}
if err != nil {
errorsChan <- fmt.Errorf("Publish message error: %s", err)
return
}

asyncResults[index] = backends.NewAsyncResult(s, server.backend)
}(signature, i)
Expand Down

0 comments on commit 0247ab8

Please sign in to comment.