Skip to content

Commit

Permalink
opt: reduce duplicate code of I/O processing (#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 authored Apr 22, 2024
1 parent 39c175b commit 3594d22
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 202 deletions.
15 changes: 2 additions & 13 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EAGAIN: // the Accept queue has been drained, we can return now
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
Expand Down Expand Up @@ -93,16 +93,5 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
}

c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err)
_ = unix.Close(c.fd)
c.release()
return err
}
el.connections.addConn(c, el.idx)
return el.open(c)
return el.register0(c)
}
8 changes: 4 additions & 4 deletions connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
if flags&unix.EV_EOF != 0 && c.opened && err == nil {
switch filter {
case unix.EVFILT_READ:
// Receive the event of EVFILT_READ | EV_EOF, but the previous eventloop.read
// Received the event of EVFILT_READ|EV_EOF, but the previous eventloop.read
// failed to drain the socket buffer, so we make sure we get it done this time.
c.isEOF = true
err = el.read(c)
case unix.EVFILT_WRITE:
// On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF
// of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two
// events will be issued in ET mode for the EOF of the Unix remote in this order:
// On macOS, the kqueue in either LT or ET mode will notify with one event for the
// EOF of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason,
// two events will be issued in ET mode for the EOF of the Unix remote in this order:
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
err = el.write(c)
default:
Expand Down
10 changes: 5 additions & 5 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the corresponding connection directly.
// For these events we just close the connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
return el.close(c, io.EOF)
Expand All @@ -40,9 +40,9 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
// offload the incoming traffic by writing all pending data back to the remotes
// before continuing to read and handle requests.
// 2. When the connection is dead, we need to try writing any pending data back
// to the remote and close the connection first.
// to the remote first and then close the connection.
//
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if err := el.write(c); err != nil {
return err
Expand All @@ -61,8 +61,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
return el.close(c, io.EOF)
}
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
// failed to drain the socket buffer, so we make sure we get it done this time.
// Received the event of EPOLLIN|EPOLLRDHUP, but the previous eventloop.read
// failed to drain the socket buffer, so we ensure to get it done this time.
c.isEOF = true
return el.read(c)
}
Expand Down
6 changes: 3 additions & 3 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@ func (el *eventloop) register(itf interface{}) error {
c = ccb.c
defer ccb.cb()
}
return el.register0(c)
}

func (el *eventloop) register0(c *conn) error {
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
}

if err := addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
_ = unix.Close(c.fd)
c.release()
return err
}

el.connections.addConn(c, el.idx)

if c.isDatagram && c.remote != nil {
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package gnet
import (
"bytes"
"context"
"errors"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

Expand Down Expand Up @@ -79,7 +80,7 @@ func (el *eventloop) run() (err error) {
err = v()
}

if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
break
} else if err != nil {
Expand Down Expand Up @@ -121,7 +122,7 @@ func (el *eventloop) read(c *conn) error {
case Close:
return el.close(c, nil)
case Shutdown:
return errors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
_, _ = c.inboundBuffer.Write(c.buffer.B)
c.buffer.Reset()
Expand All @@ -132,7 +133,7 @@ func (el *eventloop) read(c *conn) error {
func (el *eventloop) readUDP(c *conn) error {
action := el.eventHandler.OnTraffic(c)
if action == Shutdown {
return errors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
c.release()
return nil
Expand Down Expand Up @@ -160,7 +161,7 @@ func (el *eventloop) ticker(ctx context.Context) {
case Shutdown:
if !shutdown {
shutdown = true
el.ch <- errors.ErrEngineShutdown
el.ch <- errorx.ErrEngineShutdown
el.getLogger().Debugf("stopping ticker in event-loop(%d) from Tick()", el.idx)
}
}
Expand Down Expand Up @@ -220,7 +221,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
case Close:
return el.close(c, nil)
case Shutdown:
return errors.ErrEngineShutdown
return errorx.ErrEngineShutdown
default:
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ func (s *simServer) OnTraffic(c Conn) (action Action) {
var packets [][]byte
for {
data, err := codec.Decode(c)
if err == errIncompletePacket {
if errors.Is(err, errIncompletePacket) {
break
}
if err != nil {
Expand Down
101 changes: 8 additions & 93 deletions reactor_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package gnet

import (
"io"
"errors"
"runtime"

"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (el *eventloop) rotate() error {
Expand All @@ -34,7 +32,7 @@ func (el *eventloop) rotate() error {
}

err := el.poller.Polling(el.accept0)
if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
} else if err != nil {
Expand All @@ -52,57 +50,16 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)
}

// First check for any unexpected non-IO events.
// For these events we just close the corresponding connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
// than the latter regardless of the aliveness of the current connection:
//
// 1. When the connection is alive and the system is overloaded, we want to
// offload the incoming traffic by writing all pending data back to the remotes
// before continuing to read and handle requests.
// 2. When the connection is dead, we need to try writing any pending data back
// to the remote and close the connection first.
//
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if err := el.read(c); err != nil {
return err
}
}
// Ultimately, check for EPOLLRDHUP, this event indicates that the remote has
// either closed connection or shut down the writing half of the connection.
if ev&unix.EPOLLRDHUP != 0 && c.opened {
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
return el.close(c, io.EOF)
}
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
// failed to drain the socket buffer, so we make sure we get it done this time.
c.isEOF = true
return el.read(c)
}
return nil
return c.processIO(fd, ev, flags)
})

if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
err = nil
} else if err != nil {
Expand Down Expand Up @@ -130,52 +87,10 @@ func (el *eventloop) run() error {
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)

}

// First check for any unexpected non-IO events.
// For these events we just close the corresponding connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
// than the latter regardless of the aliveness of the current connection:
//
// 1. When the connection is alive and the system is overloaded, we want to
// offload the incoming traffic by writing all pending data back to the remotes
// before continuing to read and handle requests.
// 2. When the connection is dead, we need to try writing any pending data back
// to the remote and close the connection first.
//
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if err := el.read(c); err != nil {
return err
}
}
// Ultimately, check for EPOLLRDHUP, this event indicates that the remote has
// either closed connection or shut down the writing half of the connection.
if ev&unix.EPOLLRDHUP != 0 && c.opened {
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
return el.close(c, io.EOF)
}
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
// failed to drain the socket buffer, so we make sure we get it done this time.
c.isEOF = true
return el.read(c)
}
return nil
return c.processIO(fd, ev, flags)
})

if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
err = nil
} else if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions reactor_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package gnet

import (
"errors"
"runtime"

"github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (el *eventloop) rotate() error {
Expand All @@ -30,7 +31,7 @@ func (el *eventloop) rotate() error {
}

err := el.poller.Polling()
if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
} else if err != nil {
Expand All @@ -49,7 +50,7 @@ func (el *eventloop) orbit() error {
}

err := el.poller.Polling()
if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
err = nil
} else if err != nil {
Expand All @@ -69,7 +70,7 @@ func (el *eventloop) run() error {
}

err := el.poller.Polling()
if err == errors.ErrEngineShutdown {
if errors.Is(err, errorx.ErrEngineShutdown) {
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
err = nil
} else if err != nil {
Expand Down
Loading

0 comments on commit 3594d22

Please sign in to comment.