Skip to content

Commit

Permalink
Merge branch 'panjf2000:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
0-haha authored Jul 5, 2024
2 parents b364de7 + a0d1ed7 commit 98b0001
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 66 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ English | [中文](README_ZH.md)

# 🚀 Features

## 🦖 Milestone

- [x] [High-performance](#-performance) event-driven looping based on a networking model of multiple threads/goroutines
- [x] Built-in goroutine pool powered by the library [ants](https://github.com/panjf2000/ants)
- [x] Lock-free during the entire runtime
Expand All @@ -43,8 +45,12 @@ English | [中文](README_ZH.md)
- [x] Running on `Linux`, `macOS`, `Windows`, and *BSD: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD`
- [x] **Edge-triggered** I/O support
- [x] Multiple network addresses binding

## 🕊 Roadmap

- [ ] **TLS** support
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) support
- [ ] [io_uring](https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023) support
- [ ] **KCP** support

***Windows version of `gnet` should only be used in development for developing and testing, it shouldn't be used in production.***

Expand Down
10 changes: 8 additions & 2 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

# 🚀 功能

## 🦖 当前支持

- [x] 基于多线程/协程网络模型的[高性能](#-性能测试)事件驱动循环
- [x] 内置 goroutine 池,由开源库 [ants](https://github.com/panjf2000/ants) 提供支持
- [x] 整个生命周期是无锁的
Expand All @@ -43,8 +45,12 @@
- [x] 支持 `Linux`, `macOS`, `Windows`*BSD 操作系统: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD`
- [x] **Edge-triggered** I/O 支持
- [x] 多网络地址绑定
- [ ] **TLS** 支持
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) 支持

## 🕊 未来计划

- [ ] 支持 **TLS**
- [ ] 支持 [io_uring](https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023)
- [ ] 支持 **KCP**

***`gnet` 的 Windows 版本应该仅用于开发阶段的开发和测试,切勿用于生产环境***

Expand Down
2 changes: 1 addition & 1 deletion client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr())
c.SetContext(ctx)
c.rawConn = nc
cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }}
cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }}
go func(uc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
Expand Down
1 change: 0 additions & 1 deletion connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type udpConn struct {
type openConn struct {
c *conn
cb func()
isDatagram bool
}

type conn struct {
Expand Down
72 changes: 31 additions & 41 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,24 +301,14 @@ loop:
return nil
}

func (el *eventloop) close(c *conn, err error) (rerr error) {
if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") {
rerr = el.poller.Delete(c.fd)
if _, ok := el.listeners[c.fd]; !ok {
rerr = unix.Close(c.fd)
el.connections.delConn(c)
}
if el.eventHandler.OnClose(c, err) == Shutdown {
return errorx.ErrEngineShutdown
}
c.release()
return
}

func (el *eventloop) close(c *conn, err error) error {
if !c.opened || el.connections.getConn(c.fd) == nil {
return // ignore stale connections
return nil // ignore stale connections
}

el.connections.delConn(c)
action := el.eventHandler.OnClose(c, err)

// close the TLS connection by sending the alert
if c.tlsconn != nil {
// close the TLS connection, which will send a close notify to the client
Expand All @@ -342,26 +332,26 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
}
}

c.release()

var errStr strings.Builder
err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
if err0 != nil {
rerr = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v", c.fd, el.idx, err0)
err0 = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("delete", err0))
errStr.WriteString(err0.Error())
errStr.WriteString(" | ")
}
if err1 != nil {
err1 = fmt.Errorf("failed to close fd=%d in event-loop(%d): %v", c.fd, el.idx, os.NewSyscallError("close", err1))
if rerr != nil {
rerr = errors.New(rerr.Error() + " & " + err1.Error())
} else {
rerr = err1
}
err1 = fmt.Errorf("failed to close fd=%d in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("close", err1))
errStr.WriteString(err1.Error())
}

el.connections.delConn(c)
if el.eventHandler.OnClose(c, err) == Shutdown {
rerr = errorx.ErrEngineShutdown
if errStr.Len() > 0 {
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
}
c.release()

return
return el.handleAction(c, action)
}

func (el *eventloop) wake(c *conn) error {
Expand Down Expand Up @@ -409,19 +399,6 @@ func (el *eventloop) ticker(ctx context.Context) {
}
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
Expand All @@ -448,6 +425,19 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
return nil
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

/*
func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
Expand Down
28 changes: 8 additions & 20 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"bytes"
"context"
"errors"
"fmt"
"runtime"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -97,10 +97,8 @@ func (el *eventloop) open(oc *openConn) error {
}

c := oc.c
if !oc.isDatagram {
el.connections[c] = struct{}{}
el.incConn(1)
}
el.connections[c] = struct{}{}
el.incConn(1)

out, action := el.eventHandler.OnOpen(c)
if out != nil {
Expand Down Expand Up @@ -188,28 +186,18 @@ func (el *eventloop) wake(c *conn) error {
}

func (el *eventloop) close(c *conn, err error) error {
if addr := c.localAddr; addr != nil && strings.HasPrefix(addr.Network(), "udp") {
action := el.eventHandler.OnClose(c, err)
if c.rawConn != nil {
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
}
c.release()
return el.handleAction(c, action)
}

if _, ok := el.connections[c]; !ok {
if _, ok := el.connections[c]; c.rawConn == nil || !ok {
return nil // ignore stale wakes.
}

delete(el.connections, c)
el.incConn(-1)
action := el.eventHandler.OnClose(c, err)
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
err = c.rawConn.Close()
c.release()
if err != nil {
return fmt.Errorf("failed to close connection=%s in event-loop(%d): %v", c.remoteAddr, el.idx, err)
}

return el.handleAction(c, action)
}
Expand Down

0 comments on commit 98b0001

Please sign in to comment.