Skip to content

Commit

Permalink
Merge pull request #1140 from mdh67899/fix-nsqd-tcp-accept-error
Browse files Browse the repository at this point in the history
nsqd/nsqlookupd: properly handle fatal accept errors
  • Loading branch information
mreiferson authored Mar 3, 2019
2 parents fd1cde1 + 9d332eb commit 3eb619f
Show file tree
Hide file tree
Showing 38 changed files with 613 additions and 595 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

[[constraint]]
name = "github.com/mreiferson/go-options"
revision = "77551d20752b"
revision = "0c63f026bcd6"

[[constraint]]
name = "github.com/nsqio/go-diskqueue"
Expand Down
2 changes: 1 addition & 1 deletion apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
options.Resolve(opts, fs, nil)

logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
logLevel, err := lg.ParseLogLevel(opts.LogLevel, false)
logLevel, err := lg.ParseLogLevel(opts.LogLevel)
if err != nil {
log.Fatal("--log-level is invalid")
}
Expand Down
139 changes: 88 additions & 51 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,93 +3,130 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqadmin"
)

var (
flagSet = flag.NewFlagSet("nsqadmin", flag.ExitOnError)
func nsqadminFlagSet(opts *nsqadmin.Options) *flag.FlagSet {
flagSet := flag.NewFlagSet("nsqadmin", flag.ExitOnError)

basePath = flagSet.String("base-path", "/", "URL base path")
config = flagSet.String("config", "", "path to config file")
showVersion = flagSet.Bool("version", false, "print version string")
flagSet.String("config", "", "path to config file")
flagSet.Bool("version", false, "print version string")

logLevel = flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
logPrefix = flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
verbose = flagSet.Bool("verbose", false, "deprecated in favor of log-level")
logLevel := opts.LogLevel
flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal")
flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level")

httpAddress = flagSet.String("http-address", "0.0.0.0:4171", "<addr>:<port> to listen on for HTTP clients")
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
flagSet.String("base-path", opts.BasePath, "URL base path")

graphiteURL = flagSet.String("graphite-url", "", "graphite HTTP address")
proxyGraphite = flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")
flagSet.String("graphite-url", opts.GraphiteURL, "graphite HTTP address")
flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")

statsdCounterFormat = flagSet.String("statsd-counter-format", "stats.counters.%s.count", "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdGaugeFormat = flagSet.String("statsd-gauge-format", "stats.gauges.%s", "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdPrefix = flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
statsdInterval = flagSet.Duration("statsd-interval", 60*time.Second, "time interval nsqd is configured to push to statsd (must match nsqd)")
flagSet.String("statsd-counter-format", opts.StatsdCounterFormat, "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-gauge-format", opts.StatsdGaugeFormat, "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
flagSet.Duration("statsd-interval", opts.StatsdInterval, "time interval nsqd is configured to push to statsd (must match nsqd)")

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")
flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
httpClientTLSKey = flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")
flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")

allowConfigFromCIDR = flagSet.String("allow-config-from-cidr", "127.0.0.1/8", "A CIDR from which to allow HTTP requests to the /config endpoint")
aclHttpHeader = flagSet.String("acl-http-header", "X-Forwarded-User", "HTTP header to check for authenticated admin users")
flagSet.String("allow-config-from-cidr", opts.AllowConfigFromCIDR, "A CIDR from which to allow HTTP requests to the /config endpoint")
flagSet.String("acl-http-header", opts.AclHttpHeader, "HTTP header to check for authenticated admin users")

adminUsers = app.StringArray{}
nsqlookupdHTTPAddresses = app.StringArray{}
nsqdHTTPAddresses = app.StringArray{}
)

func init() {
nsqlookupdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqlookupdHTTPAddresses, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
nsqdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqdHTTPAddresses, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
adminUsers := app.StringArray{}
flagSet.Var(&adminUsers, "admin-user", "admin user (may be given multiple times; if specified, only these users will be able to perform privileged actions; acl-http-header is used to determine the authenticated user)")

return flagSet
}

type program struct {
once sync.Once
nsqadmin *nsqadmin.NSQAdmin
}

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqadmin.NewOptions()

flagSet := nsqadminFlagSet(opts)
flagSet.Parse(os.Args[1:])

if *showVersion {
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqadmin"))
return
os.Exit(0)
}

exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
exitChan <- 1
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

var cfg map[string]interface{}
if *config != "" {
_, err := toml.DecodeFile(*config, &cfg)
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", *config, err)
logFatal("failed to load config file %s - %s", configFile, err)
}
}

opts := nsqadmin.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqadmin := nsqadmin.New(opts)
nsqadmin, err := nsqadmin.New(opts)
if err != nil {
logFatal("failed to instantiate nsqadmin - %s", err)
}
p.nsqadmin = nsqadmin

go func() {
err := p.nsqadmin.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

return nil
}

func (p *program) Stop() error {
p.once.Do(func() {
p.nsqadmin.Exit()
})
return nil
}

nsqadmin.Main()
<-exitChan
nsqadmin.Exit()
func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqadmin] ", f, args...)
}
100 changes: 100 additions & 0 deletions apps/nsqd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"flag"
"fmt"
"math/rand"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
)

type program struct {
once sync.Once
nsqd *nsqd.NSQD
}

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])

rand.Seed(time.Now().UTC().UnixNano())

if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}

var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
logFatal("failed to load config file %s - %s", configFile, err)
}
}
cfg.Validate()

options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd

err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}

go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

return nil
}

func (p *program) Stop() error {
p.once.Do(func() {
p.nsqd.Exit()
})
return nil
}

func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqd] ", f, args...)
}
2 changes: 2 additions & 0 deletions apps/nsqd/nsqd_test.go → apps/nsqd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (

"github.com/BurntSushi/toml"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/test"
"github.com/nsqio/nsq/nsqd"
)

func TestConfigFlagParsing(t *testing.T) {
opts := nsqd.NewOptions()
opts.Logger = test.NewTestLogger(t)

flagSet := nsqdFlagSet(opts)
flagSet.Parse([]string{})
Expand Down
Loading

0 comments on commit 3eb619f

Please sign in to comment.