Skip to content

Commit

Permalink
opt: clamp the event-loops in ET mode to avoid starving (#599)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 authored May 1, 2024
1 parent 4c3b84f commit f03f0bd
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 60 deletions.
2 changes: 1 addition & 1 deletion connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
err = el.write(c)
default:
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
err = el.close(c, io.EOF)
}
}
Expand Down
8 changes: 4 additions & 4 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
Expand All @@ -43,14 +43,14 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
// to the remote first and then close the connection.
//
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 {
if err := el.read(c); err != nil {
return err
}
Expand Down
41 changes: 36 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,18 @@ func (el *eventloop) open(c *conn) error {
return el.handleAction(c, action)
}

// read0 adapts read to a callback that receives its argument as an
// interface{} value, so it can be handed to the poller as a deferred task
// (it is passed to el.poller.Trigger together with the connection).
// itf is type-asserted to *conn without the comma-ok form, so passing any
// other dynamic type panics.
func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))
}

// maxBytesTransferET is the byte threshold (1 << 22 = 4 MiB) that read and
// write compare their running per-call totals against in edge-triggered mode:
// once a single call has transferred at least this many bytes it stops looping
// on the connection so that other events are not starved.
const maxBytesTransferET = 1 << 22

func (el *eventloop) read(c *conn) error {
if !c.opened {
return nil
}

var recv int
isET := el.engine.opts.EdgeTriggeredIO
loop:
n, err := unix.Read(c.fd, el.buffer)
Expand All @@ -131,6 +138,7 @@ loop:
}
return el.close(c, os.NewSyscallError("read", err))
}
recv += n

c.buffer = el.buffer[:n]
action := el.eventHandler.OnTraffic(c)
Expand All @@ -144,13 +152,25 @@ loop:
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]

if isET || c.isEOF {
if c.isEOF || (isET && recv < maxBytesTransferET) {
goto loop
}

// To prevent infinite reading in ET mode and starving other events,
// we need to set up threshold for the maximum read bytes per connection
// on each event-loop. If the threshold is reached and there are still
// unread data in the socket buffer, we must issue another read event manually.
if isET && n == len(el.buffer) {
return el.poller.Trigger(queue.LowPriority, el.read0, c)
}

return nil
}

// write0 adapts write to a callback that receives its argument as an
// interface{} value, so it can be handed to the poller as a deferred task
// (it is passed to el.poller.Trigger together with the connection).
// itf is type-asserted to *conn without the comma-ok form, so passing any
// other dynamic type panics.
func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
const iovMax = 1024

Expand All @@ -161,8 +181,9 @@ func (el *eventloop) write(c *conn) error {

isET := el.engine.opts.EdgeTriggeredIO
var (
n int
err error
n int
sent int
err error
)
loop:
iov, _ := c.outboundBuffer.Peek(-1)
Expand All @@ -182,14 +203,24 @@ loop:
default:
return el.close(c, os.NewSyscallError("write", err))
}
if isET && !c.outboundBuffer.IsEmpty() {
sent += n

if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET {
goto loop
}

// All data have been sent, it's no need to monitor the writable events for LT mode,
// remove the writable event from poller to help the future event-loops if necessary.
if !isET && c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(&c.pollAttachment, false)
return el.poller.ModRead(&c.pollAttachment, false)
}

// To prevent infinite writing in ET mode and starving other events,
// we need to set up threshold for the maximum write bytes per connection
// on each event-loop. If the threshold is reached and there are still
// pending data to write, we must issue another write event manually.
if isET && !c.outboundBuffer.IsEmpty() {
return el.poller.Trigger(queue.HighPriority, el.write0, c)
}

return nil
Expand Down
30 changes: 15 additions & 15 deletions eventloop_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
func BenchmarkGC4El100k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000)
b.Run("Run-4-eventloop-100000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) {
func BenchmarkGC4El200k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000)
b.Run("Run-4-eventloop-200000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) {
func BenchmarkGC4El500k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000)
b.Run("Run-4-eventloop-500000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand Down Expand Up @@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 10000)
testServeGC(t, "tcp", ":0", true, true, 1, 10000)
})
t.Run("1-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 100000)
testServeGC(t, "tcp", ":0", true, true, 1, 100000)
})
t.Run("1-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 1000000)
testServeGC(t, "tcp", ":0", true, true, 1, 1000000)
})
t.Run("2-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 10000)
testServeGC(t, "tcp", ":0", true, true, 2, 10000)
})
t.Run("2-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 100000)
testServeGC(t, "tcp", ":0", true, true, 2, 100000)
})
t.Run("2-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 1000000)
testServeGC(t, "tcp", ":0", true, true, 2, 1000000)
})
t.Run("4-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 10000)
testServeGC(t, "tcp", ":0", true, true, 4, 10000)
})
t.Run("4-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 100000)
testServeGC(t, "tcp", ":0", true, true, 4, 100000)
})
t.Run("4-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 1000000)
testServeGC(t, "tcp", ":0", true, true, 4, 1000000)
})
t.Run("16-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 10000)
testServeGC(t, "tcp", ":0", true, true, 16, 10000)
})
t.Run("16-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 100000)
testServeGC(t, "tcp", ":0", true, true, 16, 100000)
})
t.Run("16-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 1000000)
testServeGC(t, "tcp", ":0", true, true, 16, 1000000)
})
})
}
Expand Down
11 changes: 8 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ const (
MinPollEventsCap = 32
// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
MaxAsyncTasksAtOneTime = 256
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
// reading/writing from/to a closed socket, etc.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
// ReadEvents represents readable events that are polled by epoll.
ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
// WriteEvents represents writeable events that are polled by epoll.
WriteEvents = unix.EPOLLOUT
// ReadWriteEvents represents both readable and writeable events.
ReadWriteEvents = ReadEvents | WriteEvents
// ErrEvents represents exceptional events that occurred on the local side.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
)

type eventList struct {
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,57 +193,51 @@ func (p *Poller) Polling(callback PollEventHandler) error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = writeEvents
var ev uint32 = WriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,12 @@ func (p *Poller) Polling() error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -215,9 +209,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -226,9 +220,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = writeEvents
ev.events = WriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -237,9 +231,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand All @@ -248,9 +242,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand Down

0 comments on commit f03f0bd

Please sign in to comment.