diff --git a/Gopkg.lock b/Gopkg.lock index 45a10e479..d786251a3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -58,11 +58,11 @@ revision = "v1.2.0" [[projects]] - digest = "1:9408fb9c637c103010e5147469c232ce6b68edc840879cc730a2a15918e6cae8" + digest = "1:15c0562bca5d78ac087fb39c211071dc124e79fb18f8b7c3f8a0bc7ffcb2a38e" name = "github.com/mreiferson/go-options" packages = ["."] pruneopts = "" - revision = "77551d20752b" + revision = "0c63f026bcd6" [[projects]] digest = "1:22ffd73c4580854d935821a0fa95ff7324df836efa86d256adeb392450cbde87" diff --git a/Gopkg.toml b/Gopkg.toml index df9b1551f..bb0bf6390 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -55,7 +55,7 @@ [[constraint]] name = "github.com/mreiferson/go-options" - revision = "77551d20752b" + revision = "0c63f026bcd6" [[constraint]] name = "github.com/nsqio/go-diskqueue" diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index 17dba48a8..49449032c 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -80,7 +80,7 @@ func main() { options.Resolve(opts, fs, nil) logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) - logLevel, err := lg.ParseLogLevel(opts.LogLevel, false) + logLevel, err := lg.ParseLogLevel(opts.LogLevel) if err != nil { log.Fatal("--log-level is invalid") } diff --git a/apps/nsqadmin/main.go b/apps/nsqadmin/main.go index c19fa02df..2b3a496a2 100644 --- a/apps/nsqadmin/main.go +++ b/apps/nsqadmin/main.go @@ -3,93 +3,130 @@ package main import ( "flag" "fmt" - "log" "os" - "os/signal" + "path/filepath" + "sync" "syscall" - "time" "github.com/BurntSushi/toml" + "github.com/judwhite/go-svc/svc" "github.com/mreiferson/go-options" "github.com/nsqio/nsq/internal/app" + "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqadmin" ) -var ( - flagSet = flag.NewFlagSet("nsqadmin", flag.ExitOnError) +func nsqadminFlagSet(opts *nsqadmin.Options) *flag.FlagSet { + flagSet := flag.NewFlagSet("nsqadmin", flag.ExitOnError) - basePath = flagSet.String("base-path", "/", "URL base path") - config = flagSet.String("config", "", "path to config file") - showVersion = flagSet.Bool("version", false, "print version string") + flagSet.String("config", "", "path to config file") + flagSet.Bool("version", false, "print version string") - logLevel = flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal") - logPrefix = flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix") - verbose = flagSet.Bool("verbose", false, "deprecated in favor of log-level") + logLevel := opts.LogLevel + flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal") + flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix") + flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level") - httpAddress = flagSet.String("http-address", "0.0.0.0:4171", ": to listen on for HTTP clients") + flagSet.String("http-address", opts.HTTPAddress, ": to listen on for HTTP clients") + flagSet.String("base-path", opts.BasePath, "URL base path") - graphiteURL = flagSet.String("graphite-url", "", "graphite HTTP address") - proxyGraphite = flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite") + flagSet.String("graphite-url", opts.GraphiteURL, "graphite HTTP address") + flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite") - statsdCounterFormat = flagSet.String("statsd-counter-format", "stats.counters.%s.count", "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.") - statsdGaugeFormat = flagSet.String("statsd-gauge-format", "stats.gauges.%s", "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.") - statsdPrefix = flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)") - statsdInterval = flagSet.Duration("statsd-interval", 60*time.Second, "time interval nsqd is configured to push to statsd (must match nsqd)") + flagSet.String("statsd-counter-format", opts.StatsdCounterFormat, "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.") + flagSet.String("statsd-gauge-format", opts.StatsdGaugeFormat, "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.") + flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)") + flagSet.Duration("statsd-interval", opts.StatsdInterval, "time interval nsqd is configured to push to statsd (must match nsqd)") - notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent") + flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent") - httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect") - httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request") + flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect") + flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request") - httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates") - httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client") - httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client") - httpClientTLSKey = flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client") + flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates") + flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client") + flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client") + flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client") - allowConfigFromCIDR = flagSet.String("allow-config-from-cidr", "127.0.0.1/8", "A CIDR from which to allow HTTP requests to the /config endpoint") - aclHttpHeader = flagSet.String("acl-http-header", "X-Forwarded-User", "HTTP header to check for authenticated admin users") + flagSet.String("allow-config-from-cidr", opts.AllowConfigFromCIDR, "A CIDR from which to allow HTTP requests to the /config endpoint") + flagSet.String("acl-http-header", opts.AclHttpHeader, "HTTP header to check for authenticated admin users") - adminUsers = app.StringArray{} - nsqlookupdHTTPAddresses = app.StringArray{} - nsqdHTTPAddresses = app.StringArray{} -) - -func init() { + nsqlookupdHTTPAddresses := app.StringArray{} flagSet.Var(&nsqlookupdHTTPAddresses, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") + nsqdHTTPAddresses := app.StringArray{} flagSet.Var(&nsqdHTTPAddresses, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)") + adminUsers := app.StringArray{} flagSet.Var(&adminUsers, "admin-user", "admin user (may be given multiple times; if specified, only these users will be able to perform privileged actions; acl-http-header is used to determine the authenticated user)") + + return flagSet +} + +type program struct { + once sync.Once + nsqadmin *nsqadmin.NSQAdmin } func main() { + prg := &program{} + if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { + logFatal("%s", err) + } +} + +func (p *program) Init(env svc.Environment) error { + if env.IsWindowsService() { + dir := filepath.Dir(os.Args[0]) + return os.Chdir(dir) + } + return nil +} + +func (p *program) Start() error { + opts := nsqadmin.NewOptions() + + flagSet := nsqadminFlagSet(opts) flagSet.Parse(os.Args[1:]) - if *showVersion { + if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqadmin")) - return + os.Exit(0) } - exitChan := make(chan int) - signalChan := make(chan os.Signal, 1) - go func() { - <-signalChan - exitChan <- 1 - }() - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - var cfg map[string]interface{} - if *config != "" { - _, err := toml.DecodeFile(*config, &cfg) + configFile := flagSet.Lookup("config").Value.String() + if configFile != "" { + _, err := toml.DecodeFile(configFile, &cfg) if err != nil { - log.Fatalf("ERROR: failed to load config file %s - %s", *config, err) + logFatal("failed to load config file %s - %s", configFile, err) } } - opts := nsqadmin.NewOptions() options.Resolve(opts, flagSet, cfg) - nsqadmin := nsqadmin.New(opts) + nsqadmin, err := nsqadmin.New(opts) + if err != nil { + logFatal("failed to instantiate nsqadmin - %s", err) + } + p.nsqadmin = nsqadmin + + go func() { + err := p.nsqadmin.Main() + if err != nil { + p.Stop() + os.Exit(1) + } + }() + + return nil +} + +func (p *program) Stop() error { + p.once.Do(func() { + p.nsqadmin.Exit() + }) + return nil +} - nsqadmin.Main() - <-exitChan - nsqadmin.Exit() +func logFatal(f string, args ...interface{}) { + lg.LogFatal("[nsqadmin] ", f, args...) } diff --git a/apps/nsqd/main.go b/apps/nsqd/main.go new file mode 100644 index 000000000..65ceaecf6 --- /dev/null +++ b/apps/nsqd/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/BurntSushi/toml" + "github.com/judwhite/go-svc/svc" + "github.com/mreiferson/go-options" + "github.com/nsqio/nsq/internal/lg" + "github.com/nsqio/nsq/internal/version" + "github.com/nsqio/nsq/nsqd" +) + +type program struct { + once sync.Once + nsqd *nsqd.NSQD +} + +func main() { + prg := &program{} + if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { + logFatal("%s", err) + } +} + +func (p *program) Init(env svc.Environment) error { + if env.IsWindowsService() { + dir := filepath.Dir(os.Args[0]) + return os.Chdir(dir) + } + return nil +} + +func (p *program) Start() error { + opts := nsqd.NewOptions() + + flagSet := nsqdFlagSet(opts) + flagSet.Parse(os.Args[1:]) + + rand.Seed(time.Now().UTC().UnixNano()) + + if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { + fmt.Println(version.String("nsqd")) + os.Exit(0) + } + + var cfg config + configFile := flagSet.Lookup("config").Value.String() + if configFile != "" { + _, err := toml.DecodeFile(configFile, &cfg) + if err != nil { + logFatal("failed to load config file %s - %s", configFile, err) + } + } + cfg.Validate() + + options.Resolve(opts, flagSet, cfg) + nsqd, err := nsqd.New(opts) + if err != nil { + logFatal("failed to instantiate nsqd - %s", err) + } + p.nsqd = nsqd + + err = p.nsqd.LoadMetadata() + if err != nil { + logFatal("failed to load metadata - %s", err) + } + err = p.nsqd.PersistMetadata() + if err != nil { + logFatal("failed to persist metadata - %s", err) + } + + go func() { + err := p.nsqd.Main() + if err != nil { + p.Stop() + os.Exit(1) + } + }() + + return nil +} + +func (p *program) Stop() error { + p.once.Do(func() { + p.nsqd.Exit() + }) + return nil +} + +func logFatal(f string, args ...interface{}) { + lg.LogFatal("[nsqd] ", f, args...) +} diff --git a/apps/nsqd/nsqd_test.go b/apps/nsqd/main_test.go similarity index 89% rename from apps/nsqd/nsqd_test.go rename to apps/nsqd/main_test.go index 9c1f69829..bb29e7793 100644 --- a/apps/nsqd/nsqd_test.go +++ b/apps/nsqd/main_test.go @@ -7,11 +7,13 @@ import ( "github.com/BurntSushi/toml" "github.com/mreiferson/go-options" + "github.com/nsqio/nsq/internal/test" "github.com/nsqio/nsq/nsqd" ) func TestConfigFlagParsing(t *testing.T) { opts := nsqd.NewOptions() + opts.Logger = test.NewTestLogger(t) flagSet := nsqdFlagSet(opts) flagSet.Parse([]string{}) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/options.go similarity index 80% rename from apps/nsqd/nsqd.go rename to apps/nsqd/options.go index 4d85228c1..887478e0d 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/options.go @@ -4,20 +4,10 @@ import ( "crypto/tls" "flag" "fmt" - "log" - "math/rand" - "os" - "path/filepath" "strconv" "strings" - "syscall" - "time" - "github.com/BurntSushi/toml" - "github.com/judwhite/go-svc/svc" - "github.com/mreiferson/go-options" "github.com/nsqio/nsq/internal/app" - "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqd" ) @@ -73,6 +63,36 @@ func (t *tlsMinVersionOption) String() string { return strconv.FormatInt(int64(*t), 10) } +type config map[string]interface{} + +// Validate settings in the config file, and fatal on errors +func (cfg config) Validate() { + // special validation/translation + if v, exists := cfg["tls_required"]; exists { + var t tlsRequiredOption + err := t.Set(fmt.Sprintf("%v", v)) + if err == nil { + cfg["tls_required"] = t.String() + } else { + logFatal("failed parsing tls_required %+v", v) + } + } + if v, exists := cfg["tls_min_version"]; exists { + var t tlsMinVersionOption + err := t.Set(fmt.Sprintf("%v", v)) + if err == nil { + newVal := fmt.Sprintf("%v", t.Get()) + if newVal != "0" { + cfg["tls_min_version"] = newVal + } else { + delete(cfg, "tls_min_version") + } + } else { + logFatal("failed parsing tls_min_version %+v", v) + } + } +} + func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet := flag.NewFlagSet("nsqd", flag.ExitOnError) @@ -80,12 +100,13 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Bool("version", false, "print version string") flagSet.String("config", "", "path to config file") - flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal") + logLevel := opts.LogLevel + flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal") flagSet.String("log-prefix", "[nsqd] ", "log message prefix") - flagSet.Bool("verbose", false, "deprecated in favor of log-level") + flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level") flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)") - flagSet.Bool("worker-id", false, "do NOT use this, use --node-id") + flagSet.Bool("worker-id", false, "[deprecated] use --node-id") flagSet.String("https-address", opts.HTTPSAddress, ": to listen on for HTTPS clients") flagSet.String("http-address", opts.HTTPAddress, ": to listen on for HTTP clients") @@ -150,99 +171,3 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { return flagSet } - -type config map[string]interface{} - -// Validate settings in the config file, and fatal on errors -func (cfg config) Validate() { - // special validation/translation - if v, exists := cfg["tls_required"]; exists { - var t tlsRequiredOption - err := t.Set(fmt.Sprintf("%v", v)) - if err == nil { - cfg["tls_required"] = t.String() - } else { - log.Fatalf("ERROR: failed parsing tls required %v", v) - } - } - if v, exists := cfg["tls_min_version"]; exists { - var t tlsMinVersionOption - err := t.Set(fmt.Sprintf("%v", v)) - if err == nil { - newVal := fmt.Sprintf("%v", t.Get()) - if newVal != "0" { - cfg["tls_min_version"] = newVal - } else { - delete(cfg, "tls_min_version") - } - } else { - log.Fatalf("ERROR: failed parsing tls min version %v", v) - } - } -} - -type program struct { - nsqd *nsqd.NSQD -} - -func main() { - prg := &program{} - if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { - log.Fatal(err) - } -} - -func (p *program) Init(env svc.Environment) error { - if env.IsWindowsService() { - dir := filepath.Dir(os.Args[0]) - return os.Chdir(dir) - } - return nil -} - -func (p *program) Start() error { - opts := nsqd.NewOptions() - - flagSet := nsqdFlagSet(opts) - flagSet.Parse(os.Args[1:]) - - rand.Seed(time.Now().UTC().UnixNano()) - - if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { - fmt.Println(version.String("nsqd")) - os.Exit(0) - } - - var cfg config - configFile := flagSet.Lookup("config").Value.String() - if configFile != "" { - _, err := toml.DecodeFile(configFile, &cfg) - if err != nil { - log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) - } - } - cfg.Validate() - - options.Resolve(opts, flagSet, cfg) - nsqd := nsqd.New(opts) - - err := nsqd.LoadMetadata() - if err != nil { - log.Fatalf("ERROR: %s", err.Error()) - } - err = nsqd.PersistMetadata() - if err != nil { - log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) - } - nsqd.Main() - - p.nsqd = nsqd - return nil -} - -func (p *program) Stop() error { - if p.nsqd != nil { - p.nsqd.Exit() - } - return nil -} diff --git a/apps/nsqlookupd/nsqlookupd.go b/apps/nsqlookupd/main.go similarity index 76% rename from apps/nsqlookupd/nsqlookupd.go rename to apps/nsqlookupd/main.go index 9fcc06c0e..02202d216 100644 --- a/apps/nsqlookupd/nsqlookupd.go +++ b/apps/nsqlookupd/main.go @@ -3,14 +3,15 @@ package main import ( "flag" "fmt" - "log" "os" "path/filepath" + "sync" "syscall" "github.com/BurntSushi/toml" "github.com/judwhite/go-svc/svc" "github.com/mreiferson/go-options" + "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqlookupd" ) @@ -21,9 +22,10 @@ func nsqlookupdFlagSet(opts *nsqlookupd.Options) *flag.FlagSet { flagSet.String("config", "", "path to config file") flagSet.Bool("version", false, "print version string") - flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal") + logLevel := opts.LogLevel + flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal") flagSet.String("log-prefix", "[nsqlookupd] ", "log message prefix") - flagSet.Bool("verbose", false, "deprecated in favor of log-level") + flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level") flagSet.String("tcp-address", opts.TCPAddress, ": to listen on for TCP clients") flagSet.String("http-address", opts.HTTPAddress, ": to listen on for HTTP clients") @@ -36,13 +38,14 @@ func nsqlookupdFlagSet(opts *nsqlookupd.Options) *flag.FlagSet { } type program struct { + once sync.Once nsqlookupd *nsqlookupd.NSQLookupd } func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { - log.Fatal(err) + logFatal("%s", err) } } @@ -70,25 +73,35 @@ func (p *program) Start() error { if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { - log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) + logFatal("failed to load config file %s - %s", configFile, err) } } options.Resolve(opts, flagSet, cfg) - daemon := nsqlookupd.New(opts) - - err := daemon.Main() + nsqlookupd, err := nsqlookupd.New(opts) if err != nil { - log.Fatalf("ERROR: failed to start nsqlookupd: %v", err) + logFatal("failed to instantiate nsqlookupd", err) } + p.nsqlookupd = nsqlookupd + + go func() { + err := p.nsqlookupd.Main() + if err != nil { + p.Stop() + os.Exit(1) + } + }() - p.nsqlookupd = daemon return nil } func (p *program) Stop() error { - if p.nsqlookupd != nil { + p.once.Do(func() { p.nsqlookupd.Exit() - } + }) return nil } + +func logFatal(f string, args ...interface{}) { + lg.LogFatal("[nsqlookupd] ", f, args...) +} diff --git a/go.mod b/go.mod index c2da58714..f4b7c15c5 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/judwhite/go-svc v1.0.0 github.com/julienschmidt/httprouter v1.2.0 - github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b + github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 github.com/nsqio/go-nsq v1.0.7 github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index cbbefd457..63c62ab65 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE= github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b h1:xjKomx939vefURtocD1uaKvcvAp1dNYX05i0TIpnfVI= -github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b/go.mod h1:A0JOgZNsj9V+npbgxH0Ib75PvrHS6Ezri/4HdcTp/DI= +github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 h1:frRvTmIp7QT1RPaphBvr6zvEHfvdOX7jMO7rvicCH9Q= +github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w= github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 h1:nZ0z0haJRzCXAWH9Jl+BUnfD2n2MCSbGRSl8VBX+zR0= github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839/go.mod h1:AYinRDfdKMmVKTPI8wOcLgjcw2pTS3jo8fib1VxOzsE= github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY= diff --git a/internal/app/float_array.go b/internal/app/float_array.go index e87e9cec8..2ec39c73d 100644 --- a/internal/app/float_array.go +++ b/internal/app/float_array.go @@ -10,6 +10,8 @@ import ( type FloatArray []float64 +func (a *FloatArray) Get() interface{} { return []float64(*a) } + func (a *FloatArray) Set(param string) error { for _, s := range strings.Split(param, ",") { v, err := strconv.ParseFloat(s, 64) diff --git a/internal/app/string_array.go b/internal/app/string_array.go index 3c8da112c..79962c113 100644 --- a/internal/app/string_array.go +++ b/internal/app/string_array.go @@ -6,6 +6,8 @@ import ( type StringArray []string +func (a *StringArray) Get() interface{} { return []string(*a) } + func (a *StringArray) Set(s string) error { *a = append(*a, s) return nil diff --git a/internal/http_api/http_server.go b/internal/http_api/http_server.go index e02b51cfc..cf33b0197 100644 --- a/internal/http_api/http_server.go +++ b/internal/http_api/http_server.go @@ -1,6 +1,7 @@ package http_api import ( + "fmt" "log" "net" "net/http" @@ -18,7 +19,7 @@ func (l logWriter) Write(p []byte) (int, error) { return len(p), nil } -func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) { +func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) error { logf(lg.INFO, "%s: listening on %s", proto, listener.Addr()) server := &http.Server{ @@ -28,8 +29,10 @@ func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.Ap err := server.Serve(listener) // theres no direct way to detect this error because it is not exposed if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - logf(lg.ERROR, "http.Serve() - %s", err) + return fmt.Errorf("http.Serve() error - %s", err) } logf(lg.INFO, "%s: closing %s", proto, listener.Addr()) + + return nil } diff --git a/internal/lg/lg.go b/internal/lg/lg.go index 74936b132..aa209a473 100644 --- a/internal/lg/lg.go +++ b/internal/lg/lg.go @@ -3,11 +3,11 @@ package lg import ( "fmt" + "log" + "os" "strings" ) -type LogLevel int - const ( DEBUG = LogLevel(1) INFO = LogLevel(2) @@ -28,43 +28,49 @@ func (l NilLogger) Output(maxdepth int, s string) error { return nil } -func (l LogLevel) String() string { - switch l { - case 1: +type LogLevel int + +func (l *LogLevel) Get() interface{} { return *l } + +func (l *LogLevel) Set(s string) error { + lvl, err := ParseLogLevel(s) + if err != nil { + return err + } + *l = lvl + return nil +} + +func (l *LogLevel) String() string { + switch *l { + case DEBUG: return "DEBUG" - case 2: + case INFO: return "INFO" - case 3: + case WARN: return "WARNING" - case 4: + case ERROR: return "ERROR" - case 5: + case FATAL: return "FATAL" } - panic("invalid LogLevel") + return "invalid" } -func ParseLogLevel(levelstr string, verbose bool) (LogLevel, error) { - lvl := INFO - +func ParseLogLevel(levelstr string) (LogLevel, error) { switch strings.ToLower(levelstr) { case "debug": - lvl = DEBUG + return DEBUG, nil case "info": - lvl = INFO + return INFO, nil case "warn": - lvl = WARN + return WARN, nil case "error": - lvl = ERROR + return ERROR, nil case "fatal": - lvl = FATAL - default: - return lvl, fmt.Errorf("invalid log-level '%s'", levelstr) + return FATAL, nil } - if verbose { - lvl = DEBUG - } - return lvl, nil + return 0, fmt.Errorf("invalid log level '%s' (debug, info, warn, error, fatal)", levelstr) } func Logf(logger Logger, cfgLevel LogLevel, msgLevel LogLevel, f string, args ...interface{}) { @@ -73,3 +79,9 @@ func Logf(logger Logger, cfgLevel LogLevel, msgLevel LogLevel, f string, args .. } logger.Output(3, fmt.Sprintf(msgLevel.String()+": "+f, args...)) } + +func LogFatal(prefix string, f string, args ...interface{}) { + logger := log.New(os.Stderr, prefix, log.Ldate|log.Ltime|log.Lmicroseconds) + Logf(logger, FATAL, FATAL, f, args...) + os.Exit(1) +} diff --git a/internal/lg/lg_test.go b/internal/lg/lg_test.go index ec93ec610..2d8e506cc 100644 --- a/internal/lg/lg_test.go +++ b/internal/lg/lg_test.go @@ -1,53 +1,11 @@ package lg import ( - "log" - "os" "testing" "github.com/nsqio/nsq/internal/test" ) -type options struct { - LogLevel string `flag:"log-level"` - Verbose bool `flag:"verbose"` // for backwards compatibility - Logger Logger - logLevel LogLevel // private, not really an option -} - -func newOptions() *options { - return &options{ - LogLevel: "info", - } -} - -type app struct { - opts *options -} - -func (n *app) logf(level LogLevel, f string, args ...interface{}) { - Logf(n.opts.Logger, n.opts.logLevel, level, f, args) -} - -func newApp(opts *options) *app { - if opts.Logger == nil { - opts.Logger = log.New(os.Stderr, "[app] ", log.Ldate|log.Ltime|log.Lmicroseconds) - } - n := &app{ - opts: opts, - } - - var err error - opts.logLevel, err = ParseLogLevel(opts.LogLevel, opts.Verbose) - if err != nil { - n.logf(FATAL, "%s", err) - os.Exit(1) - } - - n.logf(INFO, "app 0.1") - return n -} - type mockLogger struct { Count int } @@ -59,51 +17,25 @@ func (l *mockLogger) Output(maxdepth int, s string) error { func TestLogging(t *testing.T) { logger := &mockLogger{} - opts := newOptions() - opts.Logger = logger // Test only fatal get through - opts.LogLevel = "FaTaL" - nsqd := newApp(opts) logger.Count = 0 for i := 1; i <= 5; i++ { - nsqd.logf(LogLevel(i), "Test") + Logf(logger, FATAL, LogLevel(i), "Test") } test.Equal(t, 1, logger.Count) // Test only warnings or higher get through - opts.LogLevel = "WARN" - nsqd = newApp(opts) logger.Count = 0 for i := 1; i <= 5; i++ { - nsqd.logf(LogLevel(i), "Test") + Logf(logger, WARN, LogLevel(i), "Test") } test.Equal(t, 3, logger.Count) // Test everything gets through - opts.LogLevel = "debuG" - nsqd = newApp(opts) - logger.Count = 0 - for i := 1; i <= 5; i++ { - nsqd.logf(LogLevel(i), "Test") - } - test.Equal(t, 5, logger.Count) - - // Test everything gets through with verbose = true - opts.LogLevel = "fatal" - opts.Verbose = true - nsqd = newApp(opts) logger.Count = 0 for i := 1; i <= 5; i++ { - nsqd.logf(LogLevel(i), "Test") + Logf(logger, DEBUG, LogLevel(i), "Test") } test.Equal(t, 5, logger.Count) } - -func TestNoLogger(t *testing.T) { - opts := newOptions() - opts.Logger = NilLogger{} - app := newApp(opts) - - app.logf(ERROR, "should never be logged") -} diff --git a/internal/protocol/tcp_server.go b/internal/protocol/tcp_server.go index 20b39d295..186b2965c 100644 --- a/internal/protocol/tcp_server.go +++ b/internal/protocol/tcp_server.go @@ -1,6 +1,7 @@ package protocol import ( + "fmt" "net" "runtime" "strings" @@ -12,7 +13,7 @@ type TCPHandler interface { Handle(net.Conn) } -func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { +func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) for { @@ -25,7 +26,7 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { - logf(lg.ERROR, "listener.Accept() - %s", err) + return fmt.Errorf("listener.Accept() error - %s", err) } break } @@ -33,4 +34,6 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { } logf(lg.INFO, "TCP: closing %s", listener.Addr()) + + return nil } diff --git a/nsqadmin/http.go b/nsqadmin/http.go index 4c695bc62..30ba9f2fb 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -765,12 +765,11 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr } case "log_level": logLevelStr := string(body) - logLevel, err := lg.ParseLogLevel(logLevelStr, opts.Verbose) + logLevel, err := lg.ParseLogLevel(logLevelStr) if err != nil { return nil, http_api.Err{400, "INVALID_VALUE"} } - opts.LogLevel = logLevelStr - opts.logLevel = logLevel + opts.LogLevel = logLevel default: return nil, http_api.Err{400, "INVALID_OPTION"} } diff --git a/nsqadmin/http_test.go b/nsqadmin/http_test.go index 6ffbf6d97..9dc20960c 100644 --- a/nsqadmin/http_test.go +++ b/nsqadmin/http_test.go @@ -49,8 +49,16 @@ type ChannelStatsDoc struct { func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" - lookupd := nsqlookupd.New(opts) - lookupd.Main() + lookupd, err := nsqlookupd.New(opts) + if err != nil { + panic(err) + } + go func() { + err := lookupd.Main() + if err != nil { + panic(err) + } + }() return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd } @@ -66,8 +74,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N nsqlookupdOpts.HTTPAddress = "127.0.0.1:0" nsqlookupdOpts.BroadcastAddress = "127.0.0.1" nsqlookupdOpts.Logger = lgr - nsqlookupd1 := nsqlookupd.New(nsqlookupdOpts) - nsqlookupd1.Main() + nsqlookupd1, err := nsqlookupd.New(nsqlookupdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqlookupd1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) @@ -82,8 +98,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N panic(err) } nsqdOpts.DataPath = tmpDir - nsqd1 := nsqd.New(nsqdOpts) - nsqd1.Main() + nsqd1, err := nsqd.New(nsqdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqd1.Main() + if err != nil { + panic(err) + } + }() nsqadminOpts := NewOptions() nsqadminOpts.HTTPAddress = "127.0.0.1:0" @@ -92,8 +116,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N if withAuth { nsqadminOpts.AdminUsers = []string{"matt"} } - nsqadmin1 := New(nsqadminOpts) - nsqadmin1.Main() + nsqadmin1, err := New(nsqadminOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqadmin1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) @@ -573,7 +605,7 @@ func TestHTTPconfig(t *testing.T) { defer resp.Body.Close() body, _ = ioutil.ReadAll(resp.Body) test.Equal(t, 200, resp.StatusCode) - test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().logLevel) + test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().LogLevel) url = fmt.Sprintf("http://%s/config/log_level", nsqadmin1.RealHTTPAddr()) req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`))) @@ -591,8 +623,14 @@ func TestHTTPconfigCIDR(t *testing.T) { opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} opts.Logger = test.NewTestLogger(t) opts.AllowConfigFromCIDR = "10.0.0.0/8" - nsqadmin := New(opts) - nsqadmin.Main() + nsqadmin, err := New(opts) + test.Nil(t, err) + go func() { + err := nsqadmin.Main() + if err != nil { + panic(err) + } + }() defer nsqadmin.Exit() time.Sleep(100 * time.Millisecond) diff --git a/nsqadmin/logger.go b/nsqadmin/logger.go index 90c123ba1..3b8160556 100644 --- a/nsqadmin/logger.go +++ b/nsqadmin/logger.go @@ -16,5 +16,5 @@ const ( func (n *NSQAdmin) logf(level lg.LogLevel, f string, args ...interface{}) { opts := n.getOpts() - lg.Logf(opts.Logger, opts.logLevel, level, f, args...) + lg.Logf(opts.Logger, opts.LogLevel, level, f, args...) } diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index f1355d5cf..65e022eba 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" + "fmt" "io/ioutil" "log" "net" @@ -16,7 +18,6 @@ import ( "sync/atomic" "github.com/nsqio/nsq/internal/http_api" - "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/version" ) @@ -31,7 +32,7 @@ type NSQAdmin struct { httpClientTLSConfig *tls.Config } -func New(opts *Options) *NSQAdmin { +func New(opts *Options) (*NSQAdmin, error) { if opts.Logger == nil { opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) } @@ -41,41 +42,20 @@ func New(opts *Options) *NSQAdmin { } n.swapOpts(opts) - var err error - opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose) - if err != nil { - n.logf(LOG_FATAL, "%s", err) - os.Exit(1) - } - if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 { - n.logf(LOG_FATAL, "--nsqd-http-address or --lookupd-http-address required.") - os.Exit(1) + return nil, errors.New("--nsqd-http-address or --lookupd-http-address required") } if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 { - n.logf(LOG_FATAL, "use --nsqd-http-address or --lookupd-http-address not both") - os.Exit(1) - } - - // verify that the supplied address is valid - verifyAddress := func(arg string, address string) *net.TCPAddr { - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - n.logf(LOG_FATAL, "failed to resolve %s address (%s) - %s", arg, address, err) - os.Exit(1) - } - return addr + return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both") } if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" { - n.logf(LOG_FATAL, "--http-client-tls-key must be specified with --http-client-tls-cert") - os.Exit(1) + return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert") } if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" { - n.logf(LOG_FATAL, "--http-client-tls-cert must be specified with --http-client-tls-key") - os.Exit(1) + return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key") } n.httpClientTLSConfig = &tls.Config{ @@ -84,9 +64,8 @@ func New(opts *Options) *NSQAdmin { if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" { cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey) if err != nil { - n.logf(LOG_FATAL, "failed to LoadX509KeyPair %s, %s - %s", + return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s", opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err) - os.Exit(1) } n.httpClientTLSConfig.Certificates = []tls.Certificate{cert} } @@ -94,30 +73,33 @@ func New(opts *Options) *NSQAdmin { tlsCertPool := x509.NewCertPool() caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile) if err != nil { - n.logf(LOG_FATAL, "failed to read TLS root CA file %s - %s", + return nil, fmt.Errorf("failed to read TLS root CA file %s - %s", opts.HTTPClientTLSRootCAFile, err) - os.Exit(1) } if !tlsCertPool.AppendCertsFromPEM(caCertFile) { - n.logf(LOG_FATAL, "failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile) - os.Exit(1) + return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile) } n.httpClientTLSConfig.RootCAs = tlsCertPool } - // require that both the hostname and port be specified for _, address := range opts.NSQLookupdHTTPAddresses { - verifyAddress("--lookupd-http-address", address) + _, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err) + } } for _, address := range opts.NSQDHTTPAddresses { - verifyAddress("--nsqd-http-address", address) + _, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to resolve --nsqd-http-address (%s) - %s", address, err) + } } if opts.ProxyGraphite { url, err := url.Parse(opts.GraphiteURL) if err != nil { - n.logf(LOG_FATAL, "failed to parse --graphite-url='%s' - %s", opts.GraphiteURL, err) + return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err) os.Exit(1) } n.graphiteURL = url @@ -126,8 +108,7 @@ func New(opts *Options) *NSQAdmin { if opts.AllowConfigFromCIDR != "" { _, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR) if err != nil { - n.logf(LOG_FATAL, "failed to parse --allow-config-from-cidr='%s' - %s", opts.AllowConfigFromCIDR, err) - os.Exit(1) + return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err) } } @@ -135,7 +116,13 @@ func New(opts *Options) *NSQAdmin { n.logf(LOG_INFO, version.String("nsqadmin")) - return n + var err error + n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err) + } + + return n, nil } func normalizeBasePath(p string) string { @@ -180,22 +167,32 @@ func (n *NSQAdmin) handleAdminActions() { } } -func (n *NSQAdmin) Main() { - httpListener, err := net.Listen("tcp", n.getOpts().HTTPAddress) - if err != nil { - n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err) - os.Exit(1) +func (n *NSQAdmin) Main() error { + exitCh := make(chan error) + var once sync.Once + exitFunc := func(err error) { + once.Do(func() { + if err != nil { + n.logf(LOG_FATAL, "%s", err) + } + exitCh <- err + }) } - n.httpListener = httpListener + httpServer := NewHTTPServer(&Context{n}) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf) + exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)) }) n.waitGroup.Wrap(n.handleAdminActions) + + err := <-exitCh + return err } func (n *NSQAdmin) Exit() { - n.httpListener.Close() + if n.httpListener != nil { + n.httpListener.Close() + } close(n.notifications) n.waitGroup.Wait() } diff --git a/nsqadmin/nsqadmin_test.go b/nsqadmin/nsqadmin_test.go index c9a7ab891..fe19e2b1a 100644 --- a/nsqadmin/nsqadmin_test.go +++ b/nsqadmin/nsqadmin_test.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "os" - "os/exec" "testing" "github.com/nsqio/nsq/internal/lg" @@ -16,41 +15,23 @@ import ( ) func TestNeitherNSQDAndNSQLookup(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - opts := NewOptions() - opts.Logger = lg.NilLogger{} - opts.HTTPAddress = "127.0.0.1:0" - New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestNeitherNSQDAndNSQLookup") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - test.Equal(t, "exit status 1", fmt.Sprintf("%v", err)) - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) + opts := NewOptions() + opts.Logger = lg.NilLogger{} + opts.HTTPAddress = "127.0.0.1:0" + _, err := New(opts) + test.NotNil(t, err) + test.Equal(t, "--nsqd-http-address or --lookupd-http-address required", fmt.Sprintf("%s", err)) } func TestBothNSQDAndNSQLookup(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - opts := NewOptions() - opts.Logger = lg.NilLogger{} - opts.HTTPAddress = "127.0.0.1:0" - opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} - opts.NSQDHTTPAddresses = []string{"127.0.0.1:4151"} - New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestBothNSQDAndNSQLookup") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - test.Equal(t, "exit status 1", fmt.Sprintf("%v", err)) - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) + opts := NewOptions() + opts.Logger = lg.NilLogger{} + opts.HTTPAddress = "127.0.0.1:0" + opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} + opts.NSQDHTTPAddresses = []string{"127.0.0.1:4151"} + _, err := New(opts) + test.NotNil(t, err) + test.Equal(t, "use --nsqd-http-address or --lookupd-http-address not both", fmt.Sprintf("%s", err)) } func TestTLSHTTPClient(t *testing.T) { @@ -73,8 +54,14 @@ func TestTLSHTTPClient(t *testing.T) { opts.HTTPClientTLSCert = "./test/client.pem" opts.HTTPClientTLSKey = "./test/client.key" opts.Logger = lgr - nsqadmin := New(opts) - nsqadmin.Main() + nsqadmin, err := New(opts) + test.Nil(t, err) + go func() { + err := nsqadmin.Main() + if err != nil { + panic(err) + } + }() defer nsqadmin.Exit() httpAddr := nsqadmin.RealHTTPAddr() @@ -85,7 +72,7 @@ func TestTLSHTTPClient(t *testing.T) { } resp, err := http.Get(u.String()) - test.Equal(t, nil, err) + test.Nil(t, err) defer resp.Body.Close() test.Equal(t, resp.StatusCode < 500, true) @@ -102,25 +89,15 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD) } opts.DataPath = tmpDir } - nsqd := nsqd.New(opts) - nsqd.Main() - return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd -} - -func TestCrashingLogger(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - // Test invalid log level causes error - opts := NewOptions() - opts.LogLevel = "bad" - opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} - _ = New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestCrashingLogger") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return + nsqd, err := nsqd.New(opts) + if err != nil { + panic(err) } - t.Fatalf("process ran with err %v, want exit status 1", err) + go func() { + err := nsqd.Main() + if err != nil { + panic(err) + } + }() + return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd } diff --git a/nsqadmin/options.go b/nsqadmin/options.go index 50d00b62b..8100af881 100644 --- a/nsqadmin/options.go +++ b/nsqadmin/options.go @@ -7,11 +7,9 @@ import ( ) type Options struct { - LogLevel string `flag:"log-level"` - LogPrefix string `flag:"log-prefix"` - Verbose bool `flag:"verbose"` // for backwards compatibility + LogLevel lg.LogLevel `flag:"log-level"` + LogPrefix string `flag:"log-prefix"` Logger Logger - logLevel lg.LogLevel // private, not really an option HTTPAddress string `flag:"http-address"` BasePath string `flag:"base-path"` @@ -47,7 +45,7 @@ type Options struct { func NewOptions() *Options { return &Options{ LogPrefix: "[nsqadmin] ", - LogLevel: "info", + LogLevel: lg.INFO, HTTPAddress: "0.0.0.0:4171", BasePath: "/", StatsdPrefix: "nsq.%s", diff --git a/nsqd/channel.go b/nsqd/channel.go index b46138221..ad9c66ce9 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -97,7 +97,7 @@ func NewChannel(topicName string, channelName string, ctx *context, } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { opts := ctx.nsqd.getOpts() - lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...) + lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } // backend names, for uniqueness, automatically include the topic... backendName := getBackendName(topicName, channelName) diff --git a/nsqd/http.go b/nsqd/http.go index 714b74eef..9914b093f 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -688,17 +688,11 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr } case "log_level": logLevelStr := string(body) - logLevel, err := lg.ParseLogLevel(logLevelStr, opts.Verbose) - if err != nil { - return nil, http_api.Err{400, "INVALID_VALUE"} - } - opts.LogLevel = logLevelStr - opts.logLevel = logLevel - case "verbose": - err := json.Unmarshal(body, &opts.Verbose) + logLevel, err := lg.ParseLogLevel(logLevelStr) if err != nil { return nil, http_api.Err{400, "INVALID_VALUE"} } + opts.LogLevel = logLevel default: return nil, http_api.Err{400, "INVALID_OPTION"} } diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 09e89eb68..47e42f0bb 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -232,7 +232,7 @@ func TestHTTPpubDefer(t *testing.T) { func TestHTTPSRequire(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSClientAuthPolicy = "require" @@ -277,7 +277,7 @@ func TestHTTPSRequire(t *testing.T) { func TestHTTPSRequireVerify(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSRootCAFile = "./test/certs/ca.pem" @@ -341,7 +341,7 @@ func TestHTTPSRequireVerify(t *testing.T) { func TestTLSRequireVerifyExceptHTTP(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSRootCAFile = "./test/certs/ca.pem" @@ -633,7 +633,7 @@ func TestHTTPconfig(t *testing.T) { defer resp.Body.Close() body, _ = ioutil.ReadAll(resp.Body) test.Equal(t, 200, resp.StatusCode) - test.Equal(t, LOG_FATAL, nsqd.getOpts().logLevel) + test.Equal(t, LOG_FATAL, nsqd.getOpts().LogLevel) url = fmt.Sprintf("http://%s/config/log_level", httpAddr) req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`))) @@ -643,25 +643,6 @@ func TestHTTPconfig(t *testing.T) { defer resp.Body.Close() body, _ = ioutil.ReadAll(resp.Body) test.Equal(t, 400, resp.StatusCode) - - url = fmt.Sprintf("http://%s/config/verbose", httpAddr) - req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`true`))) - test.Nil(t, err) - resp, err = client.Do(req) - test.Nil(t, err) - defer resp.Body.Close() - body, _ = ioutil.ReadAll(resp.Body) - test.Equal(t, 200, resp.StatusCode) - test.Equal(t, true, nsqd.getOpts().Verbose) - - url = fmt.Sprintf("http://%s/config/verbose", httpAddr) - req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`))) - test.Nil(t, err) - resp, err = client.Do(req) - test.Nil(t, err) - defer resp.Body.Close() - body, _ = ioutil.ReadAll(resp.Body) - test.Equal(t, 400, resp.StatusCode) } func TestHTTPerrors(t *testing.T) { diff --git a/nsqd/logger.go b/nsqd/logger.go index 51ac28db7..a850ae44b 100644 --- a/nsqd/logger.go +++ b/nsqd/logger.go @@ -16,5 +16,5 @@ const ( func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) { opts := n.getOpts() - lg.Logf(opts.Logger, opts.logLevel, level, f, args...) + lg.Logf(opts.Logger, opts.LogLevel, level, f, args...) } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index f663a5d0a..285ae8713 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -20,7 +20,6 @@ import ( "github.com/nsqio/nsq/internal/clusterinfo" "github.com/nsqio/nsq/internal/dirlock" "github.com/nsqio/nsq/internal/http_api" - "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/statsd" "github.com/nsqio/nsq/internal/util" @@ -77,7 +76,9 @@ type NSQD struct { ci *clusterinfo.ClusterInfo } -func New(opts *Options) *NSQD { +func New(opts *Options) (*NSQD, error) { + var err error + dataPath := opts.DataPath if opts.DataPath == "" { cwd, _ := os.Getwd() @@ -104,35 +105,24 @@ func New(opts *Options) *NSQD { n.swapOpts(opts) n.errValue.Store(errStore{}) - var err error - opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose) - if err != nil { - n.logf(LOG_FATAL, "%s", err) - os.Exit(1) - } - err = n.dl.Lock() if err != nil { - n.logf(LOG_FATAL, "--data-path=%s in use (possibly by another instance of nsqd)", dataPath) - os.Exit(1) + return nil, fmt.Errorf("--data-path=%s in use (possibly by another instance of nsqd)", dataPath) } if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { - n.logf(LOG_FATAL, "--max-deflate-level must be [1,9]") - os.Exit(1) + return nil, errors.New("--max-deflate-level must be [1,9]") } if opts.ID < 0 || opts.ID >= 1024 { - n.logf(LOG_FATAL, "--node-id must be [0,1024)") - os.Exit(1) + return nil, errors.New("--node-id must be [0,1024)") } if opts.StatsdPrefix != "" { var port string _, port, err = net.SplitHostPort(opts.HTTPAddress) if err != nil { - n.logf(LOG_FATAL, "failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err) - os.Exit(1) + return nil, fmt.Errorf("failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err) } statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port)) prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1) @@ -148,26 +138,38 @@ func New(opts *Options) *NSQD { tlsConfig, err := buildTLSConfig(opts) if err != nil { - n.logf(LOG_FATAL, "failed to build TLS config - %s", err) - os.Exit(1) + return nil, fmt.Errorf("failed to build TLS config - %s", err) } if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { - n.logf(LOG_FATAL, "cannot require TLS client connections without TLS key and cert") - os.Exit(1) + return nil, errors.New("cannot require TLS client connections without TLS key and cert") } n.tlsConfig = tlsConfig for _, v := range opts.E2EProcessingLatencyPercentiles { if v <= 0 || v > 1 { - n.logf(LOG_FATAL, "Invalid percentile: %v", v) - os.Exit(1) + return nil, fmt.Errorf("invalid E2E processing latency percentile: %v", v) } } n.logf(LOG_INFO, version.String("nsqd")) n.logf(LOG_INFO, "ID: %d", opts.ID) - return n + n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) + } + n.httpListener, err = net.Listen("tcp", opts.HTTPAddress) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err) + } + if n.tlsConfig != nil && opts.HTTPSAddress != "" { + n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPSAddress, err) + } + } + + return n, nil } func (n *NSQD) getOpts() *Options { @@ -239,40 +241,32 @@ func (n *NSQD) RemoveClient(clientID int64) { n.clientLock.Unlock() } -func (n *NSQD) Main() { - var err error +func (n *NSQD) Main() error { ctx := &context{n} - n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress) - if err != nil { - n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err) - os.Exit(1) - } - n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) - if err != nil { - n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err) - os.Exit(1) - } - if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { - n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) - if err != nil { - n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) - os.Exit(1) - } + exitCh := make(chan error) + var once sync.Once + exitFunc := func(err error) { + once.Do(func() { + if err != nil { + n.logf(LOG_FATAL, "%s", err) + } + exitCh <- err + }) } tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { - protocol.TCPServer(n.tcpListener, tcpServer, n.logf) + exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) }) httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf) + exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf) + exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } @@ -281,6 +275,9 @@ func (n *NSQD) Main() { if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } + + err := <-exitCh + return err } type meta struct { diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 9bacd0e6e..147280fa7 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "net" "os" - "os/exec" "strconv" "sync/atomic" "testing" @@ -244,8 +243,16 @@ func TestPauseMetadata(t *testing.T) { func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" - lookupd := nsqlookupd.New(opts) - lookupd.Main() + lookupd, err := nsqlookupd.New(opts) + if err != nil { + panic(err) + } + go func() { + err := lookupd.Main() + if err != nil { + panic(err) + } + }() return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd } @@ -411,14 +418,15 @@ func TestCluster(t *testing.T) { func TestSetHealth(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - nsqd := New(opts) + nsqd, err := New(opts) + test.Nil(t, err) defer nsqd.Exit() - test.Equal(t, nil, nsqd.GetError()) + test.Nil(t, nsqd.GetError()) test.Equal(t, true, nsqd.IsHealthy()) nsqd.SetHealth(nil) - test.Equal(t, nil, nsqd.GetError()) + test.Nil(t, nsqd.GetError()) test.Equal(t, true, nsqd.IsHealthy()) nsqd.SetHealth(errors.New("health error")) @@ -431,20 +439,3 @@ func TestSetHealth(t *testing.T) { test.Equal(t, "OK", nsqd.GetHealth()) test.Equal(t, true, nsqd.IsHealthy()) } - -func TestCrashingLogger(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - // Test invalid log level causes error - opts := NewOptions() - opts.LogLevel = "bad" - _ = New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestCrashingLogger") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) -} diff --git a/nsqd/options.go b/nsqd/options.go index 300e1af5a..c55c6d80e 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -14,12 +14,10 @@ import ( type Options struct { // basic options - ID int64 `flag:"node-id" cfg:"id"` - LogLevel string `flag:"log-level"` - LogPrefix string `flag:"log-prefix"` - Verbose bool `flag:"verbose"` // for backwards compatibility + ID int64 `flag:"node-id" cfg:"id"` + LogLevel lg.LogLevel `flag:"log-level"` + LogPrefix string `flag:"log-prefix"` Logger Logger - logLevel lg.LogLevel // private, not really an option TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` @@ -98,7 +96,7 @@ func NewOptions() *Options { return &Options{ ID: defaultID, LogPrefix: "[nsqd] ", - LogLevel: "info", + LogLevel: lg.INFO, TCPAddress: "0.0.0.0:4150", HTTPAddress: "0.0.0.0:4151", diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 20e31ed71..e8f7d1ff6 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -41,8 +41,16 @@ func mustStartNSQD(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQD) { } opts.DataPath = tmpDir } - nsqd := New(opts) - nsqd.Main() + nsqd, err := New(opts) + if err != nil { + panic(err) + } + go func() { + err := nsqd.Main() + if err != nil { + panic(err) + } + }() return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd } @@ -208,7 +216,7 @@ func TestClientTimeout(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) opts.ClientTimeout = 150 * time.Millisecond - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() @@ -281,7 +289,7 @@ func TestClientHeartbeatDisableSUB(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) opts.ClientTimeout = 200 * time.Millisecond - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() @@ -441,7 +449,7 @@ func TestEmptyCommand(t *testing.T) { func TestSizeLimits(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxMsgSize = 100 opts.MaxBodySize = 1000 tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -558,7 +566,7 @@ func TestSizeLimits(t *testing.T) { func TestDPUB(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() @@ -601,7 +609,7 @@ func TestDPUB(t *testing.T) { func TestTouch(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MsgTimeout = 150 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -647,7 +655,7 @@ func TestTouch(t *testing.T) { func TestMaxRdyCount(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxRdyCount = 50 tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -719,7 +727,7 @@ func TestFatalError(t *testing.T) { func TestOutputBuffering(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxOutputBufferSize = 512 * 1024 opts.MaxOutputBufferTimeout = time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -771,7 +779,7 @@ func TestOutputBuffering(t *testing.T) { func TestOutputBufferingValidity(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxOutputBufferSize = 512 * 1024 opts.MaxOutputBufferTimeout = time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -814,7 +822,7 @@ func TestOutputBufferingValidity(t *testing.T) { func TestTLS(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -853,7 +861,7 @@ func TestTLS(t *testing.T) { func TestTLSRequired(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSRequired = TLSRequiredExceptHTTP @@ -902,7 +910,7 @@ func TestTLSRequired(t *testing.T) { func TestTLSAuthRequire(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSClientAuthPolicy = "require" @@ -968,7 +976,7 @@ func TestTLSAuthRequire(t *testing.T) { func TestTLSAuthRequireVerify(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.TLSCert = "./test/certs/server.pem" opts.TLSKey = "./test/certs/server.key" opts.TLSRootCAFile = "./test/certs/ca.pem" @@ -1057,7 +1065,7 @@ func TestTLSAuthRequireVerify(t *testing.T) { func TestDeflate(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.DeflateEnabled = true tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1093,7 +1101,7 @@ type readWriter struct { func TestSnappy(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.SnappyEnabled = true tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1146,7 +1154,7 @@ func TestSnappy(t *testing.T) { func TestTLSDeflate(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.DeflateEnabled = true opts.TLSCert = "./test/certs/cert.pem" opts.TLSKey = "./test/certs/key.pem" @@ -1203,7 +1211,7 @@ func TestSampling(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxRdyCount = int64(num) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1270,7 +1278,7 @@ func TestSampling(t *testing.T) { func TestTLSSnappy(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.SnappyEnabled = true opts.TLSCert = "./test/certs/cert.pem" opts.TLSKey = "./test/certs/key.pem" @@ -1321,7 +1329,7 @@ func TestTLSSnappy(t *testing.T) { func TestClientMsgTimeout(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.QueueScanRefreshInterval = 100 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1380,7 +1388,7 @@ func TestClientMsgTimeout(t *testing.T) { func TestBadFin(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() @@ -1406,7 +1414,7 @@ func TestBadFin(t *testing.T) { func TestReqTimeoutRange(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.MaxReqTimeout = 1 * time.Minute tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1511,7 +1519,7 @@ func runAuthTest(t *testing.T, authResponse string, authSecret string, authError opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.AuthHTTPAddresses = []string{addr.Host} if tlsEnabled { opts.TLSCert = "./test/certs/server.pem" @@ -1593,13 +1601,14 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG - prot := &protocolV2{ctx: &context{nsqd: New(opts)}} + nsqd, err := New(opts) + test.Nil(t, err) + prot := &protocolV2{ctx: &context{nsqd: nsqd}} defer prot.ctx.nsqd.Exit() - err := prot.IOLoop(fakeConn) - + err = prot.IOLoop(fakeConn) test.NotNil(t, err) test.Equal(t, "E_INVALID invalid command INVALID_COMMAND", err.Error()) test.NotNil(t, err.(*protocol.FatalClientErr)) @@ -1609,7 +1618,7 @@ func BenchmarkProtocolV2Exec(b *testing.B) { b.StopTimer() opts := NewOptions() opts.Logger = test.NewTestLogger(b) - nsqd := New(opts) + nsqd, _ := New(opts) ctx := &context{nsqd} p := &protocolV2{ctx} c := newClientV2(0, nil, ctx) diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 081ade097..7ec8a461f 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -70,7 +70,7 @@ func TestClientAttributes(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG opts.SnappyEnabled = true tcpAddr, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) diff --git a/nsqd/topic.go b/nsqd/topic.go index e41be2b0c..c9884fe22 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -64,7 +64,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { opts := ctx.nsqd.getOpts() - lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...) + lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } t.backend = diskqueue.New( topicName, diff --git a/nsqlookupd/http_test.go b/nsqlookupd/http_test.go index d2250cb69..ef704a220 100644 --- a/nsqlookupd/http_test.go +++ b/nsqlookupd/http_test.go @@ -35,8 +35,16 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, *NSQLookupd) { nsqlookupdOpts.HTTPAddress = "127.0.0.1:0" nsqlookupdOpts.BroadcastAddress = "127.0.0.1" nsqlookupdOpts.Logger = lgr - nsqlookupd1 := New(nsqlookupdOpts) - nsqlookupd1.Main() + nsqlookupd1, err := New(nsqlookupdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqlookupd1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) @@ -51,8 +59,16 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, *NSQLookupd) { panic(err) } nsqdOpts.DataPath = tmpDir - nsqd1 := nsqd.New(nsqdOpts) - nsqd1.Main() + nsqd1, err := nsqd.New(nsqdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqd1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) diff --git a/nsqlookupd/logger.go b/nsqlookupd/logger.go index f3e91a8c3..05f708884 100644 --- a/nsqlookupd/logger.go +++ b/nsqlookupd/logger.go @@ -15,5 +15,5 @@ const ( ) func (n *NSQLookupd) logf(level lg.LogLevel, f string, args ...interface{}) { - lg.Logf(n.opts.Logger, n.opts.logLevel, level, f, args...) + lg.Logf(n.opts.Logger, n.opts.LogLevel, level, f, args...) } diff --git a/nsqlookupd/lookup_protocol_v1_test.go b/nsqlookupd/lookup_protocol_v1_test.go index 966c6e540..5ff524bd9 100644 --- a/nsqlookupd/lookup_protocol_v1_test.go +++ b/nsqlookupd/lookup_protocol_v1_test.go @@ -34,9 +34,11 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) - opts.LogLevel = "debug" + opts.LogLevel = LOG_DEBUG - prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: New(opts)}} + nsqlookupd, err := New(opts) + test.Nil(t, err) + prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: nsqlookupd}} errChan := make(chan error) testIOLoop := func() { @@ -45,7 +47,6 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { } go testIOLoop() - var err error var timeout bool select { diff --git a/nsqlookupd/nsqlookupd.go b/nsqlookupd/nsqlookupd.go index d2dcd0928..11dc3b547 100644 --- a/nsqlookupd/nsqlookupd.go +++ b/nsqlookupd/nsqlookupd.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/nsqio/nsq/internal/http_api" - "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/version" @@ -23,24 +22,29 @@ type NSQLookupd struct { DB *RegistrationDB } -func New(opts *Options) *NSQLookupd { +func New(opts *Options) (*NSQLookupd, error) { + var err error + if opts.Logger == nil { opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) } - n := &NSQLookupd{ + l := &NSQLookupd{ opts: opts, DB: NewRegistrationDB(), } - var err error - opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose) + l.logf(LOG_INFO, version.String("nsqlookupd")) + + l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) + } + l.httpListener, err = net.Listen("tcp", opts.HTTPAddress) if err != nil { - n.logf(LOG_FATAL, "%s", err) - os.Exit(1) + return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) } - n.logf(LOG_INFO, version.String("nsqlookupd")) - return n + return l, nil } // Main starts an instance of nsqlookupd and returns an @@ -48,28 +52,28 @@ func New(opts *Options) *NSQLookupd { func (l *NSQLookupd) Main() error { ctx := &Context{l} - tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) - if err != nil { - return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) + exitCh := make(chan error) + var once sync.Once + exitFunc := func(err error) { + once.Do(func() { + if err != nil { + l.logf(LOG_FATAL, "%s", err) + } + exitCh <- err + }) } - httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) - if err != nil { - return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) - } - - l.tcpListener = tcpListener - l.httpListener = httpListener tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { - protocol.TCPServer(tcpListener, tcpServer, l.logf) + exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf)) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { - http_api.Serve(httpListener, httpServer, "HTTP", l.logf) + exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf)) }) - return nil + err := <-exitCh + return err } func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr { diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index 19f2f2b2c..eb66123e8 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -3,8 +3,6 @@ package nsqlookupd import ( "fmt" "net" - "os" - "os/exec" "testing" "time" @@ -39,8 +37,16 @@ type LookupDoc struct { func mustStartLookupd(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQLookupd) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" - nsqlookupd := New(opts) - nsqlookupd.Main() + nsqlookupd, err := New(opts) + if err != nil { + panic(err) + } + go func() { + err := nsqlookupd.Main() + if err != nil { + panic(err) + } + }() return nsqlookupd.RealTCPAddr(), nsqlookupd.RealHTTPAddr(), nsqlookupd } @@ -351,20 +357,3 @@ func TestTombstonedNodes(t *testing.T) { test.Equal(t, topicName, producers[0].Topics[0].Topic) test.Equal(t, true, producers[0].Topics[0].Tombstoned) } - -func TestCrashingLogger(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - // Test invalid log level causes error - opts := NewOptions() - opts.LogLevel = "bad" - _ = New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestCrashingLogger") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) -} diff --git a/nsqlookupd/options.go b/nsqlookupd/options.go index 6258272ac..20800b611 100644 --- a/nsqlookupd/options.go +++ b/nsqlookupd/options.go @@ -9,11 +9,9 @@ import ( ) type Options struct { - LogLevel string `flag:"log-level"` - LogPrefix string `flag:"log-prefix"` - Verbose bool `flag:"verbose"` // for backwards compatibility + LogLevel lg.LogLevel `flag:"log-level"` + LogPrefix string `flag:"log-prefix"` Logger Logger - logLevel lg.LogLevel // private, not really an option TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` @@ -31,7 +29,7 @@ func NewOptions() *Options { return &Options{ LogPrefix: "[nsqlookupd] ", - LogLevel: "info", + LogLevel: lg.INFO, TCPAddress: "0.0.0.0:4160", HTTPAddress: "0.0.0.0:4161", BroadcastAddress: hostname,