Skip to content

Commit

Permalink
update go-svc v1.2.1; expose context.Context from nsqd
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Feb 2, 2021
1 parent 8adb229 commit cfc1f0e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 12 deletions.
2 changes: 1 addition & 1 deletion apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"syscall"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
svc "github.com/judwhite/go-svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
Expand Down
24 changes: 22 additions & 2 deletions apps/nsqd/main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package main

import (
"context"
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
svc "github.com/judwhite/go-svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
Expand All @@ -25,7 +27,8 @@ type program struct {

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
// SIGTERM handling is in Start()
if err := svc.Run(prg, syscall.SIGINT); err != nil {
logFatal("%s", err)
}
}
Expand Down Expand Up @@ -62,6 +65,7 @@ func (p *program) Start() error {
cfg.Validate()

options.Resolve(opts, flagSet, cfg)

nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
Expand All @@ -77,6 +81,17 @@ func (p *program) Start() error {
logFatal("failed to persist metadata - %s", err)
}

signalChan := make(chan os.Signal, 1)
go func() {
// range over all term signals
// we don't want to un-register our sigterm handler which would
// cause default go behavior to apply
for range signalChan {
p.nsqd.TermSignal()
}
}()
signal.Notify(signalChan, syscall.SIGTERM)

go func() {
err := p.nsqd.Main()
if err != nil {
Expand All @@ -95,6 +110,11 @@ func (p *program) Stop() error {
return nil
}

// Context returns a context that will be canceled when nsqd initiates the shutdown
func (p *program) Context() context.Context {
return p.nsqd.Context()
}

func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqd] ", f, args...)
}
2 changes: 1 addition & 1 deletion apps/nsqlookupd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"syscall"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
svc "github.com/judwhite/go-svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/nsqio/nsq

go 1.13

require (
github.com/BurntSushi/toml v0.3.1
github.com/bitly/go-hostpool v0.1.0
Expand All @@ -8,12 +10,9 @@ require (
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.1
github.com/judwhite/go-svc v1.1.2
github.com/judwhite/go-svc v1.2.1
github.com/julienschmidt/httprouter v1.3.0
github.com/mreiferson/go-options v1.0.0
github.com/nsqio/go-diskqueue v1.0.0
github.com/nsqio/go-nsq v1.0.8
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect
)

go 1.13
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/judwhite/go-svc v1.1.2 h1:wKroC8SKFs2EmtoS3XVmZinnRtGmu9qVrjubFp8talY=
github.com/judwhite/go-svc v1.1.2/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE=
github.com/judwhite/go-svc v1.2.1 h1:a7fsJzYUa33sfDJRF2N/WXhA+LonCEEY8BJb1tuS5tA=
github.com/judwhite/go-svc v1.2.1/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMoz5/VWs=
Expand All @@ -28,8 +28,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
17 changes: 17 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -46,6 +47,9 @@ type NSQD struct {
clientIDSequence int64

sync.RWMutex
ctx context.Context
// ctxCancel cancels a context that main() is waiting on
ctxCancel context.CancelFunc

opts atomic.Value

Expand Down Expand Up @@ -98,6 +102,7 @@ func New(opts *Options) (*NSQD, error) {
optsNotificationChan: make(chan struct{}, 1),
dl: dirlock.New(dataPath),
}
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
n.ci = clusterinfo.New(n.logf, httpcli)

Expand Down Expand Up @@ -425,6 +430,12 @@ func (n *NSQD) PersistMetadata() error {
return nil
}

// TermSignal handles a SIGTERM calling Exit
// This is a noop after first call
func (n *NSQD) TermSignal() {
n.Exit()
}

func (n *NSQD) Exit() {
if n.tcpListener != nil {
n.tcpListener.Close()
Expand Down Expand Up @@ -457,6 +468,7 @@ func (n *NSQD) Exit() {
n.waitGroup.Wait()
n.dl.Unlock()
n.logf(LOG_INFO, "NSQ: bye")
n.ctxCancel()
}

// GetTopic performs a thread safe operation
Expand Down Expand Up @@ -756,3 +768,8 @@ func buildTLSConfig(opts *Options) (*tls.Config, error) {
func (n *NSQD) IsAuthEnabled() bool {
return len(n.getOpts().AuthHTTPAddresses) != 0
}

// Context returns a context that will be canceled when nsqd initiates the shutdown
func (n *NSQD) Context() context.Context {
return n.ctx
}

0 comments on commit cfc1f0e

Please sign in to comment.