diff --git a/acceptor_unix.go b/acceptor_unix.go index 1afad82b4..fe7d67cab 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -33,7 +33,7 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error nfd, sa, err := socket.Accept(fd) if err != nil { switch err { - case unix.EAGAIN: // the Accept queue has been drained, we can return now + case unix.EAGAIN: // the Accept queue has been drained out, we can return now return nil case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED: // ECONNRESET or ECONNABORTED could indicate that a socket @@ -93,16 +93,5 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e } c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr) - addEvents := el.poller.AddRead - if el.engine.opts.EdgeTriggeredIO { - addEvents = el.poller.AddReadWrite - } - if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil { - el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err) - _ = unix.Close(c.fd) - c.release() - return err - } - el.connections.addConn(c, el.idx) - return el.open(c) + return el.register0(c) } diff --git a/connection_bsd.go b/connection_bsd.go index b24becffb..3223f01e4 100644 --- a/connection_bsd.go +++ b/connection_bsd.go @@ -39,14 +39,14 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) ( if flags&unix.EV_EOF != 0 && c.opened && err == nil { switch filter { case unix.EVFILT_READ: - // Receive the event of EVFILT_READ | EV_EOF, but the previous eventloop.read + // Received the event of EVFILT_READ|EV_EOF, but the previous eventloop.read // failed to drain the socket buffer, so we make sure we get it done this time. c.isEOF = true err = el.read(c) case unix.EVFILT_WRITE: - // On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF - // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two - // events will be issued in ET mode for the EOF of the Unix remote in this order: + // On macOS, the kqueue in either LT or ET mode will notify with one event for the + // EOF of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, + // two events will be issued in ET mode for the EOF of the Unix remote in this order: // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. err = el.write(c) default: diff --git a/connection_linux.go b/connection_linux.go index ac8376e4c..d14e1f32e 100644 --- a/connection_linux.go +++ b/connection_linux.go @@ -28,7 +28,7 @@ import ( func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { el := c.loop // First check for any unexpected non-IO events. - // For these events we just close the corresponding connection directly. + // For these events we just close the connection directly. if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error return el.close(c, io.EOF) @@ -40,9 +40,9 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { // offload the incoming traffic by writing all pending data back to the remotes // before continuing to read and handle requests. // 2. When the connection is dead, we need to try writing any pending data back - // to the remote and close the connection first. + // to the remote first and then close the connection. // - // We perform eventloop.write for EPOLLOUT because it will take good care of either case. + // We perform eventloop.write for EPOLLOUT because it can take good care of either case. if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { if err := el.write(c); err != nil { return err @@ -61,8 +61,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly return el.close(c, io.EOF) } - // Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read - // failed to drain the socket buffer, so we make sure we get it done this time. + // Received the event of EPOLLIN|EPOLLRDHUP, but the previous eventloop.read + // failed to drain the socket buffer, so we ensure to get it done this time. c.isEOF = true return el.read(c) } diff --git a/eventloop_unix.go b/eventloop_unix.go index c0b326804..b7530a966 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -75,20 +75,20 @@ func (el *eventloop) register(itf interface{}) error { c = ccb.c defer ccb.cb() } + return el.register0(c) +} +func (el *eventloop) register0(c *conn) error { addEvents := el.poller.AddRead if el.engine.opts.EdgeTriggeredIO { addEvents = el.poller.AddReadWrite } - if err := addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil { _ = unix.Close(c.fd) c.release() return err } - el.connections.addConn(c, el.idx) - if c.isDatagram && c.remote != nil { return nil } diff --git a/eventloop_windows.go b/eventloop_windows.go index 074318c58..ea0f87377 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -17,12 +17,13 @@ package gnet import ( "bytes" "context" + "errors" "runtime" "strings" "sync/atomic" "time" - "github.com/panjf2000/gnet/v2/pkg/errors" + errorx "github.com/panjf2000/gnet/v2/pkg/errors" "github.com/panjf2000/gnet/v2/pkg/logging" ) @@ -79,7 +80,7 @@ func (el *eventloop) run() (err error) { err = v() } - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) break } else if err != nil { @@ -121,7 +122,7 @@ func (el *eventloop) read(c *conn) error { case Close: return el.close(c, nil) case Shutdown: - return errors.ErrEngineShutdown + return errorx.ErrEngineShutdown } _, _ = c.inboundBuffer.Write(c.buffer.B) c.buffer.Reset() @@ -132,7 +133,7 @@ func (el *eventloop) read(c *conn) error { func (el *eventloop) readUDP(c *conn) error { action := el.eventHandler.OnTraffic(c) if action == Shutdown { - return errors.ErrEngineShutdown + return errorx.ErrEngineShutdown } c.release() return nil @@ -160,7 +161,7 @@ func (el *eventloop) ticker(ctx context.Context) { case Shutdown: if !shutdown { shutdown = true - el.ch <- errors.ErrEngineShutdown + el.ch <- errorx.ErrEngineShutdown el.getLogger().Debugf("stopping ticker in event-loop(%d) from Tick()", el.idx) } } @@ -220,7 +221,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error { case Close: return el.close(c, nil) case Shutdown: - return errors.ErrEngineShutdown + return errorx.ErrEngineShutdown default: return nil } diff --git a/gnet_test.go b/gnet_test.go index fec73ad16..11c7df997 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -1484,7 +1484,7 @@ func (s *simServer) OnTraffic(c Conn) (action Action) { var packets [][]byte for { data, err := codec.Decode(c) - if err == errIncompletePacket { + if errors.Is(err, errIncompletePacket) { break } if err != nil { diff --git a/reactor_epoll_default.go b/reactor_epoll_default.go index 83be6d525..4f150e6ec 100644 --- a/reactor_epoll_default.go +++ b/reactor_epoll_default.go @@ -18,13 +18,11 @@ package gnet import ( - "io" + "errors" "runtime" - "golang.org/x/sys/unix" - "github.com/panjf2000/gnet/v2/internal/netpoll" - "github.com/panjf2000/gnet/v2/pkg/errors" + errorx "github.com/panjf2000/gnet/v2/pkg/errors" ) func (el *eventloop) rotate() error { @@ -34,7 +32,7 @@ func (el *eventloop) rotate() error { } err := el.poller.Polling(el.accept0) - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { @@ -52,57 +50,16 @@ func (el *eventloop) orbit() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { + err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { // Somehow epoll notified with an event for a stale fd that is not in our connection set. // We need to delete it from the epoll set. return el.poller.Delete(fd) } - - // First check for any unexpected non-IO events. - // For these events we just close the corresponding connection directly. - if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error - return el.close(c, io.EOF) - } - // Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority - // than the latter regardless of the aliveness of the current connection: - // - // 1. When the connection is alive and the system is overloaded, we want to - // offload the incoming traffic by writing all pending data back to the remotes - // before continuing to read and handle requests. - // 2. When the connection is dead, we need to try writing any pending data back - // to the remote and close the connection first. - // - // We perform eventloop.write for EPOLLOUT because it will take good care of either case. - if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { - if err := el.write(c); err != nil { - return err - } - } - // Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in - // the socket buffer. - if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { - if err := el.read(c); err != nil { - return err - } - } - // Ultimately, check for EPOLLRDHUP, this event indicates that the remote has - // either closed connection or shut down the writing half of the connection. - if ev&unix.EPOLLRDHUP != 0 && c.opened { - if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly - return el.close(c, io.EOF) - } - // Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read - // failed to drain the socket buffer, so we make sure we get it done this time. - c.isEOF = true - return el.read(c) - } - return nil + return c.processIO(fd, ev, flags) }) - - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { @@ -130,52 +87,10 @@ func (el *eventloop) run() error { // Somehow epoll notified with an event for a stale fd that is not in our connection set. // We need to delete it from the epoll set. return el.poller.Delete(fd) - - } - - // First check for any unexpected non-IO events. - // For these events we just close the corresponding connection directly. - if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error - return el.close(c, io.EOF) - } - // Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority - // than the latter regardless of the aliveness of the current connection: - // - // 1. When the connection is alive and the system is overloaded, we want to - // offload the incoming traffic by writing all pending data back to the remotes - // before continuing to read and handle requests. - // 2. When the connection is dead, we need to try writing any pending data back - // to the remote and close the connection first. - // - // We perform eventloop.write for EPOLLOUT because it will take good care of either case. - if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { - if err := el.write(c); err != nil { - return err - } } - // Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in - // the socket buffer. - if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { - if err := el.read(c); err != nil { - return err - } - } - // Ultimately, check for EPOLLRDHUP, this event indicates that the remote has - // either closed connection or shut down the writing half of the connection. - if ev&unix.EPOLLRDHUP != 0 && c.opened { - if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly - return el.close(c, io.EOF) - } - // Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read - // failed to drain the socket buffer, so we make sure we get it done this time. - c.isEOF = true - return el.read(c) - } - return nil + return c.processIO(fd, ev, flags) }) - - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { diff --git a/reactor_epoll_ultimate.go b/reactor_epoll_ultimate.go index 49e1477ba..2fd3dd693 100644 --- a/reactor_epoll_ultimate.go +++ b/reactor_epoll_ultimate.go @@ -18,9 +18,10 @@ package gnet import ( + "errors" "runtime" - "github.com/panjf2000/gnet/v2/pkg/errors" + errorx "github.com/panjf2000/gnet/v2/pkg/errors" ) func (el *eventloop) rotate() error { @@ -30,7 +31,7 @@ func (el *eventloop) rotate() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { @@ -49,7 +50,7 @@ func (el *eventloop) orbit() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { @@ -69,7 +70,7 @@ func (el *eventloop) run() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go index 19ca75602..7426c74e0 100644 --- a/reactor_kqueue_default.go +++ b/reactor_kqueue_default.go @@ -19,13 +19,11 @@ package gnet import ( - "io" + "errors" "runtime" - "golang.org/x/sys/unix" - "github.com/panjf2000/gnet/v2/internal/netpoll" - "github.com/panjf2000/gnet/v2/pkg/errors" + errorx "github.com/panjf2000/gnet/v2/pkg/errors" ) func (el *eventloop) rotate() error { @@ -35,7 +33,7 @@ func (el *eventloop) rotate() error { } err := el.poller.Polling(el.accept0) - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { @@ -53,46 +51,18 @@ func (el *eventloop) orbit() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) { + err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { // This might happen when the connection has already been closed, // the file descriptor will be deleted from kqueue automatically // as documented in the manual pages, So we just print a warning log. el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) - return - } - - switch filter { - case unix.EVFILT_READ: - err = el.read(c) - case unix.EVFILT_WRITE: - err = el.write(c) - } - // EV_EOF indicates that the remote has closed the connection. - // We check for EV_EOF after processing the read/write event - // to ensure that nothing is left out on this event filter. - if flags&unix.EV_EOF != 0 && c.opened && err == nil { - switch filter { - case unix.EVFILT_READ: - // Receive the event of EVFILT_READ | EV_EOF, but the previous eventloop.read - // failed to drain the socket buffer, so we make sure we get it done this time. - c.isEOF = true - err = el.read(c) - case unix.EVFILT_WRITE: - // On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF - // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two - // events will be issued in ET mode for the EOF of the Unix remote in this order: - // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. - err = el.write(c) - default: - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error - err = el.close(c, io.EOF) - } + return nil } - return + return c.processIO(fd, filter, flags) }) - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { @@ -111,7 +81,7 @@ func (el *eventloop) run() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) { + err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { if _, ok := el.listeners[fd]; ok { @@ -121,39 +91,11 @@ func (el *eventloop) run() error { // the file descriptor will be deleted from kqueue automatically // as documented in the manual pages, So we just print a warning log. el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) - return - } - - switch filter { - case unix.EVFILT_READ: - err = el.read(c) - case unix.EVFILT_WRITE: - err = el.write(c) - } - // EV_EOF indicates that the remote has closed the connection. - // We check for EV_EOF after processing the read/write event - // to ensure that nothing is left out on this event filter. - if flags&unix.EV_EOF != 0 && c.opened && err == nil { - switch filter { - case unix.EVFILT_READ: - // Receive the event of EVFILT_READ | EV_EOF, but the previous eventloop.read - // failed to drain the socket buffer, so we make sure we get it done this time. - c.isEOF = true - err = el.read(c) - case unix.EVFILT_WRITE: - // On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF - // of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two - // events will be issued in ET mode for the EOF of the Unix remote in this order: - // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. - err = el.write(c) - default: - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error - err = el.close(c, io.EOF) - } + return nil } - return + return c.processIO(fd, filter, flags) }) - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { diff --git a/reactor_kqueue_ultimate.go b/reactor_kqueue_ultimate.go index e09c296df..93b998625 100644 --- a/reactor_kqueue_ultimate.go +++ b/reactor_kqueue_ultimate.go @@ -19,9 +19,10 @@ package gnet import ( + "errors" "runtime" - "github.com/panjf2000/gnet/v2/pkg/errors" + errorx "github.com/panjf2000/gnet/v2/pkg/errors" ) func (el *eventloop) rotate() error { @@ -31,7 +32,7 @@ func (el *eventloop) rotate() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil } else if err != nil { @@ -50,7 +51,7 @@ func (el *eventloop) orbit() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil { @@ -70,7 +71,7 @@ func (el *eventloop) run() error { } err := el.poller.Polling() - if err == errors.ErrEngineShutdown { + if errors.Is(err, errorx.ErrEngineShutdown) { el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) err = nil } else if err != nil {