Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch: v2.6.3 #664

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"net"
"runtime"
"sync/atomic"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)
Expand All @@ -36,7 +35,7 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
tc, e := ln.Accept()
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
if !eng.beingShutdown.Load() {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errors.Join(err, errorx.ErrEngineShutdown)
Expand Down Expand Up @@ -74,7 +73,7 @@ func (eng *engine) ListenUDP(pc net.PacketConn) (err error) {
n, addr, e := pc.ReadFrom(buffer[:])
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
if !eng.beingShutdown.Load() {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errors.Join(err, errorx.ErrEngineShutdown)
Expand Down
30 changes: 13 additions & 17 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"net"
"strconv"
"sync"
"syscall"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -66,20 +65,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
return
}

shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := engine{
listeners: make(map[int]*listener),
opts: options,
turnOff: shutdown,
eventHandler: eh,
workerPool: struct {
concurrency: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}
if options.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
ctx context.Context
}{eg, ctx},
}
el := eventloop{
listeners: eng.listeners,
Expand Down Expand Up @@ -124,10 +120,14 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
func (cli *Client) Start() error {
logging.Infof("Starting gnet client with 1 event-loop")
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
cli.el.engine.workerPool.Go(cli.el.run)
cli.el.engine.concurrency.Go(cli.el.run)
// Start the ticker.
if cli.opts.Ticker {
go cli.el.ticker(cli.el.engine.ticker.ctx)
ctx := cli.el.engine.concurrency.ctx
cli.el.engine.concurrency.Go(func() error {
cli.el.ticker(ctx)
return nil
})
}
logging.Debugf("default logging level is %s", logging.LogLevel())
return nil
Expand All @@ -136,11 +136,7 @@ func (cli *Client) Start() error {
// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
}
_ = cli.el.engine.workerPool.Wait()
err = cli.el.engine.concurrency.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
logging.Cleanup()
Expand Down
31 changes: 14 additions & 17 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
}
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)

shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := &engine{
listeners: []*listener{},
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
listeners: []*listener{},
opts: options,
turnOff: shutdown,
eventHandler: eh,
concurrency: struct {
*errgroup.Group
ctx context.Context
}{eg, ctx},
}
cli.el = &eventloop{
ch: make(chan any, 1024),
Expand All @@ -71,11 +71,11 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{cli.el.eng})
cli.el.eng.workerPool.Go(cli.el.run)
cli.el.eng.concurrency.Go(cli.el.run)
if cli.opts.Ticker {
cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background())
cli.el.eng.workerPool.Go(func() error {
cli.el.ticker(cli.el.eng.ticker.ctx)
ctx := cli.el.eng.concurrency.ctx
cli.el.eng.concurrency.Go(func() error {
cli.el.ticker(ctx)
return nil
})
}
Expand All @@ -85,10 +85,7 @@ func (cli *Client) Start() error {

func (cli *Client) Stop() (err error) {
cli.el.ch <- errorx.ErrEngineShutdown
if cli.opts.Ticker {
cli.el.eng.ticker.cancel()
}
_ = cli.el.eng.workerPool.Wait()
err = cli.el.eng.concurrency.Wait()
cli.el.eventHandler.OnShutdown(Engine{cli.el.eng})
logging.Cleanup()
return
Expand Down
97 changes: 40 additions & 57 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"runtime"
"strings"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -35,27 +34,22 @@ import (
)

type engine struct {
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown int32 // whether the engine is in shutdown
ticker struct {
ctx context.Context // context for ticker
cancel context.CancelFunc // function to stop the ticker
}
workerPool struct {
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown atomic.Bool // whether the engine is in shutdown
turnOff context.CancelFunc
eventHandler EventHandler // user eventHandler
concurrency struct {
*errgroup.Group

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
ctx context.Context
}
eventHandler EventHandler // user eventHandler
}

func (eng *engine) isInShutdown() bool {
return atomic.LoadInt32(&eng.inShutdown) == 1
func (eng *engine) isShutdown() bool {
return eng.inShutdown.Load()
}

// shutdown signals the engine to shut down.
Expand All @@ -64,9 +58,7 @@ func (eng *engine) shutdown(err error) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}

eng.workerPool.once.Do(func() {
eng.workerPool.shutdown()
})
eng.turnOff()
}

func (eng *engine) closeEventLoops() {
Expand All @@ -88,7 +80,7 @@ func (eng *engine) closeEventLoops() {
}
}

func (eng *engine) runEventLoops(numEventLoop int) error {
func (eng *engine) runEventLoops(ctx context.Context, numEventLoop int) error {
var el0 *eventloop
lns := eng.listeners
// Create loops locally and bind the listeners.
Expand Down Expand Up @@ -129,21 +121,21 @@ func (eng *engine) runEventLoops(numEventLoop int) error {

// Start event-loops in background.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
eng.workerPool.Go(el.run)
eng.concurrency.Go(el.run)
return true
})

if el0 != nil {
eng.workerPool.Go(func() error {
el0.ticker(eng.ticker.ctx)
eng.concurrency.Go(func() error {
el0.ticker(ctx)
return nil
})
}

return nil
}

func (eng *engine) activateReactors(numEventLoop int) error {
func (eng *engine) activateReactors(ctx context.Context, numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
p, err := netpoll.OpenPoller()
if err != nil {
Expand All @@ -161,7 +153,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {

// Start sub reactors in background.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
eng.workerPool.Go(el.orbit)
eng.concurrency.Go(el.orbit)
return true
})

Expand All @@ -183,30 +175,30 @@ func (eng *engine) activateReactors(numEventLoop int) error {
eng.ingress = el

// Start main reactor in background.
eng.workerPool.Go(el.rotate)
eng.concurrency.Go(el.rotate)

// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.ingress.ticker(eng.ticker.ctx)
eng.concurrency.Go(func() error {
eng.ingress.ticker(ctx)
return nil
})
}

return nil
}

func (eng *engine) start(numEventLoop int) error {
func (eng *engine) start(ctx context.Context, numEventLoop int) error {
if eng.opts.ReusePort {
return eng.runEventLoops(numEventLoop)
return eng.runEventLoops(ctx, numEventLoop)
}

return eng.activateReactors(numEventLoop)
return eng.activateReactors(ctx, numEventLoop)
}

func (eng *engine) stop(s Engine) {
func (eng *engine) stop(ctx context.Context, s Engine) {
// Wait on a signal for shutdown
<-eng.workerPool.shutdownCtx.Done()
<-ctx.Done()

eng.eventHandler.OnShutdown(s)

Expand All @@ -225,20 +217,15 @@ func (eng *engine) stop(s Engine) {
}
}

// Stop the ticker.
if eng.ticker.cancel != nil {
eng.ticker.cancel()
}

if err := eng.workerPool.Wait(); err != nil {
if err := eng.concurrency.Wait(); err != nil {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

// Close all listeners and pollers of event-loops.
eng.closeEventLoops()

// Put the engine into the shutdown state.
atomic.StoreInt32(&eng.inShutdown, 1)
eng.inShutdown.Store(true)
}

func run(eventHandler EventHandler, listeners []*listener, options *Options, addrs []string) error {
Expand All @@ -261,17 +248,17 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
for _, ln := range listeners {
lns[ln.fd] = ln
}
shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := engine{
listeners: lns,
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
listeners: lns,
opts: options,
turnOff: shutdown,
eventHandler: eventHandler,
concurrency: struct {
*errgroup.Group
ctx context.Context
}{eg, ctx},
}
switch options.LB {
case RoundRobin:
Expand All @@ -282,23 +269,19 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
eng.eventLoops = new(sourceAddrHashLoadBalancer)
}

if eng.opts.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}

e := Engine{&eng}
switch eng.eventHandler.OnBoot(e) {
case None:
case None, Close:
case Shutdown:
return nil
}

if err := eng.start(numEventLoop); err != nil {
if err := eng.start(ctx, numEventLoop); err != nil {
eng.closeEventLoops()
eng.opts.Logger.Errorf("gnet engine is stopping with error: %v", err)
return err
}
defer eng.stop(e)
defer eng.stop(rootCtx, e)

for _, addr := range addrs {
allEngines.Store(addr, &eng)
Expand Down
Loading
Loading