diff --git a/client_windows.go b/client_windows.go index 33e0566ff..d9cbbde4b 100644 --- a/client_windows.go +++ b/client_windows.go @@ -205,7 +205,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr()) c.SetContext(ctx) c.rawConn = nc - cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }} + cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }} go func(uc net.Conn, el *eventloop) { var buffer [0x10000]byte for { diff --git a/connection_windows.go b/connection_windows.go index e46498f9d..efdfc22c9 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -45,7 +45,6 @@ type udpConn struct { type openConn struct { c *conn cb func() - isDatagram bool } type conn struct { diff --git a/eventloop_unix.go b/eventloop_unix.go index 617cf9162..97475302c 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -226,28 +226,13 @@ loop: return nil } -func (el *eventloop) close(c *conn, err error) (rerr error) { - if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") { - rerr = el.poller.Delete(c.fd) - if _, ok := el.listeners[c.fd]; !ok { - rerr = unix.Close(c.fd) - el.connections.delConn(c) - } - if el.eventHandler.OnClose(c, err) == Shutdown { - return errorx.ErrEngineShutdown - } - c.release() - return - } - +func (el *eventloop) close(c *conn, err error) error { if !c.opened || el.connections.getConn(c.fd) == nil { - return // ignore stale connections + return nil // ignore stale connections } el.connections.delConn(c) - if el.eventHandler.OnClose(c, err) == Shutdown { - rerr = errorx.ErrEngineShutdown - } + action := el.eventHandler.OnClose(c, err) // Send residual data in buffer back to the remote before actually closing the connection. for !c.outboundBuffer.IsEmpty() { @@ -262,8 +247,10 @@ func (el *eventloop) close(c *conn, err error) (rerr error) { } } - err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd) + c.release() + var errStr strings.Builder + err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd) if err0 != nil { err0 = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v", c.fd, el.idx, os.NewSyscallError("delete", err0)) @@ -276,16 +263,10 @@ func (el *eventloop) close(c *conn, err error) (rerr error) { errStr.WriteString(err1.Error()) } if errStr.Len() > 0 { - if rerr != nil { - el.getLogger().Errorf(strings.TrimSuffix(errStr.String(), " | ")) - } else { - rerr = errors.New(strings.TrimSuffix(errStr.String(), " | ")) - } + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } - c.release() - - return + return el.handleAction(c, action) } func (el *eventloop) wake(c *conn) error { @@ -333,19 +314,6 @@ func (el *eventloop) ticker(ctx context.Context) { } } -func (el *eventloop) handleAction(c *conn, action Action) error { - switch action { - case None: - return nil - case Close: - return el.close(c, nil) - case Shutdown: - return errorx.ErrEngineShutdown - default: - return nil - } -} - func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { n, sa, err := unix.Recvfrom(fd, el.buffer, 0) if err != nil { @@ -372,6 +340,19 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error return nil } +func (el *eventloop) handleAction(c *conn, action Action) error { + switch action { + case None: + return nil + case Close: + return el.close(c, nil) + case Shutdown: + return errorx.ErrEngineShutdown + default: + return nil + } +} + /* func (el *eventloop) execCmd(itf interface{}) (err error) { cmd := itf.(*asyncCmd) diff --git a/eventloop_windows.go b/eventloop_windows.go index ea0f87377..565f3ef30 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -18,8 +18,8 @@ import ( "bytes" "context" "errors" + "fmt" "runtime" - "strings" "sync/atomic" "time" @@ -97,10 +97,8 @@ func (el *eventloop) open(oc *openConn) error { } c := oc.c - if !oc.isDatagram { - el.connections[c] = struct{}{} - el.incConn(1) - } + el.connections[c] = struct{}{} + el.incConn(1) out, action := el.eventHandler.OnOpen(c) if out != nil { @@ -188,28 +186,18 @@ func (el *eventloop) wake(c *conn) error { } func (el *eventloop) close(c *conn, err error) error { - if addr := c.localAddr; addr != nil && strings.HasPrefix(addr.Network(), "udp") { - action := el.eventHandler.OnClose(c, err) - if c.rawConn != nil { - if err := c.rawConn.Close(); err != nil { - el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err) - } - } - c.release() - return el.handleAction(c, action) - } - - if _, ok := el.connections[c]; !ok { + if _, ok := el.connections[c]; c.rawConn == nil || !ok { return nil // ignore stale wakes. } delete(el.connections, c) el.incConn(-1) action := el.eventHandler.OnClose(c, err) - if err := c.rawConn.Close(); err != nil { - el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err) - } + err = c.rawConn.Close() c.release() + if err != nil { + return fmt.Errorf("failed to close connection=%s in event-loop(%d): %v", c.remoteAddr, el.idx, err) + } return el.handleAction(c, action) }