Skip to content

Commit

Permalink
nsqd: read/write concurrently in MPUB benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jan 2, 2017
1 parent 4e612db commit 5c72621
Showing 1 changed file with 54 additions and 34 deletions.
88 changes: 54 additions & 34 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,25 +1547,35 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) {
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

num := b.N / numTopics / batchSize
for i := 0; i < num; i++ {
cmd, _ := nsq.MultiPublish(topicName, batch)
_, err := cmd.WriteTo(rw)
if err != nil {
panic(err.Error())
}
err = rw.Flush()
if err != nil {
panic(err.Error())
}
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
wg.Add(1)
go func() {
for i := 0; i < num; i++ {
cmd, _ := nsq.MultiPublish(topicName, batch)
_, err := cmd.WriteTo(rw)
if err != nil {
panic(err.Error())
}
err = rw.Flush()
if err != nil {
panic(err.Error())
}
}
_, data, _ := nsq.UnpackResponse(resp)
if !bytes.Equal(data, []byte("OK")) {
panic("invalid response")
wg.Done()
}()
wg.Add(1)
go func() {
for i := 0; i < num; i++ {
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
}
_, data, _ := nsq.UnpackResponse(resp)
if !bytes.Equal(data, []byte("OK")) {
panic("invalid response")
}
}
}
wg.Done()
}()
wg.Done()
}()
}
Expand Down Expand Up @@ -1611,25 +1621,35 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) {
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

num := b.N / runtime.GOMAXPROCS(0) / batchSize
for i := 0; i < num; i++ {
cmd, _ := nsq.MultiPublish(topicName, batch)
_, err := cmd.WriteTo(rw)
if err != nil {
panic(err.Error())
}
err = rw.Flush()
if err != nil {
panic(err.Error())
}
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
wg.Add(1)
go func() {
for i := 0; i < num; i++ {
cmd, _ := nsq.MultiPublish(topicName, batch)
_, err := cmd.WriteTo(rw)
if err != nil {
panic(err.Error())
}
err = rw.Flush()
if err != nil {
panic(err.Error())
}
}
_, data, _ := nsq.UnpackResponse(resp)
if !bytes.Equal(data, []byte("OK")) {
panic("invalid response")
wg.Done()
}()
wg.Add(1)
go func() {
for i := 0; i < num; i++ {
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
}
_, data, _ := nsq.UnpackResponse(resp)
if !bytes.Equal(data, []byte("OK")) {
panic("invalid response")
}
}
}
wg.Done()
}()
wg.Done()
}()
}
Expand Down

0 comments on commit 5c72621

Please sign in to comment.