Skip to content

Commit

Permalink
feat: support configurable I/O chunk to drain at a time in ET mode (#646
Browse files Browse the repository at this point in the history
)

Fixes #643
  • Loading branch information
panjf2000 authored Nov 5, 2024
1 parent 4a0fed8 commit 2e261de
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 174 deletions.
168 changes: 111 additions & 57 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,203 +100,254 @@ func TestClient(t *testing.T) {
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-ET-chunk", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", &testConf{true, 1 << 19, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", &testConf{true, 1 << 19, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-reuseport-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
})

t.Run("poll-reuseport-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
})
Expand Down Expand Up @@ -426,20 +477,22 @@ func (s *testClient) OnTick() (delay time.Duration, action Action) {
return
}

func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
func runClient(t *testing.T, network, addr string, conf *testConf) {
ts := &testClient{
tester: t,
network: network,
addr: addr,
multicore: multicore,
async: async,
nclients: nclients,
multicore: conf.multicore,
async: conf.async,
nclients: conf.clients,
workerPool: goPool.Default(),
}
var err error
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
clientEV,
WithEdgeTriggeredIO(conf.et),
WithEdgeTriggeredIOChunk(conf.etChunk),
WithTCPNoDelay(TCPNoDelay),
WithLockOSThread(true),
WithTicker(true),
Expand All @@ -452,13 +505,14 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy

err = Run(ts,
network+"://"+addr,
WithEdgeTriggeredIO(et),
WithLockOSThread(async),
WithMulticore(multicore),
WithReusePort(reuseport),
WithEdgeTriggeredIO(conf.et),
WithEdgeTriggeredIOChunk(conf.etChunk),
WithLockOSThread(conf.async),
WithMulticore(conf.multicore),
WithReusePort(conf.reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithLoadBalancing(lb))
WithLoadBalancing(conf.lb))
assert.NoError(t, err)
}

Expand Down
7 changes: 7 additions & 0 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
poller: p,
}

if options.EdgeTriggeredIOChunk > 0 {
options.EdgeTriggeredIO = true
options.EdgeTriggeredIOChunk = math.CeilToPowerOfTwo(options.EdgeTriggeredIOChunk)
} else if options.EdgeTriggeredIO {
options.EdgeTriggeredIOChunk = 1 << 20 // 1MB
}

rbc := options.ReadBufferCap
switch {
case rbc <= 0:
Expand Down
4 changes: 2 additions & 2 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type udpConn struct {
}

type openConn struct {
c *conn
cb func()
c *conn
cb func()
}

type conn struct {
Expand Down
Loading

0 comments on commit 2e261de

Please sign in to comment.