diff --git a/acceptor_unix.go b/acceptor_unix.go index 13177a8b8..10467b4e6 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -34,7 +34,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { if err != nil { switch err { case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED: - // ECONNABORTED means that a socket on the listen + // ECONNABORTED indicates that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. return nil @@ -66,11 +66,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) return el.readUDP1(fd, ev, flags) } - nfd, sa, err := socket.Accept(el.ln.fd) + nfd, sa, err := socket.Accept(fd) if err != nil { switch err { case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED: - // ECONNABORTED means that a socket on the listen + // ECONNABORTED indicates that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. return nil @@ -87,7 +87,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) } c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) - if err = el.poller.AddRead(&c.pollAttachment); err != nil { + addEvents := el.poller.AddRead + if el.engine.opts.EdgeTriggeredIO { + addEvents = el.poller.AddReadWrite + } + if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil { return err } el.connections.addConn(c, el.idx) diff --git a/client_test.go b/client_test.go index 27c2b9ff5..3aa3d1fed 100644 --- a/client_test.go +++ b/client_test.go @@ -31,7 +31,7 @@ type connHandler struct { type clientEvents struct { *BuiltinEventEngine tester *testing.T - svr *testClientServer + svr *testClient packetLen int } @@ -87,117 +87,219 @@ func (ev *clientEvents) OnShutdown(e Engine) { assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd) } -func TestServeWithGnetClient(t *testing.T) { +func TestClient(t *testing.T) { // start an engine // connect 10 clients // each client will pipe random data for 1-3 seconds. // the writes to the engine will be random sizes. 0KB - 1MB. // the engine will echo back the data. // waits for graceful connection closing. 
- t.Run("poll", func(t *testing.T) { + t.Run("poll-LT", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin) + runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections) + runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin) + runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections) + runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin) + runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections) + runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin) + runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections) + runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin) + runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash) + runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin) + runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash) + runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash) }) }) }) - t.Run("poll-reuseport", func(t *testing.T) { + t.Run("poll-ET", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin) + runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections) + runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9991", true, true, false, true, 10, 
RoundRobin) + runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections) + runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin) + runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections) + runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin) + runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections) + runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin) + runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections) + runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin) + runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections) + runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash) + }) + }) + }) + + t.Run("poll-LT-reuseport", func(t *testing.T) { + t.Run("tcp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("udp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("udp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections) + }) + }) + t.Run("unix", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + 
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-ET-reuseport", func(t *testing.T) { + t.Run("tcp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("udp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("udp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections) + }) + }) + t.Run("unix", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections) }) }) }) } -type testClientServer struct { +type testClient struct { *BuiltinEventEngine client *Client tester *testing.T @@ -215,12 +317,12 @@ type testClientServer struct { udpReadHeader int32 } -func (s *testClientServer) OnBoot(eng Engine) (action Action) { +func (s *testClient) OnBoot(eng Engine) (action Action) { s.eng = eng return } -func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) { +func (s *testClient) OnOpen(c Conn) (out []byte, action Action) { c.SetContext(&sync.Once{}) atomic.AddInt32(&s.connected, 1) require.NotNil(s.tester, c.LocalAddr(), "nil local addr") @@ -228,7 +330,7 @@ func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) { return } -func (s *testClientServer) OnClose(c Conn, err error) (action Action) { +func (s *testClient) OnClose(c Conn, err error) (action Action) { if err != nil { logging.Debugf("error occurred on closed, %v\n", err) } @@ -246,13 +348,13 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) { return } -func (s *testClientServer) OnShutdown(Engine) { +func (s *testClient) OnShutdown(Engine) { if s.network == "udp" { require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader)) } } -func (s *testClientServer) OnTraffic(c Conn) (action Action) { +func (s *testClient) OnTraffic(c Conn) (action Action) { readHeader := func() { ping := make([]byte, 
len(pingMsg)) n, err := io.ReadFull(c, ping) @@ -302,7 +404,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) { return } -func (s *testClientServer) OnTick() (delay time.Duration, action Action) { +func (s *testClient) OnTick() (delay time.Duration, action Action) { delay = time.Second / 5 if atomic.CompareAndSwapInt32(&s.started, 0, 1) { for i := 0; i < s.nclients; i++ { @@ -321,8 +423,8 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) { return } -func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) { - ts := &testClientServer{ +func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) { + ts := &testClient{ tester: t, network: network, addr: addr, @@ -347,10 +449,10 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus err = Run(ts, network+"://"+addr, + WithEdgeTriggeredIO(et), WithLockOSThread(async), WithMulticore(multicore), WithReusePort(reuseport), - WithReuseAddr(reuseaddr), WithTicker(true), WithTCPKeepAlive(time.Minute*1), WithTCPNoDelay(TCPDelay), diff --git a/connection_bsd.go b/connection_bsd.go index 507f636df..6a085c166 100644 --- a/connection_bsd.go +++ b/connection_bsd.go @@ -20,25 +20,39 @@ package gnet import ( "io" + "golang.org/x/sys/unix" + "github.com/panjf2000/gnet/v2/internal/netpoll" ) func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) { - switch { - case flags&netpoll.EVFlagsDelete != 0: - case flags&netpoll.EVFlagsEOF != 0: - switch { - case filter == netpoll.EVFilterRead: // read the remaining data after the peer wrote and closed immediately - err = c.loop.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = c.loop.write(c) + el := c.loop + switch filter { + case unix.EVFILT_READ: + err = el.read(c) + case unix.EVFILT_WRITE: + err = el.write(c) + } + // EV_EOF indicates that the remote has closed the connection. + // We check for EV_EOF after processing the read/write event + // to ensure that nothing is left out on this event filter. + if flags&unix.EV_EOF != 0 && c.opened && err == nil { + switch filter { + case unix.EVFILT_READ: + // We received EVFILT_READ | EV_EOF, but the previous eventloop.read + // failed to drain the socket buffer, so make sure it gets drained this time. + c.isEOF = true + err = el.read(c) + case unix.EVFILT_WRITE: + // On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF + // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two + // events will be issued in ET mode for the EOF of the Unix remote in this order: + // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
+ err = el.write(c) default: - err = c.loop.close(c, io.EOF) + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + err = el.close(c, io.EOF) } - case filter == netpoll.EVFilterRead: - err = c.loop.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = c.loop.write(c) } return } diff --git a/connection_linux.go b/connection_linux.go index 1f951394e..30af24d59 100644 --- a/connection_linux.go +++ b/connection_linux.go @@ -17,29 +17,55 @@ package gnet -import "github.com/panjf2000/gnet/v2/internal/netpoll" +import ( + "io" -func (c *conn) handleEvents(_ int, ev uint32) error { - // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100% - // sure what you're doing! - // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/internal/netpoll" +) - // We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to - // the peer when any error occurs on a connection. +func (c *conn) handleEvents(_ int, ev uint32) error { + el := c.loop + // First check for any unexpected non-IO events. + // For these events we just close the corresponding connection directly. + if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + return el.close(c, io.EOF) + } + // Secondly, check for EPOLLOUT before EPOLLIN; the former has a higher priority + // than the latter, regardless of whether the current connection is alive: + // + // 1. When the connection is alive and the system is overloaded, we want to + // offload the incoming traffic by writing all pending data back to the remotes + // before continuing to read and handle requests. + // 2. When the connection is dead, we need to try writing any pending data back + // to the remote and close the connection first. // - // Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused. - // In either case write() should take care of it properly: - // 1) writing data back, - // 2) closing the connection. - if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() { - if err := c.loop.write(c); err != nil { + // We perform eventloop.write for EPOLLOUT because it will take good care of either case. + if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { + if err := el.write(c); err != nil { return err } } - if ev&netpoll.InEvents != 0 { - return c.loop.read(c) + // Check for EPOLLIN before EPOLLRDHUP in case there is pending data in + // the socket buffer. + if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { + if err := el.read(c); err != nil { + return err + } + } + // Finally, check for EPOLLRDHUP; this event indicates that the remote has + // either closed the connection or shut down its writing half. + if ev&unix.EPOLLRDHUP != 0 && c.opened { + if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly + return el.close(c, io.EOF) + } + // We received EPOLLIN | EPOLLRDHUP, but the previous eventloop.read + // failed to drain the socket buffer, so make sure it gets drained this time.
+ c.isEOF = true + return el.read(c) } - return nil } diff --git a/connection_unix.go b/connection_unix.go index b82bc9336..18fe3e26e 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -41,22 +41,23 @@ type conn struct { fd int // file descriptor gfd gfd.GFD // gnet file descriptor ctx interface{} // user-defined context - peer unix.Sockaddr // remote socket address + remote unix.Sockaddr // remote socket address localAddr net.Addr // local addr remoteAddr net.Addr // remote addr loop *eventloop // connected event-loop - outboundBuffer elastic.Buffer // buffer for data that is eligible to be sent to the peer + outboundBuffer elastic.Buffer // buffer for data that is eligible to be sent to the remote pollAttachment netpoll.PollAttachment // connection attachment for poller - inboundBuffer elastic.RingBuffer // buffer for leftover data from the peer + inboundBuffer elastic.RingBuffer // buffer for leftover data from the remote buffer []byte // buffer for the latest bytes isDatagram bool // UDP protocol opened bool // connection opened event fired + isEOF bool // whether the connection has reached EOF } func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, localAddr, remoteAddr net.Addr) (c *conn) { c = &conn{ fd: fd, - peer: sa, + remote: sa, loop: el, localAddr: localAddr, remoteAddr: remoteAddr, @@ -71,7 +72,7 @@ func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, con c = &conn{ fd: fd, gfd: gfd.NewGFD(fd, el.idx, 0, 0), - peer: sa, + remote: sa, loop: el, localAddr: localAddr, remoteAddr: socket.SockaddrToUDPAddr(sa), @@ -79,13 +80,14 @@ func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, con pollAttachment: netpoll.PollAttachment{FD: fd, Callback: el.readUDP}, } if connected { - c.peer = nil + c.remote = nil } return } func (c *conn) release() { c.opened = false + c.isEOF = false c.ctx = nil c.buffer = nil if addr, ok := c.localAddr.(*net.TCPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 { @@ -102,92 +104,122 @@ } c.localAddr = nil c.remoteAddr = nil - c.pollAttachment.FD, c.pollAttachment.Callback = 0, nil if !c.isDatagram { - c.peer = nil + c.remote = nil c.inboundBuffer.Done() c.outboundBuffer.Release() } } func (c *conn) open(buf []byte) error { - if c.isDatagram && c.peer == nil { + if c.isDatagram && c.remote == nil { return unix.Send(c.fd, buf, 0) } - n, err := unix.Write(c.fd, buf) - if err != nil && err == unix.EAGAIN { - _, _ = c.outboundBuffer.Write(buf) - return nil - } - - if err == nil && n < len(buf) { - _, _ = c.outboundBuffer.Write(buf[n:]) + for { + n, err := unix.Write(c.fd, buf) + if err != nil { + if err == unix.EAGAIN { + _, _ = c.outboundBuffer.Write(buf) + break + } + return err + } + buf = buf[n:] + if len(buf) == 0 { + break + } } - return err + return nil } func (c *conn) write(data []byte) (n int, err error) { + isET := c.loop.engine.opts.EdgeTriggeredIO n = len(data) - // If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer - // for maintaining the sequence of network packets. + // If there is pending data in the outbound buffer, + // the current data ought to be appended to the + // outbound buffer to maintain the sequence + // of network packets.
if !c.outboundBuffer.IsEmpty() { _, _ = c.outboundBuffer.Write(data) + if isET { + err = c.loop.write(c) + } return } var sent int +loop: if sent, err = unix.Write(c.fd, data); err != nil { - // A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round. + // A temporary error occurred; append the data to the outbound buffer + // and write it back to the remote in the next round in LT mode. if err == unix.EAGAIN { - _, _ = c.outboundBuffer.Write(data) - err = c.loop.poller.ModReadWrite(&c.pollAttachment) + _, err = c.outboundBuffer.Write(data) + if !isET { + err = c.loop.poller.ModReadWrite(&c.pollAttachment, false) + } return } if err := c.loop.close(c, os.NewSyscallError("write", err)); err != nil { - logging.Errorf("failed to close connection(fd=%d,peer=%+v) on conn.write: %v", + logging.Errorf("failed to close connection(fd=%d,remote=%+v) on conn.write: %v", c.fd, c.remoteAddr, err) } return 0, os.NewSyscallError("write", err) } - // Failed to send all data back to the peer, buffer the leftover data for the next round. - if sent < n { - _, _ = c.outboundBuffer.Write(data[sent:]) - err = c.loop.poller.ModReadWrite(&c.pollAttachment) + data = data[sent:] + if isET && len(data) > 0 { + goto loop } + // Failed to send all data back to the remote; buffer the leftover data for the next round. + if len(data) > 0 { + _, _ = c.outboundBuffer.Write(data) + err = c.loop.poller.ModReadWrite(&c.pollAttachment, false) + } + return } func (c *conn) writev(bs [][]byte) (n int, err error) { + isET := c.loop.engine.opts.EdgeTriggeredIO + for _, b := range bs { n += len(b) } - // If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer - // for maintaining the sequence of network packets. + // If there is pending data in the outbound buffer, + // the current data ought to be appended to the + // outbound buffer to maintain the sequence + // of network packets. if !c.outboundBuffer.IsEmpty() { _, _ = c.outboundBuffer.Writev(bs) + if isET { + err = c.loop.write(c) + } return } + remaining := n var sent int +loop: if sent, err = gio.Writev(c.fd, bs); err != nil { - // A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round. + // A temporary error occurred; append the data to the outbound buffer + // and write it back to the remote in the next round in LT mode. if err == unix.EAGAIN { - _, _ = c.outboundBuffer.Writev(bs) - err = c.loop.poller.ModReadWrite(&c.pollAttachment) + _, err = c.outboundBuffer.Writev(bs) + if !isET { + err = c.loop.poller.ModReadWrite(&c.pollAttachment, false) + } return } if err := c.loop.close(c, os.NewSyscallError("writev", err)); err != nil { - logging.Errorf("failed to close connection(fd=%d,peer=%+v) on conn.writev: %v", + logging.Errorf("failed to close connection(fd=%d,remote=%+v) on conn.writev: %v", c.fd, c.remoteAddr, err) } return 0, os.NewSyscallError("writev", err) } - // Failed to send all data back to the peer, buffer the leftover data for the next round. - if sent < n { - var pos int + pos := len(bs) + if remaining -= sent; remaining > 0 { for i := range bs { bn := len(bs[i]) if sent < bn { @@ -197,9 +229,18 @@ func (c *conn) writev(bs [][]byte) (n int, err error) { } sent -= bn } - _, _ = c.outboundBuffer.Writev(bs[pos:]) - err = c.loop.poller.ModReadWrite(&c.pollAttachment) } + bs = bs[pos:] + if isET && remaining > 0 { + goto loop + } + + // Failed to send all data back to the remote; buffer the leftover data for the next round.
+ if remaining > 0 { + _, _ = c.outboundBuffer.Writev(bs) + err = c.loop.poller.ModReadWrite(&c.pollAttachment, false) + } + return } @@ -246,10 +287,10 @@ func (c *conn) asyncWritev(itf interface{}) (err error) { } func (c *conn) sendTo(buf []byte) error { - if c.peer == nil { + if c.remote == nil { return unix.Send(c.fd, buf, 0) } - return unix.Sendto(c.fd, buf, 0, c.peer) + return unix.Sendto(c.fd, buf, 0, c.remote) } func (c *conn) resetBuffer() { @@ -290,13 +331,13 @@ func (c *conn) Next(n int) (buf []byte, err error) { } head, tail := c.inboundBuffer.Peek(n) defer c.inboundBuffer.Discard(n) //nolint:errcheck - if len(head) >= n { + if len(head) == n { return head[:n], err } c.loop.cache.Reset() c.loop.cache.Write(head) c.loop.cache.Write(tail) - if inBufferLen >= n { + if inBufferLen == n { return c.loop.cache.Bytes(), err } @@ -317,13 +358,13 @@ func (c *conn) Peek(n int) (buf []byte, err error) { return c.buffer[:n], err } head, tail := c.inboundBuffer.Peek(n) - if len(head) >= n { + if len(head) == n { return head[:n], err } c.loop.cache.Reset() c.loop.cache.Write(head) c.loop.cache.Write(tail) - if inBufferLen >= n { + if inBufferLen == n { return c.loop.cache.Bytes(), err } @@ -389,10 +430,6 @@ func (c *conn) WriteTo(w io.Writer) (n int64, err error) { } func (c *conn) Flush() error { - if c.outboundBuffer.IsEmpty() { - return nil - } - return c.loop.write(c) } @@ -414,7 +451,7 @@ func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr } // func (c *conn) Gfd() gfd.GFD { return c.gfd } func (c *conn) Fd() int { return c.fd } -func (c *conn) Dup() (fd int, err error) { fd, _, err = netpoll.Dup(c.fd); return } +func (c *conn) Dup() (fd int, err error) { fd, _, err = socket.Dup(c.fd); return } func (c *conn) SetReadBuffer(bytes int) error { return socket.SetRecvBuffer(c.fd, bytes) } func (c *conn) SetWriteBuffer(bytes int) error { return socket.SetSendBuffer(c.fd, bytes) } func (c *conn) SetLinger(sec int) error { return socket.SetLinger(c.fd, sec) } diff --git a/connection_windows.go b/connection_windows.go index 61619b5bb..21ebb64d4 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -54,8 +54,8 @@ type conn struct { buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer rawConn net.Conn // original connection localAddr net.Addr // local server addr - remoteAddr net.Addr // remote peer addr - inboundBuffer elastic.RingBuffer // buffer for data from the peer + remoteAddr net.Addr // remote addr + inboundBuffer elastic.RingBuffer // buffer for data from the remote } func packTCPConn(c *conn, buf []byte) *tcpConn { diff --git a/engine_unix.go b/engine_unix.go index 5040a3fe1..261b67100 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -66,13 +66,6 @@ func (eng *engine) shutdown(err error) { }) } -func (eng *engine) startEventLoops() { - eng.eventLoops.iterate(func(_ int, el *eventloop) bool { - eng.workerPool.Go(el.run) - return true - }) -} - func (eng *engine) closeEventLoops() { eng.eventLoops.iterate(func(_ int, el *eventloop) bool { el.ln.close() @@ -88,14 +81,7 @@ func (eng *engine) closeEventLoops() { } } -func (eng *engine) startSubReactors() { - eng.eventLoops.iterate(func(_ int, el *eventloop) bool { - eng.workerPool.Go(el.activateSubReactor) - return true - }) -} - -func (eng *engine) activateEventLoops(numEventLoop int) (err error) { +func (eng *engine) runEventLoops(numEventLoop int) (err error) { network, address := eng.ln.network, eng.ln.address ln := eng.ln var striker *eventloop @@ -115,7 +101,7 @@ func (eng *engine) 
activateEventLoops(numEventLoop int) (err error) { el.buffer = make([]byte, eng.opts.ReadBufferCap) el.connections.init() el.eventHandler = eng.eventHandler - if err = el.poller.AddRead(el.ln.packPollAttachment(el.accept)); err != nil { + if err = el.poller.AddRead(el.ln.packPollAttachment(el.accept), false); err != nil { return } eng.eventLoops.register(el) @@ -130,7 +116,10 @@ func (eng *engine) activateEventLoops(numEventLoop int) (err error) { } // Start event-loops in background. - eng.startEventLoops() + eng.eventLoops.iterate(func(_ int, el *eventloop) bool { + eng.workerPool.Go(el.run) + return true + }) eng.workerPool.Go(func() error { striker.ticker(eng.ticker.ctx) @@ -157,7 +146,10 @@ func (eng *engine) activateReactors(numEventLoop int) error { } // Start sub reactors in background. - eng.startSubReactors() + eng.eventLoops.iterate(func(_ int, el *eventloop) bool { + eng.workerPool.Go(el.orbit) + return true + }) if p, err := netpoll.OpenPoller(); err == nil { el := new(eventloop) @@ -166,13 +158,13 @@ func (eng *engine) activateReactors(numEventLoop int) error { el.engine = eng el.poller = p el.eventHandler = eng.eventHandler - if err = el.poller.AddRead(eng.ln.packPollAttachment(eng.accept)); err != nil { + if err = el.poller.AddRead(eng.ln.packPollAttachment(eng.accept), false); err != nil { return err } eng.acceptor = el // Start main reactor in background. - eng.workerPool.Go(el.activateMainReactor) + eng.workerPool.Go(el.rotate) } else { return err } @@ -190,7 +182,7 @@ func (eng *engine) activateReactors(numEventLoop int) error { func (eng *engine) start(numEventLoop int) error { if eng.opts.ReusePort || eng.ln.network == "udp" { - return eng.activateEventLoops(numEventLoop) + return eng.runEventLoops(numEventLoop) } return eng.activateReactors(numEventLoop) diff --git a/eventloop_unix.go b/eventloop_unix.go index e8c8eab92..2d6eae077 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -22,13 +22,14 @@ import ( "context" "errors" "fmt" + "io" "os" "strings" "time" "golang.org/x/sys/unix" - "github.com/panjf2000/gnet/v2/internal/io" + gio "github.com/panjf2000/gnet/v2/internal/io" "github.com/panjf2000/gnet/v2/internal/netpoll" "github.com/panjf2000/gnet/v2/internal/queue" errorx "github.com/panjf2000/gnet/v2/pkg/errors" @@ -75,7 +76,12 @@ func (el *eventloop) register(itf interface{}) error { defer ccb.cb() } - if err := el.poller.AddRead(&c.pollAttachment); err != nil { + addEvents := el.poller.AddRead + if el.engine.opts.EdgeTriggeredIO { + addEvents = el.poller.AddReadWrite + } + + if err := addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil { _ = unix.Close(c.fd) c.release() return err @@ -83,7 +89,7 @@ func (el *eventloop) register(itf interface{}) error { el.connections.addConn(c, el.idx) - if c.isDatagram && c.peer != nil { + if c.isDatagram && c.remote != nil { return nil } return el.open(c) @@ -99,8 +105,8 @@ func (el *eventloop) open(c *conn) error { } } - if !c.outboundBuffer.IsEmpty() { - if err := el.poller.ModReadWrite(&c.pollAttachment); err != nil { + if !c.outboundBuffer.IsEmpty() && !el.engine.opts.EdgeTriggeredIO { + if err := el.poller.ModReadWrite(&c.pollAttachment, false); err != nil { return err } } @@ -109,13 +115,19 @@ func (el *eventloop) open(c *conn) error { } func (el *eventloop) read(c *conn) error { + if !c.opened { + return nil + } + + isET := el.engine.opts.EdgeTriggeredIO +loop: n, err := unix.Read(c.fd, el.buffer) if err != nil || n == 0 { if err == unix.EAGAIN { return nil } if n == 0 { - err = 
unix.ECONNRESET + err = io.EOF } return el.close(c, os.NewSyscallError("read", err)) } @@ -131,6 +143,11 @@ } _, _ = c.inboundBuffer.Write(c.buffer) c.buffer = c.buffer[:0] + + if isET || c.isEOF { + goto loop + } + return nil } @@ -138,16 +155,22 @@ const iovMax = 1024 func (el *eventloop) write(c *conn) error { - iov := c.outboundBuffer.Peek(-1) + if c.outboundBuffer.IsEmpty() { + return nil + } + + isET := el.engine.opts.EdgeTriggeredIO var ( n int err error ) +loop: + iov, _ := c.outboundBuffer.Peek(-1) if len(iov) > 1 { if len(iov) > iovMax { iov = iov[:iovMax] } - n, err = io.Writev(c.fd, iov) + n, err = gio.Writev(c.fd, iov) } else { n, err = unix.Write(c.fd, iov[0]) } @@ -159,11 +182,14 @@ default: return el.close(c, os.NewSyscallError("write", err)) } + if isET && !c.outboundBuffer.IsEmpty() { + goto loop + } - // All data have been drained, it's no need to monitor the writable events, - // remove the writable event from poller to help the future event-loops. - if c.outboundBuffer.IsEmpty() { - _ = el.poller.ModRead(&c.pollAttachment) + // All data has been sent; in LT mode there is no need to keep monitoring writable events, + // so remove the writable event from the poller to spare future event loops. + if !isET && c.outboundBuffer.IsEmpty() { + _ = el.poller.ModRead(&c.pollAttachment, false) } return nil @@ -187,19 +213,17 @@ func (el *eventloop) close(c *conn, err error) (rerr error) { return // ignore stale connections } - // Send residual data in buffer back to the peer before actually closing the connection. - if !c.outboundBuffer.IsEmpty() { - for !c.outboundBuffer.IsEmpty() { - iov := c.outboundBuffer.Peek(0) - if len(iov) > iovMax { - iov = iov[:iovMax] - } - if n, e := io.Writev(c.fd, iov); e != nil { - el.getLogger().Warnf("close: error occurs when sending data back to peer, %v", e) - break - } else { //nolint:revive - _, _ = c.outboundBuffer.Discard(n) - } + // Send residual data in buffer back to the remote before actually closing the connection. + for !c.outboundBuffer.IsEmpty() { + iov, _ := c.outboundBuffer.Peek(0) + if len(iov) > iovMax { + iov = iov[:iovMax] + } + if n, e := gio.Writev(c.fd, iov); e != nil { + el.getLogger().Warnf("close: error occurs when sending data back to remote, %v", e) + break + } else { //nolint:revive + _, _ = c.outboundBuffer.Discard(n) } } @@ -303,7 +327,7 @@ func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) erro } c.buffer = el.buffer[:n] action := el.eventHandler.OnTraffic(c) - if c.peer != nil { + if c.remote != nil { c.release() } if action == Shutdown { diff --git a/gnet.go b/gnet.go index f77ee5793..d511ab5be 100644 --- a/gnet.go +++ b/gnet.go @@ -167,7 +167,7 @@ func (e Engine) Wake(fd gfd.GFD, cb AsyncCallback) error { // Reader is an interface that consists of a number of methods for reading that Conn must implement. // -// Note that the methods in this interface are not goroutine-safe for concurrent use, +// Note that the methods in this interface are not safe for concurrent use, // you must invoke them within any method in EventHandler. type Reader interface { io.Reader @@ -175,8 +175,9 @@ type Reader interface { // Next returns a slice containing the next n bytes from the buffer, // advancing the buffer as if the bytes had been returned by Read. - // If there are fewer than n bytes in the buffer, Next returns the entire buffer.
- // The error is ErrBufferFull if n is larger than b's buffer size. + // Calling this method has the same effect as calling Peek followed by Discard. + // If fewer than n bytes are available, a pair of (nil, io.ErrShortBuffer) + // is returned. // // Note that the []byte buf returned by Next() is not allowed to be passed to a new goroutine, // as this []byte will be reused within event-loop. // to that new goroutine. Next(n int) (buf []byte, err error) @@ -184,10 +185,9 @@ - // Peek returns the next n bytes without advancing the reader. The bytes stop - // being valid at the next read call. If Peek returns fewer than n bytes, it - // also returns an error explaining why the read is short. The error is - // ErrBufferFull if n is larger than b's buffer size. + // Peek returns the next n bytes without advancing the inbound buffer; the returned bytes + // remain valid until Discard is called. If fewer than n bytes are available, + // a pair of (nil, io.ErrShortBuffer) is returned. // // Note that the []byte buf returned by Peek() is not allowed to be passed to a new goroutine, // as this []byte will be reused within event-loop. // to that new goroutine. Peek(n int) (buf []byte, err error) @@ -195,11 +195,7 @@ - // Discard skips the next n bytes, returning the number of bytes discarded. - // - // If Discard skips fewer than n bytes, it also returns an error. - // If 0 <= n <= b.Buffered(), Discard is guaranteed to succeed without - // reading from the underlying io.Reader. + // Discard advances the inbound buffer by the next n bytes, returning the number of bytes discarded. Discard(n int) (discarded int, err error) // InboundBuffered returns the number of bytes that can be read from the current buffer. @@ -208,22 +204,22 @@ // Writer is an interface that consists of a number of methods for writing that Conn must implement. type Writer interface { - io.Writer // not goroutine-safe - io.ReaderFrom // not goroutine-safe + io.Writer // not concurrency-safe + io.ReaderFrom // not concurrency-safe - // Writev writes multiple byte slices to peer synchronously, it's not goroutine-safe, + // Writev writes multiple byte slices to the remote synchronously, it's not concurrency-safe, // you must invoke it within any method in EventHandler. Writev(bs [][]byte) (n int, err error) - // Flush writes any buffered data to the underlying connection, it's not goroutine-safe, + // Flush writes any buffered data to the underlying connection, it's not concurrency-safe, // you must invoke it within any method in EventHandler. Flush() (err error) // OutboundBuffered returns the number of bytes that can be read from the current buffer. - // it's not goroutine-safe, you must invoke it within any method in EventHandler. + // It's not concurrency-safe; you must invoke it within any method in EventHandler. OutboundBuffered() (n int) - // AsyncWrite writes bytes to peer asynchronously, it's goroutine-safe, + // AsyncWrite writes bytes to the remote asynchronously, it's concurrency-safe, // you don't have to invoke it within any method in EventHandler, // usually you would call it in an individual goroutine. // @@ -234,7 +230,7 @@ // just call Conn.Write to send back your data.
AsyncWrite(buf []byte, callback AsyncCallback) (err error) - // AsyncWritev writes multiple byte slices to peer asynchronously, + // AsyncWritev writes multiple byte slices to the remote asynchronously, // you don't have to invoke it within any method in EventHandler, // usually you would call it in an individual goroutine. AsyncWritev(bs [][]byte, callback AsyncCallback) (err error) @@ -247,7 +243,7 @@ type AsyncCallback func(c Conn, err error) error // Socket is a set of functions which manipulate the underlying file descriptor of a connection. // -// Note that the methods in this interface are goroutine-safe for concurrent use, +// Note that the methods in this interface are safe for concurrent use, // you don't have to invoke them within any method in EventHandler. type Socket interface { // Gfd returns the gfd of socket. @@ -302,35 +298,35 @@ type Socket interface { // Conn is an interface of underlying connection. type Conn interface { - Reader // all methods in Reader are not goroutine-safe. - Writer // some methods in Writer are goroutine-safe, some are not. - Socket // all methods in Socket are goroutine-safe. + Reader // all methods in Reader are not concurrency-safe. + Writer // some methods in Writer are concurrency-safe, some are not. + Socket // all methods in Socket are concurrency-safe. - // Context returns a user-defined context, it's not goroutine-safe, + // Context returns a user-defined context, it's not concurrency-safe, // you must invoke it within any method in EventHandler. Context() (ctx interface{}) - // SetContext sets a user-defined context, it's not goroutine-safe, + // SetContext sets a user-defined context, it's not concurrency-safe, // you must invoke it within any method in EventHandler. SetContext(ctx interface{}) - // LocalAddr is the connection's local socket address, it's not goroutine-safe, + // LocalAddr is the connection's local socket address, it's not concurrency-safe, // you must invoke it within any method in EventHandler. LocalAddr() (addr net.Addr) - // RemoteAddr is the connection's remote peer address, it's not goroutine-safe, + // RemoteAddr is the connection's remote address, it's not concurrency-safe, // you must invoke it within any method in EventHandler. RemoteAddr() (addr net.Addr) - // Wake triggers a OnTraffic event for the current connection, it's goroutine-safe. + // Wake triggers an OnTraffic event for the current connection, it's concurrency-safe. Wake(callback AsyncCallback) (err error) - // CloseWithCallback closes the current connection, it's goroutine-safe. + // CloseWithCallback closes the current connection, it's concurrency-safe. // Usually you should provide a non-nil callback for this method, // otherwise your better choice is Close(). CloseWithCallback(callback AsyncCallback) (err error) - // Close closes the current connection, implements net.Conn, it's goroutine-safe. + // Close closes the current connection, implements net.Conn, it's concurrency-safe. Close() (err error) // SetDeadline implements net.Conn. @@ -359,15 +355,15 @@ type ( // OnOpen fires when a new connection has been opened. // // The Conn c has information about the connection such as its local and remote addresses. - // The parameter out is the return value which is going to be sent back to the peer. - // Sending large amounts of data back to the peer in OnOpen is usually not recommended. + // The parameter out is the return value which is going to be sent back to the remote.
+ // Sending large amounts of data back to the remote in OnOpen is usually not recommended. OnOpen(c Conn) (out []byte, action Action) // OnClose fires when a connection has been closed. // The parameter err is the last known connection error. OnClose(c Conn, err error) (action Action) - // OnTraffic fires when a socket receives data from the peer. + // OnTraffic fires when a socket receives data from the remote. // // Note that the []byte returned from Conn.Peek(int)/Conn.Next(int) is not allowed to be passed to a new goroutine, // as this []byte will be reused within event-loop after OnTraffic() returns. @@ -398,7 +394,7 @@ func (*BuiltinEventEngine) OnShutdown(_ Engine) { } // OnOpen fires when a new connection has been opened. -// The parameter out is the return value which is going to be sent back to the peer. +// The parameter out is the return value which is going to be sent back to the remote. func (*BuiltinEventEngine) OnOpen(_ Conn) (out []byte, action Action) { return } @@ -409,7 +405,7 @@ func (*BuiltinEventEngine) OnClose(_ Conn, _ error) (action Action) { return } -// OnTraffic fires when a local socket receives data from the peer. +// OnTraffic fires when a local socket receives data from the remote. func (*BuiltinEventEngine) OnTraffic(_ Conn) (action Action) { return } @@ -492,6 +488,10 @@ func Run(eventHandler EventHandler, protoAddr string, opts ...Option) (err error } defer ln.close() + if ln.network == "udp" { + options.EdgeTriggeredIO = false + } + return run(eventHandler, ln, options, protoAddr) } diff --git a/gnet_test.go b/gnet_test.go index e6150023d..8d0415079 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -30,194 +30,277 @@ var ( streamLen = 1024 * 1024 ) -func TestServe(t *testing.T) { +func TestServer(t *testing.T) { // start an engine // connect 10 clients // each client will pipe random data for 1-3 seconds. // the writes to the engine will be random sizes. 0KB - 1MB. // the engine will echo back the data. // waits for graceful connection closing.
- t.Run("poll", func(t *testing.T) { + t.Run("poll-LT", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", false, false, false, false, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", false, false, true, false, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", false, false, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", false, false, false, true, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", false, false, true, true, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", false, false, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", false, false, false, true, true, 10, RoundRobin) + runServer(t, "tcp", ":9991", false, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", false, false, true, true, true, 10, LeastConnections) + runServer(t, "tcp", ":9992", false, false, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", false, false, false, false, false, 10, RoundRobin) + runServer(t, "udp", ":9991", false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", false, false, true, false, false, 10, LeastConnections) + runServer(t, "udp", ":9992", false, false, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", false, false, false, true, false, 10, RoundRobin) + runServer(t, "udp", ":9991", false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", false, false, true, true, false, 10, LeastConnections) + runServer(t, "udp", ":9992", false, false, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", false, false, false, false, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", false, false, true, false, false, 10, SourceAddrHash) + runServer(t, "unix", "gnet2.sock", false, false, true, false, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", false, false, false, true, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", false, false, true, true, false, 10, SourceAddrHash) + runServer(t, "unix", "gnet2.sock", false, false, true, true, false, 10, SourceAddrHash) }) }) t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", false, false, false, true, true, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", false, false, false, true, 
true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", false, false, true, true, true, 10, SourceAddrHash) + runServer(t, "unix", "gnet2.sock", false, false, true, true, true, 10, SourceAddrHash) }) }) }) - t.Run("poll-reuseport", func(t *testing.T) { + t.Run("poll-ET", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", true, false, false, false, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", true, false, true, false, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", true, false, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", true, false, false, true, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", true, false, true, true, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", true, false, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", true, false, false, true, true, 10, RoundRobin) + runServer(t, "tcp", ":9991", true, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", true, false, true, true, true, 10, LeastConnections) + runServer(t, "tcp", ":9992", true, false, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", true, false, false, false, false, 10, RoundRobin) + runServer(t, "udp", ":9991", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", true, false, true, false, false, 10, LeastConnections) + runServer(t, "udp", ":9992", true, false, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", true, false, false, true, false, 10, RoundRobin) + runServer(t, "udp", ":9991", true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", true, false, true, true, false, 10, LeastConnections) + runServer(t, "udp", ":9992", true, false, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", true, false, false, false, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", true, false, true, false, false, 10, LeastConnections) + runServer(t, "unix", "gnet2.sock", true, false, true, false, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", true, false, false, true, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", true, false, true, true, false, 10, LeastConnections) + runServer(t, "unix", "gnet2.sock", true, false, true, true, false, 10, SourceAddrHash) }) }) 
t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", true, false, false, true, true, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", true, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", true, false, true, true, true, 10, LeastConnections) + runServer(t, "unix", "gnet2.sock", true, false, true, true, true, 10, SourceAddrHash) }) }) }) - t.Run("poll-reuseaddr", func(t *testing.T) { + t.Run("poll-LT-reuseport", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", false, true, false, false, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", false, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "tcp", ":9991", false, true, false, true, false, 10, RoundRobin) + runServer(t, "tcp", ":9991", false, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections) + runServer(t, "tcp", ":9992", false, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "tcp", ":9991", false, true, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "tcp", ":9992", false, true, true, true, true, 10, LeastConnections) + }) + }) + t.Run("udp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "udp", ":9992", false, true, true, false, false, 10, LeastConnections) + }) + }) + t.Run("udp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "udp", ":9991", false, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "udp", ":9992", false, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "unix", "gnet1.sock", false, true, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "unix", "gnet2.sock", false, true, true, false, false, 10, LeastConnections) + }) + }) + t.Run("unix-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "unix", "gnet1.sock", false, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "unix", "gnet2.sock", false, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "unix", "gnet1.sock", false, true, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "unix", "gnet2.sock", false, true, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-ET-reuseport", func(t *testing.T) { + t.Run("tcp", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin) + }) + 
t.Run("N-loop", func(t *testing.T) { + runServer(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "tcp", ":9992", true, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("tcp-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "tcp", ":9991", true, true, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "tcp", ":9992", true, true, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin) + runServer(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", false, true, true, false, false, 10, LeastConnections) + runServer(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin) + runServer(t, "udp", ":9991", true, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "udp", ":9992", false, true, true, true, false, 10, LeastConnections) + runServer(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", false, true, false, false, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", false, true, true, false, false, 10, LeastConnections) + runServer(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - testServe(t, "unix", "gnet1.sock", false, true, false, true, false, 10, RoundRobin) + runServer(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("unix-async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, "unix", "gnet1.sock", true, true, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - testServe(t, "unix", "gnet2.sock", false, true, true, true, false, 10, LeastConnections) + runServer(t, "unix", "gnet2.sock", true, true, true, true, true, 10, LeastConnections) }) }) }) @@ -377,7 +460,7 @@ func (s *testServer) OnTick() (delay time.Duration, action Action) { return } -func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async, writev bool, nclients int, lb LoadBalancing) { +func runServer(t *testing.T, network, addr string, et, reuseport, multicore, async, writev bool, nclients int, lb LoadBalancing) { ts := &testServer{ tester: t, network: network, @@ -390,10 +473,10 @@ func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multico } err := Run(ts, network+"://"+addr, + WithEdgeTriggeredIO(et), WithLockOSThread(async), WithMulticore(multicore), 
WithReusePort(reuseport), - WithReuseAddr(reuseaddr), WithTicker(true), WithTCPKeepAlive(time.Minute*1), WithTCPNoDelay(TCPDelay), @@ -1248,7 +1331,7 @@ func (s *simServer) OnTick() (delay time.Duration, action Action) { if atomic.CompareAndSwapInt32(&s.started, 0, 1) { for i := 0; i < s.nclients; i++ { go func() { - runClient(s.tester, s.network, s.addr, s.packetSize, s.packetBatch) + runSimClient(s.tester, s.network, s.addr, s.packetSize, s.packetBatch) }() } } @@ -1339,32 +1422,32 @@ func (codec testCodec) Unpack(buf []byte) ([]byte, error) { func TestSimServer(t *testing.T) { t.Run("packet-size=128,batch=100", func(t *testing.T) { - testSimServer(t, ":7200", 10, 128, 100) + runSimServer(t, ":7200", false, 10, 128, 100) }) t.Run("packet-size=256,batch=50", func(t *testing.T) { - testSimServer(t, ":7201", 10, 256, 50) + runSimServer(t, ":7201", true, 10, 256, 50) }) t.Run("packet-size=512,batch=30", func(t *testing.T) { - testSimServer(t, ":7202", 10, 512, 30) + runSimServer(t, ":7202", false, 10, 512, 30) }) t.Run("packet-size=1024,batch=20", func(t *testing.T) { - testSimServer(t, ":7203", 10, 1024, 20) + runSimServer(t, ":7203", true, 10, 1024, 20) }) t.Run("packet-size=64*1024,batch=10", func(t *testing.T) { - testSimServer(t, ":7204", 10, 64*1024, 10) + runSimServer(t, ":7204", false, 10, 64*1024, 10) }) t.Run("packet-size=128*1024,batch=5", func(t *testing.T) { - testSimServer(t, ":7205", 10, 128*1024, 5) + runSimServer(t, ":7205", true, 10, 128*1024, 5) }) t.Run("packet-size=512*1024,batch=3", func(t *testing.T) { - testSimServer(t, ":7206", 10, 512*1024, 3) + runSimServer(t, ":7206", false, 10, 512*1024, 3) }) t.Run("packet-size=1024*1024,batch=2", func(t *testing.T) { - testSimServer(t, ":7207", 10, 1024*1024, 2) + runSimServer(t, ":7207", true, 10, 1024*1024, 2) }) } -func testSimServer(t *testing.T, addr string, nclients, packetSize, packetBatch int) { +func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, packetBatch int) { ts := &simServer{ tester: t, network: "tcp", @@ -1376,13 +1459,14 @@ func testSimServer(t *testing.T, addr string, nclients, packetSize, packetBatch } err := Run(ts, ts.network+"://"+ts.addr, + WithEdgeTriggeredIO(et), WithMulticore(ts.multicore), WithTicker(true), WithTCPKeepAlive(time.Minute*1)) assert.NoError(t, err) } -func runClient(t *testing.T, network, addr string, packetSize, batch int) { +func runSimClient(t *testing.T, network, addr string, packetSize, batch int) { rand.Seed(time.Now().UnixNano()) c, err := net.Dial(network, addr) require.NoError(t, err) diff --git a/internal/netpoll/poll_data_unix.go b/internal/netpoll/defs_poller.go similarity index 100% rename from internal/netpoll/poll_data_unix.go rename to internal/netpoll/defs_poller.go diff --git a/internal/netpoll/epoll_events.go b/internal/netpoll/defs_poller_epoll.go similarity index 89% rename from internal/netpoll/epoll_events.go rename to internal/netpoll/defs_poller_epoll.go index a9a5a3c7b..d67f36d6f 100644 --- a/internal/netpoll/epoll_events.go +++ b/internal/netpoll/defs_poller_epoll.go @@ -34,12 +34,11 @@ const ( // ErrEvents represents exceptional events that are not read/write, like socket being closed, // reading/writing from/to a closed socket, etc. ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP - // OutEvents combines EPOLLOUT event and some exceptional events. - OutEvents = ErrEvents | unix.EPOLLOUT - // InEvents combines EPOLLIN/EPOLLPRI events and some exceptional events. 
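// The test changes above thread the new et flag of runServer/runSimServer into
// Run via WithEdgeTriggeredIO. For orientation, a minimal sketch of the same
// option from a user's point of view, assuming gnet's public v2 API; the
// echoServer handler here is illustrative and not part of this patch:
package main

import (
	"log"

	"github.com/panjf2000/gnet/v2"
)

type echoServer struct {
	gnet.BuiltinEventEngine
}

func (es *echoServer) OnTraffic(c gnet.Conn) gnet.Action {
	buf, _ := c.Next(-1) // take every byte that is currently readable
	_, _ = c.Write(buf)  // and echo it back
	return gnet.None
}

func main() {
	// Mirrors what runServer does when its et parameter is true.
	log.Fatal(gnet.Run(new(echoServer), "tcp://:9000",
		gnet.WithEdgeTriggeredIO(true),
		gnet.WithMulticore(true)))
}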
- InEvents = ErrEvents | unix.EPOLLIN | unix.EPOLLPRI ) +// PollEventHandler is the callback for I/O events notified by the poller. +type PollEventHandler func(int, uint32) error + type eventList struct { size int events []epollevent diff --git a/internal/netpoll/kqueue_events.go b/internal/netpoll/defs_poller_kqueue.go similarity index 83% rename from internal/netpoll/kqueue_events.go rename to internal/netpoll/defs_poller_kqueue.go index ba55cd7fd..0b2e883b8 100644 --- a/internal/netpoll/kqueue_events.go +++ b/internal/netpoll/defs_poller_kqueue.go @@ -31,16 +31,11 @@ const ( MinPollEventsCap = 16 // MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time. MaxAsyncTasksAtOneTime = 128 - // EVFilterWrite represents writeable events from sockets. - EVFilterWrite = unix.EVFILT_WRITE - // EVFilterRead represents readable events from sockets. - EVFilterRead = unix.EVFILT_READ - // EVFlagsDelete indicates an event has been removed from the kqueue. - EVFlagsDelete = unix.EV_DELETE - // EVFlagsEOF indicates filter-specific EOF condition. - EVFlagsEOF = unix.EV_EOF ) +// PollEventHandler is the callback for I/O events notified by the poller. +type PollEventHandler func(int, int16, uint16) error + type eventList struct { size int events []unix.Kevent_t diff --git a/internal/netpoll/poll_data_bsd.go b/internal/netpoll/poll_data_bsd.go deleted file mode 100644 index 9b605568b..000000000 --- a/internal/netpoll/poll_data_bsd.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) 2021 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin - -package netpoll - -// PollEventHandler is the callback for I/O events notified by the poller. -type PollEventHandler func(int, int16, uint16) error diff --git a/internal/netpoll/poll_data_linux.go b/internal/netpoll/poll_data_linux.go deleted file mode 100644 index e9571188e..000000000 --- a/internal/netpoll/poll_data_linux.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) 2021 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package netpoll - -// PollEventHandler is the callback for I/O events notified by the poller. 
-type PollEventHandler func(int, uint32) error diff --git a/internal/netpoll/epoll_default_poller.go b/internal/netpoll/poller_epoll_default.go similarity index 81% rename from internal/netpoll/epoll_default_poller.go rename to internal/netpoll/poller_epoll_default.go index b8e686192..c21765772 100644 --- a/internal/netpoll/epoll_default_poller.go +++ b/internal/netpoll/poller_epoll_default.go @@ -57,7 +57,7 @@ func OpenPoller() (poller *Poller, err error) { return } poller.efdBuf = make([]byte, 8) - if err = poller.AddRead(&PollAttachment{FD: poller.efd}); err != nil { + if err = poller.AddRead(&PollAttachment{FD: poller.efd}, true); err != nil { _ = poller.Close() poller = nil return @@ -100,8 +100,13 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.efd, b); err == unix.EAGAIN { - err = nil + for { + _, err = unix.Write(p.efd, b) + if err == unix.EAGAIN { + _, _ = unix.Read(p.efd, p.efdBuf) + continue + } + break } } return os.NewSyscallError("write", err) @@ -129,7 +134,6 @@ func (p *Poller) Polling(callback PollEventHandler) error { ev := &el.events[i] if fd := int(ev.Fd); fd == p.efd { // poller is awakened to run tasks in queues. doChores = true - _, _ = unix.Read(p.efd, p.efdBuf) } else { switch err = callback(fd, ev.Events); err { case nil: @@ -169,10 +173,16 @@ func (p *Poller) Polling(callback PollEventHandler) error { } atomic.StoreInt32(&p.wakeupCall, 0) if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - switch _, err = unix.Write(p.efd, b); err { - case nil, unix.EAGAIN: - default: - doChores = true + for { + _, err = unix.Write(p.efd, b) + if err == unix.EAGAIN { + _, _ = unix.Read(p.efd, p.efdBuf) + continue + } + if err != nil { + logging.Errorf("failed to notify next round of event-loop for leftover tasks, %v", os.NewSyscallError("write", err)) + } + break } } } @@ -186,39 +196,59 @@ func (p *Poller) Polling(callback PollEventHandler) error { } const ( - readEvents = unix.EPOLLPRI | unix.EPOLLIN - writeEvents = unix.EPOLLOUT + readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP + writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP readWriteEvents = readEvents | writeEvents ) // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. -func (p *Poller) AddReadWrite(pa *PollAttachment) error { +func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { + var ev uint32 = readWriteEvents + if edgeTriggered { + ev |= unix.EPOLLET + } return os.NewSyscallError("epoll_ctl add", - unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: readWriteEvents})) + unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) } // AddRead registers the given file-descriptor with readable event to the poller. -func (p *Poller) AddRead(pa *PollAttachment) error { +func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { + var ev uint32 = readEvents + if edgeTriggered { + ev |= unix.EPOLLET + } return os.NewSyscallError("epoll_ctl add", - unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: readEvents})) + unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) } // AddWrite registers the given file-descriptor with writable event to the poller. 
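// The rewritten Trigger above loops on EAGAIN instead of swallowing it: a
// write to an eventfd fails once its 64-bit counter would overflow, so the
// poller drains the counter and retries rather than risk dropping a wakeup.
// A standalone Linux sketch of that failure mode (illustrative, not part of
// the patch; a little-endian host is assumed for the byte order):
package main

import (
	"encoding/binary"
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	efd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC)
	if err != nil {
		panic(err)
	}
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, 0xfffffffffffffffe) // saturate: the kernel caps the counter here
	_, _ = unix.Write(efd, buf)

	one := make([]byte, 8)
	binary.LittleEndian.PutUint64(one, 1)
	if _, err := unix.Write(efd, one); err == unix.EAGAIN {
		fmt.Println("counter saturated: write returns EAGAIN")
	}
	_, _ = unix.Read(efd, buf) // draining resets the counter to zero
	_, err = unix.Write(efd, one)
	fmt.Println("after drain, retry err:", err) // nil, as in Trigger's retry loop
}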
-func (p *Poller) AddWrite(pa *PollAttachment) error { +func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { + var ev uint32 = writeEvents + if edgeTriggered { + ev |= unix.EPOLLET + } return os.NewSyscallError("epoll_ctl add", - unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: writeEvents})) + unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) } // ModRead renews the given file-descriptor with readable event in the poller. -func (p *Poller) ModRead(pa *PollAttachment) error { +func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { + var ev uint32 = readEvents + if edgeTriggered { + ev |= unix.EPOLLET + } return os.NewSyscallError("epoll_ctl mod", - unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: readEvents})) + unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) } // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. -func (p *Poller) ModReadWrite(pa *PollAttachment) error { +func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { + var ev uint32 = readWriteEvents + if edgeTriggered { + ev |= unix.EPOLLET + } return os.NewSyscallError("epoll_ctl mod", - unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: readWriteEvents})) + unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) } // Delete removes the given file-descriptor from the poller. diff --git a/internal/netpoll/epoll_optimized_poller.go b/internal/netpoll/poller_epoll_ultimate.go similarity index 85% rename from internal/netpoll/epoll_optimized_poller.go rename to internal/netpoll/poller_epoll_ultimate.go index 2c5db7353..479415366 100644 --- a/internal/netpoll/epoll_optimized_poller.go +++ b/internal/netpoll/poller_epoll_ultimate.go @@ -58,7 +58,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.efdBuf = make([]byte, 8) poller.epa = &PollAttachment{FD: efd} - if err = poller.AddRead(poller.epa); err != nil { + if err = poller.AddRead(poller.epa, true); err != nil { _ = poller.Close() poller = nil return @@ -101,8 +101,13 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN { - err = nil + for { + _, err = unix.Write(p.epa.FD, b) + if err == unix.EAGAIN { + _, _ = unix.Read(p.epa.FD, p.efdBuf) + continue + } + break } } return os.NewSyscallError("write", err) @@ -131,7 +136,6 @@ func (p *Poller) Polling() error { pollAttachment := *(**PollAttachment)(unsafe.Pointer(&ev.data)) if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues. 
doChores = true - _, _ = unix.Read(p.epa.FD, p.efdBuf) } else { switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err { case nil: @@ -171,10 +175,16 @@ func (p *Poller) Polling() error { } atomic.StoreInt32(&p.wakeupCall, 0) if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - switch _, err = unix.Write(p.epa.FD, b); err { - case nil, unix.EAGAIN: - default: - doChores = true + for { + _, err = unix.Write(p.epa.FD, b) + if err == unix.EAGAIN { + _, _ = unix.Read(p.epa.FD, p.efdBuf) + continue + } + if err != nil { + logging.Errorf("failed to notify next round of event-loop for leftover tasks, %v", os.NewSyscallError("write", err)) + } + break } } } @@ -188,47 +198,62 @@ func (p *Poller) Polling() error { } const ( - readEvents = unix.EPOLLPRI | unix.EPOLLIN - writeEvents = unix.EPOLLOUT + readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP + writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP readWriteEvents = readEvents | writeEvents ) // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. -func (p *Poller) AddReadWrite(pa *PollAttachment) error { +func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent ev.events = readWriteEvents + if edgeTriggered { + ev.events |= unix.EPOLLET + } *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } // AddRead registers the given file-descriptor with readable event to the poller. -func (p *Poller) AddRead(pa *PollAttachment) error { +func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent ev.events = readEvents + if edgeTriggered { + ev.events |= unix.EPOLLET + } *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } // AddWrite registers the given file-descriptor with writable event to the poller. -func (p *Poller) AddWrite(pa *PollAttachment) error { +func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent ev.events = writeEvents + if edgeTriggered { + ev.events |= unix.EPOLLET + } *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } // ModRead renews the given file-descriptor with readable event in the poller. -func (p *Poller) ModRead(pa *PollAttachment) error { +func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent ev.events = readEvents + if edgeTriggered { + ev.events |= unix.EPOLLET + } *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev)) } // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. 
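// The poll_opt ("ultimate") pollers above stash the *PollAttachment directly
// in the 8-byte epoll_data payload instead of keying a lookup by fd. A
// stripped-down sketch of the round trip; epollevent and PollAttachment are
// redeclared here only for illustration:
package sketch

import "unsafe"

type epollevent struct {
	events uint32
	data   [8]byte // union epoll_data, wide enough to hold a pointer
}

type PollAttachment struct {
	FD       int
	Callback func(fd int, ev uint32) error
}

// pack stores the pointer inside the kernel-visible union. The GC cannot see
// it there, so the caller must keep pa reachable elsewhere (gnet does, via
// its connection set).
func pack(ev *epollevent, pa *PollAttachment) {
	*(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa
}

// unpack recovers the attachment when the event is delivered.
func unpack(ev *epollevent) *PollAttachment {
	return *(**PollAttachment)(unsafe.Pointer(&ev.data))
}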
-func (p *Poller) ModReadWrite(pa *PollAttachment) error { +func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent ev.events = readWriteEvents + if edgeTriggered { + ev.events |= unix.EPOLLET + } *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev)) } diff --git a/internal/netpoll/kqueue_default_poller.go b/internal/netpoll/poller_kqueue_default.go similarity index 86% rename from internal/netpoll/kqueue_default_poller.go rename to internal/netpoll/poller_kqueue_default.go index 12f29b443..4a04c3d80 100644 --- a/internal/netpoll/kqueue_default_poller.go +++ b/internal/netpoll/poller_kqueue_default.go @@ -179,32 +179,44 @@ func (p *Poller) Polling(callback PollEventHandler) error { } // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. -func (p *Poller) AddReadWrite(pa *PollAttachment) error { +func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { + var flags uint16 = unix.EV_ADD + if edgeTriggered { + flags |= unix.EV_CLEAR + } _, err := unix.Kevent(p.fd, []unix.Kevent_t{ - {Ident: keventIdent(pa.FD), Flags: unix.EV_ADD, Filter: unix.EVFILT_READ}, - {Ident: keventIdent(pa.FD), Flags: unix.EV_ADD, Filter: unix.EVFILT_WRITE}, + {Ident: keventIdent(pa.FD), Flags: flags, Filter: unix.EVFILT_READ}, + {Ident: keventIdent(pa.FD), Flags: flags, Filter: unix.EVFILT_WRITE}, }, nil, nil) return os.NewSyscallError("kevent add", err) } // AddRead registers the given file-descriptor with readable event to the poller. -func (p *Poller) AddRead(pa *PollAttachment) error { +func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { + var flags uint16 = unix.EV_ADD + if edgeTriggered { + flags |= unix.EV_CLEAR + } _, err := unix.Kevent(p.fd, []unix.Kevent_t{ - {Ident: keventIdent(pa.FD), Flags: unix.EV_ADD, Filter: unix.EVFILT_READ}, + {Ident: keventIdent(pa.FD), Flags: flags, Filter: unix.EVFILT_READ}, }, nil, nil) return os.NewSyscallError("kevent add", err) } // AddWrite registers the given file-descriptor with writable event to the poller. -func (p *Poller) AddWrite(pa *PollAttachment) error { +func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { + var flags uint16 = unix.EV_ADD + if edgeTriggered { + flags |= unix.EV_CLEAR + } _, err := unix.Kevent(p.fd, []unix.Kevent_t{ - {Ident: keventIdent(pa.FD), Flags: unix.EV_ADD, Filter: unix.EVFILT_WRITE}, + {Ident: keventIdent(pa.FD), Flags: flags, Filter: unix.EVFILT_WRITE}, }, nil, nil) return os.NewSyscallError("kevent add", err) } // ModRead renews the given file-descriptor with readable event in the poller. -func (p *Poller) ModRead(pa *PollAttachment) error { +func (p *Poller) ModRead(pa *PollAttachment, _ bool) error { _, err := unix.Kevent(p.fd, []unix.Kevent_t{ {Ident: keventIdent(pa.FD), Flags: unix.EV_DELETE, Filter: unix.EVFILT_WRITE}, }, nil, nil) @@ -212,9 +224,13 @@ func (p *Poller) ModRead(pa *PollAttachment) error { } // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. 
-func (p *Poller) ModReadWrite(pa *PollAttachment) error { +func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { + var flags uint16 = unix.EV_ADD + if edgeTriggered { + flags |= unix.EV_CLEAR + } _, err := unix.Kevent(p.fd, []unix.Kevent_t{ - {Ident: keventIdent(pa.FD), Flags: unix.EV_ADD, Filter: unix.EVFILT_WRITE}, + {Ident: keventIdent(pa.FD), Flags: flags, Filter: unix.EVFILT_WRITE}, }, nil, nil) return os.NewSyscallError("kevent add", err) } diff --git a/internal/netpoll/kqueue_optimized_poller.go b/internal/netpoll/poller_kqueue_ultimate.go similarity index 92% rename from internal/netpoll/kqueue_optimized_poller.go rename to internal/netpoll/poller_kqueue_ultimate.go index 1b5b69e7b..397dad052 100644 --- a/internal/netpoll/kqueue_optimized_poller.go +++ b/internal/netpoll/poller_kqueue_ultimate.go @@ -181,10 +181,13 @@ func (p *Poller) Polling() error { } // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. -func (p *Poller) AddReadWrite(pa *PollAttachment) error { +func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [2]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) evs[0].Flags = unix.EV_ADD + if edgeTriggered { + evs[0].Flags |= unix.EV_CLEAR + } evs[0].Filter = unix.EVFILT_READ evs[0].Udata = (*byte)(unsafe.Pointer(pa)) evs[1] = evs[0] @@ -194,10 +197,13 @@ func (p *Poller) AddReadWrite(pa *PollAttachment) error { } // AddRead registers the given file-descriptor with readable event to the poller. -func (p *Poller) AddRead(pa *PollAttachment) error { +func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) evs[0].Flags = unix.EV_ADD + if edgeTriggered { + evs[0].Flags |= unix.EV_CLEAR + } evs[0].Filter = unix.EVFILT_READ evs[0].Udata = (*byte)(unsafe.Pointer(pa)) _, err := unix.Kevent(p.fd, evs[:], nil, nil) @@ -205,10 +211,13 @@ func (p *Poller) AddRead(pa *PollAttachment) error { } // AddWrite registers the given file-descriptor with writable event to the poller. -func (p *Poller) AddWrite(pa *PollAttachment) error { +func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) evs[0].Flags = unix.EV_ADD + if edgeTriggered { + evs[0].Flags |= unix.EV_CLEAR + } evs[0].Filter = unix.EVFILT_WRITE evs[0].Udata = (*byte)(unsafe.Pointer(pa)) _, err := unix.Kevent(p.fd, evs[:], nil, nil) @@ -216,7 +225,7 @@ func (p *Poller) AddWrite(pa *PollAttachment) error { } // ModRead renews the given file-descriptor with readable event in the poller. -func (p *Poller) ModRead(pa *PollAttachment) error { +func (p *Poller) ModRead(pa *PollAttachment, _ bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) evs[0].Flags = unix.EV_DELETE @@ -227,10 +236,13 @@ func (p *Poller) ModRead(pa *PollAttachment) error { } // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. 
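// On the kqueue side, the new edgeTriggered parameter in the registrations
// above maps to EV_CLEAR, kqueue's analogue of EPOLLET: the filter's state is
// reset after each delivery, so readiness is reported once per transition. A
// sketch of such a registration, assuming a raw kqueue fd and a 64-bit
// platform (gnet's keventIdent hides the platform-dependent width of Ident):

//go:build darwin || freebsd || netbsd || openbsd || dragonfly

package sketch

import "golang.org/x/sys/unix"

func addReadET(kq, fd int) error {
	_, err := unix.Kevent(kq, []unix.Kevent_t{{
		Ident:  uint64(fd),
		Filter: unix.EVFILT_READ,
		Flags:  unix.EV_ADD | unix.EV_CLEAR, // fire once per readiness edge
	}}, nil, nil)
	return err
}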
-func (p *Poller) ModReadWrite(pa *PollAttachment) error { +func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) evs[0].Flags = unix.EV_ADD + if edgeTriggered { + evs[0].Flags |= unix.EV_CLEAR + } evs[0].Filter = unix.EVFILT_WRITE evs[0].Udata = (*byte)(unsafe.Pointer(pa)) _, err := unix.Kevent(p.fd, evs[:], nil, nil) diff --git a/internal/netpoll/fd_unix.go b/internal/socket/fd_unix.go similarity index 99% rename from internal/netpoll/fd_unix.go rename to internal/socket/fd_unix.go index 65fb730a0..32e1d5d10 100644 --- a/internal/netpoll/fd_unix.go +++ b/internal/socket/fd_unix.go @@ -15,7 +15,7 @@ //go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin // +build linux freebsd dragonfly netbsd openbsd darwin -package netpoll +package socket import ( "sync/atomic" diff --git a/listener_unix.go b/listener_unix.go index ceb734187..95bf0171d 100644 --- a/listener_unix.go +++ b/listener_unix.go @@ -46,7 +46,7 @@ func (ln *listener) packPollAttachment(handler netpoll.PollEventHandler) *netpol } func (ln *listener) dup() (int, string, error) { - return netpoll.Dup(ln.fd) + return socket.Dup(ln.fd) } func (ln *listener) normalize() (err error) { diff --git a/options.go b/options.go index fb39e7fbf..7eb355ed3 100644 --- a/options.go +++ b/options.go @@ -68,7 +68,7 @@ type Options struct { // ============================= Options for both server-side and client-side ============================= - // ReadBufferCap is the maximum number of bytes that can be read from the peer when the readable event comes. + // ReadBufferCap is the maximum number of bytes that can be read from the remote when the readable event comes. // The default value is 64KB, it can either be reduced to avoid starving the subsequent connections or increased // to read more data from a socket. // @@ -122,6 +122,11 @@ type Options struct { // Logger is the customized logger for logging info, if it is not set, // then gnet will use the default logger powered by go.uber.org/zap. Logger logging.Logger + + // EdgeTriggeredIO enables edge-triggered I/O for the underlying epoll/kqueue event-loop. + // Don't enable it unless you are 100% sure what you are doing. + // Note that this option is only available for the TCP protocol. + EdgeTriggeredIO bool } // WithOptions sets up all options. @@ -249,3 +254,10 @@ func WithMulticastInterfaceIndex(idx int) Option { opts.MulticastInterfaceIndex = idx } } + +// WithEdgeTriggeredIO enables edge-triggered I/O for the underlying epoll/kqueue event-loop. +func WithEdgeTriggeredIO(et bool) Option { + return func(opts *Options) { + opts.EdgeTriggeredIO = et + } +} diff --git a/pkg/buffer/elastic/elastic_buffer_test.go b/pkg/buffer/elastic/elastic_buffer_test.go index 971e75c27..f000ac00d 100644 --- a/pkg/buffer/elastic/elastic_buffer_test.go +++ b/pkg/buffer/elastic/elastic_buffer_test.go @@ -34,21 +34,24 @@ func TestMixedBuffer_Basic(t *testing.T) { require.EqualValues(t, newDataLen, mb.Buffered()) require.EqualValues(t, rbn, mb.ringBuffer.Buffered()) - bs := mb.Peek(-1) + bs, err := mb.Peek(-1) + require.NoError(t, err) var p []byte for _, b := range bs { p = append(p, b...)
} require.EqualValues(t, data, p) - bs = mb.Peek(rbn) + bs, err = mb.Peek(rbn) + require.NoError(t, err) p = bs[0] require.EqualValues(t, data[:rbn], p) n, err = mb.Discard(rbn) require.NoError(t, err) require.EqualValues(t, rbn, n) require.NotNil(t, mb.ringBuffer) - bs = mb.Peek(newDataLen - rbn) + bs, err = mb.Peek(newDataLen - rbn) + require.NoError(t, err) p = bs[0] require.EqualValues(t, data[rbn:], p) n, err = mb.Discard(newDataLen - rbn) @@ -82,7 +85,8 @@ func TestMixedBuffer_Basic(t *testing.T) { require.NoError(t, err) require.EqualValues(t, cum-headCum, n) require.EqualValues(t, cum, mb.Buffered()) - bs = mb.Peek(-1) + bs, err = mb.Peek(-1) + require.NoError(t, err) p = p[:0] for _, b := range bs { p = append(p, b...) @@ -125,7 +129,8 @@ func TestMixedBuffer_ReadFrom(t *testing.T) { require.NoError(t, err) require.EqualValues(t, dataLen, m) require.EqualValues(t, data, buf) - bs := mb.Peek(dataLen) + bs, err := mb.Peek(dataLen) + require.NoError(t, err) var p []byte for _, b := range bs { p = append(p, b...) diff --git a/pkg/buffer/elastic/elastic_ring_list_buffer.go b/pkg/buffer/elastic/elastic_ring_list_buffer.go index d16fd3803..624b5af98 100644 --- a/pkg/buffer/elastic/elastic_ring_list_buffer.go +++ b/pkg/buffer/elastic/elastic_ring_list_buffer.go @@ -53,13 +53,15 @@ func (mb *Buffer) Read(p []byte) (n int, err error) { } // Peek returns n bytes as [][]byte, these bytes won't be discarded until Buffer.Discard() is called. -func (mb *Buffer) Peek(n int) [][]byte { - if n <= 0 { +func (mb *Buffer) Peek(n int) ([][]byte, error) { + if n <= 0 || n == math.MaxInt32 { n = math.MaxInt32 + } else if n > mb.Buffered() { + return nil, io.ErrShortBuffer } head, tail := mb.ringBuffer.Peek(n) - if mb.ringBuffer.Buffered() >= n { - return [][]byte{head, tail} + if mb.ringBuffer.Buffered() == n { + return [][]byte{head, tail}, nil } return mb.listBuffer.PeekWithBytes(n, head, tail) } diff --git a/pkg/buffer/linkedlist/linked_list_buffer.go b/pkg/buffer/linkedlist/linked_list_buffer.go index 1f1ea9fbe..0095422ae 100644 --- a/pkg/buffer/linkedlist/linked_list_buffer.go +++ b/pkg/buffer/linkedlist/linked_list_buffer.go @@ -58,9 +58,41 @@ func (llb *Buffer) Read(p []byte) (n int, err error) { return } } + if n == 0 { + err = io.EOF + } return } +// AllocNode allocates a []byte with the given length that is expected to +// be pushed into the Buffer. +func (llb *Buffer) AllocNode(n int) []byte { + return bsPool.Get(n) +} + +// FreeNode puts the given []byte back to the pool to free the memory. +func (llb *Buffer) FreeNode(p []byte) { + bsPool.Put(p) +} + +// Append is like PushBack but appends p without copying it. +func (llb *Buffer) Append(p []byte) { + n := len(p) + if n == 0 { + return + } + llb.pushBack(&node{buf: p}) +} + +// Pop removes and returns the buffer of the head or nil if the list is empty. +func (llb *Buffer) Pop() []byte { + n := llb.pop() + if n == nil { + return nil + } + return n.buf +} + // PushFront is a wrapper of pushFront, which accepts []byte as its argument. func (llb *Buffer) PushFront(p []byte) { n := len(p) @@ -85,43 +117,59 @@ func (llb *Buffer) PushBack(p []byte) { // Peek assembles the up to maxBytes of [][]byte based on the list of node, // it won't remove these nodes from l until Discard() is called.
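// AllocNode, Append, Pop and FreeNode added above form a zero-copy handoff
// protocol around the pooled node buffers. A hypothetical round trip showing
// the intended ownership flow (roundTrip itself is not part of the patch):
package sketch

import "github.com/panjf2000/gnet/v2/pkg/buffer/linkedlist"

func roundTrip(llb *linkedlist.Buffer, data []byte) {
	node := llb.AllocNode(len(data)) // pooled allocation sized to the payload
	copy(node, data)
	llb.Append(node) // ownership moves to the buffer; no copy is made
	if b := llb.Pop(); b != nil {
		_ = b           // ... consume b ...
		llb.FreeNode(b) // hand the memory back to the pool
	}
}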
-func (llb *Buffer) Peek(maxBytes int) [][]byte { - if maxBytes <= 0 { +func (llb *Buffer) Peek(maxBytes int) ([][]byte, error) { + if maxBytes <= 0 || maxBytes == math.MaxInt32 { maxBytes = math.MaxInt32 + } else if maxBytes > llb.Buffered() { + return nil, io.ErrShortBuffer } llb.bs = llb.bs[:0] var cum int for iter := llb.head; iter != nil; iter = iter.next { - llb.bs = append(llb.bs, iter.buf) - if cum += iter.len(); cum >= maxBytes { + offset := iter.len() + if cum+offset > maxBytes { + offset = maxBytes - cum + } + llb.bs = append(llb.bs, iter.buf[:offset]) + if cum += offset; cum == maxBytes { break } } - return llb.bs + return llb.bs, nil } // PeekWithBytes is like Peek but accepts [][]byte and puts them onto head. -func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) [][]byte { - if maxBytes <= 0 { +func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) { + if maxBytes <= 0 || maxBytes == math.MaxInt32 { maxBytes = math.MaxInt32 + } else if maxBytes > llb.Buffered() { + return nil, io.ErrShortBuffer } llb.bs = llb.bs[:0] var cum int for _, b := range bs { if n := len(b); n > 0 { - llb.bs = append(llb.bs, b) - if cum += n; cum >= maxBytes { - return llb.bs + offset := n + if cum+offset > maxBytes { + offset = maxBytes - cum + } + llb.bs = append(llb.bs, b[:offset]) + if cum += offset; cum == maxBytes { + return llb.bs, nil } } } for iter := llb.head; iter != nil; iter = iter.next { - llb.bs = append(llb.bs, iter.buf) - if cum += iter.len(); cum >= maxBytes { + offset := iter.len() + if cum+offset > maxBytes { + offset = maxBytes - cum + } + llb.bs = append(llb.bs, iter.buf[:offset]) + if cum += offset; cum == maxBytes { break } } - return llb.bs + return llb.bs, nil } // Discard removes some nodes based on n bytes. diff --git a/pkg/buffer/linkedlist/llbuffer_test.go b/pkg/buffer/linkedlist/llbuffer_test.go index fb5c61477..1a755ca0d 100644 --- a/pkg/buffer/linkedlist/llbuffer_test.go +++ b/pkg/buffer/linkedlist/llbuffer_test.go @@ -28,25 +28,27 @@ func TestLinkedListBuffer_Basic(t *testing.T) { require.EqualValues(t, maxBlocks, llb.Len()) require.EqualValues(t, cum, llb.Buffered()) - bs := llb.Peek(cum / 4) + bs, err := llb.Peek(cum / 4) + require.NoError(t, err) var p []byte for _, b := range bs { p = append(p, b...) } pn := len(p) - require.GreaterOrEqual(t, pn, cum/4) + require.EqualValues(t, pn, cum/4) require.EqualValues(t, buf.Bytes()[:pn], p) tmpA := make([]byte, cum/16) tmpB := make([]byte, cum/16) rand.Read(tmpA) rand.Read(tmpB) - bs = llb.PeekWithBytes(cum/4, tmpA, tmpB) + bs, err = llb.PeekWithBytes(cum/4, tmpA, tmpB) + require.NoError(t, err) p = p[:0] for _, b := range bs { p = append(p, b...) } pn = len(p) - require.GreaterOrEqual(t, pn, cum/4) + require.EqualValues(t, pn, cum/4) var tmpBuf bytes.Buffer tmpBuf.Write(tmpA) tmpBuf.Write(tmpB) diff --git a/reactor_default_bsd.go b/reactor_default_bsd.go deleted file mode 100644 index fa0a7d7bb..000000000 --- a/reactor_default_bsd.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (c) 2019 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
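// Peek and PeekWithBytes now return ([][]byte, error), failing fast with
// io.ErrShortBuffer when the caller asks for more than Buffered(), and the
// final node is sliced so the result is exactly maxBytes rather than "at
// least maxBytes". A sketch of the calling pattern the new contract implies
// (peekExactly and process are hypothetical; Discard's (int, error) signature
// is inferred from the tests above):
package sketch

import "github.com/panjf2000/gnet/v2/pkg/buffer/linkedlist"

func peekExactly(llb *linkedlist.Buffer, n int, process func([]byte)) error {
	bs, err := llb.Peek(n) // zero-copy views, valid until Discard
	if err != nil {
		return err // io.ErrShortBuffer: fewer than n bytes are buffered
	}
	for _, b := range bs {
		process(b)
	}
	_, err = llb.Discard(n) // release exactly what was consumed
	return err
}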
-// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt -// +build freebsd dragonfly netbsd openbsd darwin -// +build !poll_opt - -package gnet - -import ( - "io" - "runtime" - - "github.com/panjf2000/gnet/v2/internal/netpoll" - "github.com/panjf2000/gnet/v2/pkg/errors" -) - -func (el *eventloop) activateMainReactor() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(el.engine.accept) - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("main reactor is exiting in terms of the demand from user, %v", err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("main reactor is exiting due to error: %v", err) - } - - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) activateSubReactor() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { - if c := el.connections.getConn(fd); c != nil { - switch { - case flags&netpoll.EVFlagsDelete != 0: - case flags&netpoll.EVFlagsEOF != 0: - switch { - case filter == netpoll.EVFilterRead: // read the remaining data after the peer wrote and closed immediately - err = el.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = el.write(c) - default: - err = el.close(c, io.EOF) - } - case filter == netpoll.EVFilterRead: - err = el.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = el.write(c) - } - } - return - }) - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) run() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { - if c := el.connections.getConn(fd); c != nil { - switch { - case flags&netpoll.EVFlagsDelete != 0: - case flags&netpoll.EVFlagsEOF != 0: - switch { - case filter == netpoll.EVFilterRead: // read the remaining data after the peer wrote and closed immediately - err = el.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = el.write(c) - default: - err = el.close(c, io.EOF) - } - case filter == netpoll.EVFilterRead: - err = el.read(c) - case filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty(): - err = el.write(c) - } - return - } - return el.accept(fd, filter, flags) - }) - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} diff --git a/reactor_default_linux.go b/reactor_default_linux.go deleted file mode 100644 index 7fb863f11..000000000 --- a/reactor_default_linux.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2019 The Gnet Authors. All rights reserved. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !poll_opt -// +build !poll_opt - -package gnet - -import ( - "runtime" - - "github.com/panjf2000/gnet/v2/internal/netpoll" - "github.com/panjf2000/gnet/v2/pkg/errors" -) - -func (el *eventloop) activateMainReactor() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(el.engine.accept) - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("main reactor is exiting in terms of the demand from user, %v", err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("main reactor is exiting due to error: %v", err) - } - - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) activateSubReactor() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, ev uint32) error { - if c := el.connections.getConn(fd); c != nil { - // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100% - // sure what you're doing! - // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. - - // We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to - // the peer when any error occurs on a connection. - // - // Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused. - // In either case write() should take care of it properly: - // 1) writing data back, - // 2) closing the connection. - if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() { - if err := el.write(c); err != nil { - return err - } - } - if ev&netpoll.InEvents != 0 { - return el.read(c) - } - return nil - } - return nil - }) - - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) run() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, ev uint32) error { - if c := el.connections.getConn(fd); c != nil { - // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100% - // sure what you're doing! - // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. - - // We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to - // the peer when any error occurs on a connection. - // - // Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused. - // In either case write() should take care of it properly: - // 1) writing data back, - // 2) closing the connection. 
- if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() { - if err := el.write(c); err != nil { - return err - } - } - if ev&netpoll.InEvents != 0 { - return el.read(c) - } - return nil - } - return el.accept(fd, ev) - }) - - if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} diff --git a/reactor_epoll_default.go b/reactor_epoll_default.go new file mode 100644 index 000000000..b2c514dd5 --- /dev/null +++ b/reactor_epoll_default.go @@ -0,0 +1,189 @@ +// Copyright (c) 2019 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && !poll_opt +// +build linux,!poll_opt + +package gnet + +import ( + "io" + "runtime" + + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/pkg/errors" +) + +func (el *eventloop) rotate() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(el.engine.accept) + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) + err = nil + } else if err != nil { + el.getLogger().Errorf("main reactor is exiting due to error: %v", err) + } + + el.engine.shutdown(err) + + return err +} + +func (el *eventloop) orbit() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(func(fd int, ev uint32) error { + c := el.connections.getConn(fd) + if c == nil { + // Somehow epoll notified us of an event on a stale fd that is not in our connection set. + // We need to delete it from the epoll set. + return el.poller.Delete(fd) + } + + // First check for any unexpected non-IO events. + // For these events we just close the corresponding connection directly. + if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + return el.close(c, io.EOF) + } + // Second, check for EPOLLOUT before EPOLLIN; the former has a higher priority + // than the latter regardless of whether the current connection is alive: + // + // 1. When the connection is alive and the system is overloaded, we want to + // offload the incoming traffic by writing all pending data back to the remotes + // before continuing to read and handle requests. + // 2. When the connection is dead, we need to try writing any pending data back + // to the remote and close the connection first. + // + // We perform eventloop.write for EPOLLOUT because it will take good care of either case.
+ if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { + if err := el.write(c); err != nil { + return err + } + } + // Check for EPOLLIN before EPOLLRDHUP in case there is pending data in + // the socket buffer. + if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { + if err := el.read(c); err != nil { + return err + } + } + // Ultimately, check for EPOLLRDHUP; this event indicates that the remote has + // either closed the connection or shut down the writing half of the connection. + if ev&unix.EPOLLRDHUP != 0 && c.opened { + if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly + return el.close(c, io.EOF) + } + // Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read + // failed to drain the socket buffer, so we make sure we get it done this time. + c.isEOF = true + return el.read(c) + } + return nil + }) + + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + err = nil + } else if err != nil { + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + } + + el.closeConns() + el.engine.shutdown(err) + + return err +} + +func (el *eventloop) run() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(func(fd int, ev uint32) error { + c := el.connections.getConn(fd) + if c == nil { + if fd == el.ln.fd { + return el.accept(fd, ev) + } + // Somehow epoll notified us of an event on a stale fd that is not in our connection set. + // We need to delete it from the epoll set. + return el.poller.Delete(fd) + } + + // First check for any unexpected non-IO events. + // For these events we just close the corresponding connection directly. + if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + return el.close(c, io.EOF) + } + // Second, check for EPOLLOUT before EPOLLIN; the former has a higher priority + // than the latter regardless of whether the current connection is alive: + // + // 1. When the connection is alive and the system is overloaded, we want to + // offload the incoming traffic by writing all pending data back to the remotes + // before continuing to read and handle requests. + // 2. When the connection is dead, we need to try writing any pending data back + // to the remote and close the connection first. + // + // We perform eventloop.write for EPOLLOUT because it will take good care of either case. + if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { + if err := el.write(c); err != nil { + return err + } + } + // Check for EPOLLIN before EPOLLRDHUP in case there is pending data in + // the socket buffer. + if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { + if err := el.read(c); err != nil { + return err + } + } + // Ultimately, check for EPOLLRDHUP; this event indicates that the remote has + // either closed the connection or shut down the writing half of the connection. + if ev&unix.EPOLLRDHUP != 0 && c.opened { + if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly + return el.close(c, io.EOF) + } + // Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read + // failed to drain the socket buffer, so we make sure we get it done this time.
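// Note on the c.isEOF branch above: it matters most under EPOLLET, where an
// edge is reported only once, so the read path must drain the socket to
// EAGAIN or the leftover bytes will never trigger another wakeup. A
// self-contained sketch of that invariant (drainET is hypothetical, not the
// patch's el.read):
package sketch

import (
	"io"

	"golang.org/x/sys/unix"
)

func drainET(fd int, handle func([]byte)) error {
	buf := make([]byte, 64<<10) // same order of magnitude as ReadBufferCap's default
	for {
		n, err := unix.Read(fd, buf)
		if n > 0 {
			handle(buf[:n])
		}
		switch {
		case err == unix.EAGAIN:
			return nil // kernel buffer drained; safe to wait for the next edge
		case err != nil:
			return err
		case n == 0:
			return io.EOF // orderly shutdown by the remote
		}
	}
}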
+ c.isEOF = true + return el.read(c) + } + return nil + }) + + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + err = nil + } else if err != nil { + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + } + + el.closeConns() + el.engine.shutdown(err) + + return err +} diff --git a/reactor_optimized_linux.go b/reactor_epoll_ultimate.go similarity index 67% rename from reactor_optimized_linux.go rename to reactor_epoll_ultimate.go index 73312d9d1..49e1477ba 100644 --- a/reactor_optimized_linux.go +++ b/reactor_epoll_ultimate.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build poll_opt -// +build poll_opt +//go:build linux && poll_opt +// +build linux,poll_opt package gnet @@ -23,7 +23,7 @@ import ( "github.com/panjf2000/gnet/v2/pkg/errors" ) -func (el *eventloop) activateMainReactor() error { +func (el *eventloop) rotate() error { if el.engine.opts.LockOSThread { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -31,10 +31,10 @@ func (el *eventloop) activateMainReactor() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("main reactor is exiting in terms of the demand from user, %v", err) + el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("main reactor is exiting due to error: %v", err) + el.getLogger().Errorf("main reactor is exiting due to error: %v", err) } el.engine.shutdown(err) @@ -42,7 +42,7 @@ func (el *eventloop) activateMainReactor() error { return err } -func (el *eventloop) activateSubReactor() error { +func (el *eventloop) orbit() error { if el.engine.opts.LockOSThread { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -50,10 +50,10 @@ func (el *eventloop) activateSubReactor() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) } el.closeConns() @@ -70,10 +70,10 @@ func (el *eventloop) run() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) } el.closeConns() diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go new file mode 100644 index 000000000..4cd533618 --- /dev/null +++ b/reactor_kqueue_default.go @@ -0,0 +1,166 @@ +// Copyright (c) 2019 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt +// +build freebsd dragonfly netbsd openbsd darwin +// +build !poll_opt + +package gnet + +import ( + "io" + "runtime" + + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/pkg/errors" +) + +func (el *eventloop) rotate() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(el.engine.accept) + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) + err = nil + } else if err != nil { + el.getLogger().Errorf("main reactor is exiting due to error: %v", err) + } + + el.engine.shutdown(err) + + return err +} + +func (el *eventloop) orbit() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { + c := el.connections.getConn(fd) + if c == nil { + // This might happen when the connection has already been closed; + // the file descriptor will be deleted from kqueue automatically, + // as documented in the manual pages, so we just log a warning. + el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) + return + } + + switch filter { + case unix.EVFILT_READ: + err = el.read(c) + case unix.EVFILT_WRITE: + err = el.write(c) + } + // EV_EOF indicates that the remote has closed the connection. + // We check for EV_EOF after processing the read/write event + // to ensure that nothing is left out on this event filter. + if flags&unix.EV_EOF != 0 && c.opened && err == nil { + switch filter { + case unix.EVFILT_READ: + // Received the event of EVFILT_READ | EV_EOF, but the previous eventloop.read + // failed to drain the socket buffer, so we make sure we get it done this time. + c.isEOF = true + err = el.read(c) + case unix.EVFILT_WRITE: + // On macOS, kqueue in both LT and ET modes will notify with one event for the EOF + // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two + // events will be issued in ET mode for the EOF of the Unix remote in this order: + // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
+ err = el.write(c) + default: + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + err = el.close(c, io.EOF) + } + } + return + }) + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + err = nil + } else if err != nil { + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + } + + el.closeConns() + el.engine.shutdown(err) + + return err +} + +func (el *eventloop) run() error { + if el.engine.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { + c := el.connections.getConn(fd) + if c == nil { + if fd == el.ln.fd { + return el.accept(fd, filter, flags) + } + // This might happen when the connection has already been closed; + // the file descriptor will be deleted from kqueue automatically, + // as documented in the manual pages, so we just log a warning. + el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) + return + } + + switch filter { + case unix.EVFILT_READ: + err = el.read(c) + case unix.EVFILT_WRITE: + err = el.write(c) + } + // EV_EOF indicates that the remote has closed the connection. + // We check for EV_EOF after processing the read/write event + // to ensure that nothing is left out on this event filter. + if flags&unix.EV_EOF != 0 && c.opened && err == nil { + switch filter { + case unix.EVFILT_READ: + // Received the event of EVFILT_READ | EV_EOF, but the previous eventloop.read + // failed to drain the socket buffer, so we make sure we get it done this time. + c.isEOF = true + err = el.read(c) + case unix.EVFILT_WRITE: + // On macOS, kqueue in both LT and ET modes will notify with one event for the EOF + // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two + // events will be issued in ET mode for the EOF of the Unix remote in this order: + // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
+ err = el.write(c) + default: + c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + err = el.close(c, io.EOF) + } + } + return + }) + if err == errors.ErrEngineShutdown { + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + err = nil + } else if err != nil { + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + } + + el.closeConns() + el.engine.shutdown(err) + + return err +} diff --git a/reactor_optimized_bsd.go b/reactor_kqueue_ultimate.go similarity index 70% rename from reactor_optimized_bsd.go rename to reactor_kqueue_ultimate.go index 2938fa553..e09c296df 100644 --- a/reactor_optimized_bsd.go +++ b/reactor_kqueue_ultimate.go @@ -24,7 +24,7 @@ import ( "github.com/panjf2000/gnet/v2/pkg/errors" ) -func (el *eventloop) activateMainReactor() error { +func (el *eventloop) rotate() error { if el.engine.opts.LockOSThread { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -32,10 +32,10 @@ func (el *eventloop) activateMainReactor() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("main reactor is exiting in terms of the demand from user, %v", err) + el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("main reactor is exiting due to error: %v", err) + el.getLogger().Errorf("main reactor is exiting due to error: %v", err) } el.engine.shutdown(err) @@ -43,7 +43,7 @@ func (el *eventloop) activateMainReactor() error { return err } -func (el *eventloop) activateSubReactor() error { +func (el *eventloop) orbit() error { if el.engine.opts.LockOSThread { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -51,10 +51,10 @@ func (el *eventloop) activateSubReactor() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) } el.closeConns() @@ -71,10 +71,10 @@ func (el *eventloop) run() error { err := el.poller.Polling() if err == errors.ErrEngineShutdown { - el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) + el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { - el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) + el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) } el.closeConns()
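// One asymmetry in the renamed kqueue pollers is worth spelling out: ModRead
// takes the new edgeTriggered parameter but ignores it (_ bool), because
// kqueue has no single "modify" call; dropping write interest is simply
// deleting the EVFILT_WRITE filter, and the EV_CLEAR choice made at
// registration time stays in force. A sketch with illustrative names,
// assuming a raw kqueue fd on a 64-bit platform:

//go:build darwin || freebsd || netbsd || openbsd || dragonfly

package sketch

import "golang.org/x/sys/unix"

// modRead removes the write filter and leaves the read filter armed,
// mirroring what the patched ModRead does for both LT and ET sockets.
func modRead(kq, fd int) error {
	_, err := unix.Kevent(kq, []unix.Kevent_t{
		{Ident: uint64(fd), Filter: unix.EVFILT_WRITE, Flags: unix.EV_DELETE},
	}, nil, nil)
	return err
}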