From ad73d0508ccaa622f4a1380abdca06b42bcdfc03 Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Mon, 1 Feb 2021 23:35:11 -0500 Subject: [PATCH] update go-svc v1.2.1; expose context.Context from nsqd --- apps/nsqadmin/main.go | 2 +- apps/nsqd/main.go | 24 ++++++++++++++++++++++-- apps/nsqlookupd/main.go | 2 +- go.mod | 7 +++---- go.sum | 8 ++++---- nsqd/nsqd.go | 17 +++++++++++++++++ 6 files changed, 48 insertions(+), 12 deletions(-) diff --git a/apps/nsqadmin/main.go b/apps/nsqadmin/main.go index 2b3a496a2..cae3144ad 100644 --- a/apps/nsqadmin/main.go +++ b/apps/nsqadmin/main.go @@ -9,7 +9,7 @@ import ( "syscall" "github.com/BurntSushi/toml" - "github.com/judwhite/go-svc/svc" + "github.com/judwhite/go-svc" "github.com/mreiferson/go-options" "github.com/nsqio/nsq/internal/app" "github.com/nsqio/nsq/internal/lg" diff --git a/apps/nsqd/main.go b/apps/nsqd/main.go index 65ceaecf6..cd2121376 100644 --- a/apps/nsqd/main.go +++ b/apps/nsqd/main.go @@ -1,17 +1,19 @@ package main import ( + "context" "flag" "fmt" "math/rand" "os" + "os/signal" "path/filepath" "sync" "syscall" "time" "github.com/BurntSushi/toml" - "github.com/judwhite/go-svc/svc" + "github.com/judwhite/go-svc" "github.com/mreiferson/go-options" "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/version" @@ -25,7 +27,8 @@ type program struct { func main() { prg := &program{} - if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { + // SIGTERM handling is in Start() + if err := svc.Run(prg, syscall.SIGINT); err != nil { logFatal("%s", err) } } @@ -62,6 +65,7 @@ func (p *program) Start() error { cfg.Validate() options.Resolve(opts, flagSet, cfg) + nsqd, err := nsqd.New(opts) if err != nil { logFatal("failed to instantiate nsqd - %s", err) @@ -77,6 +81,17 @@ func (p *program) Start() error { logFatal("failed to persist metadata - %s", err) } + signalChan := make(chan os.Signal, 1) + go func() { + // range over all term signals + // we don't want to un-register our sigterm handler which would + // cause default go behavior to apply + for range signalChan { + p.nsqd.TermSignal() + } + }() + signal.Notify(signalChan, syscall.SIGTERM) + go func() { err := p.nsqd.Main() if err != nil { @@ -95,6 +110,11 @@ func (p *program) Stop() error { return nil } +// Context returns a context that will be canceled when nsqd initiates the shutdown +func (p *program) Context() context.Context { + return p.nsqd.Context() +} + func logFatal(f string, args ...interface{}) { lg.LogFatal("[nsqd] ", f, args...) } diff --git a/apps/nsqlookupd/main.go b/apps/nsqlookupd/main.go index 02202d216..449f3adcb 100644 --- a/apps/nsqlookupd/main.go +++ b/apps/nsqlookupd/main.go @@ -9,7 +9,7 @@ import ( "syscall" "github.com/BurntSushi/toml" - "github.com/judwhite/go-svc/svc" + "github.com/judwhite/go-svc" "github.com/mreiferson/go-options" "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/version" diff --git a/go.mod b/go.mod index d138e7c12..aacdacdd6 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/nsqio/nsq +go 1.13 + require ( github.com/BurntSushi/toml v0.3.1 github.com/bitly/go-hostpool v0.1.0 @@ -8,12 +10,9 @@ require ( github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.1 - github.com/judwhite/go-svc v1.1.2 + github.com/judwhite/go-svc v1.2.1 github.com/julienschmidt/httprouter v1.3.0 github.com/mreiferson/go-options v1.0.0 github.com/nsqio/go-diskqueue v1.0.0 github.com/nsqio/go-nsq v1.0.8 - golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect ) - -go 1.13 diff --git a/go.sum b/go.sum index ee27b8b10..e18a620ed 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/judwhite/go-svc v1.1.2 h1:wKroC8SKFs2EmtoS3XVmZinnRtGmu9qVrjubFp8talY= -github.com/judwhite/go-svc v1.1.2/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE= +github.com/judwhite/go-svc v1.2.1 h1:a7fsJzYUa33sfDJRF2N/WXhA+LonCEEY8BJb1tuS5tA= +github.com/judwhite/go-svc v1.2.1/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMoz5/VWs= @@ -28,8 +28,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g= -golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 390e1a654..20792e9bd 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -1,6 +1,7 @@ package nsqd import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -46,6 +47,9 @@ type NSQD struct { clientIDSequence int64 sync.RWMutex + ctx context.Context + // ctxCancel cancels a context that main() is waiting on + ctxCancel context.CancelFunc opts atomic.Value @@ -98,6 +102,7 @@ func New(opts *Options) (*NSQD, error) { optsNotificationChan: make(chan struct{}, 1), dl: dirlock.New(dataPath), } + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout) n.ci = clusterinfo.New(n.logf, httpcli) @@ -425,6 +430,12 @@ func (n *NSQD) PersistMetadata() error { return nil } +// TermSignal handles a SIGTERM calling Exit +// This is a noop after first call +func (n *NSQD) TermSignal() { + n.Exit() +} + func (n *NSQD) Exit() { if n.tcpListener != nil { n.tcpListener.Close() @@ -457,6 +468,7 @@ func (n *NSQD) Exit() { n.waitGroup.Wait() n.dl.Unlock() n.logf(LOG_INFO, "NSQ: bye") + n.ctxCancel() } // GetTopic performs a thread safe operation @@ -756,3 +768,8 @@ func buildTLSConfig(opts *Options) (*tls.Config, error) { func (n *NSQD) IsAuthEnabled() bool { return len(n.getOpts().AuthHTTPAddresses) != 0 } + +// Context returns a context that will be canceled when nsqd initiates the shutdown +func (n *NSQD) Context() context.Context { + return n.ctx +}