diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 60bc26b6f..b2606a2ca 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -80,7 +80,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Bool("version", false, "print version string") flagSet.Bool("verbose", false, "enable verbose logging") flagSet.String("config", "", "path to config file") - flagSet.Int64("worker-id", opts.ID, "unique seed for message ID generation (int) in range [0,4096) (will default to a hash of hostname)") + flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)") + flagSet.Bool("worker-id", false, "do NOT use this, use --node-id") flagSet.String("https-address", opts.HTTPSAddress, ": to listen on for HTTPS clients") flagSet.String("http-address", opts.HTTPAddress, ": to listen on for HTTP clients") @@ -217,8 +218,11 @@ func (p *program) Start() error { options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) - nsqd.LoadMetadata() - err := nsqd.PersistMetadata() + err := nsqd.LoadMetadata() + if err != nil { + log.Fatalf("ERROR: %s", err.Error()) + } + err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } diff --git a/nsqadmin/http_test.go b/nsqadmin/http_test.go index de5ffc9f4..b45e22fa3 100644 --- a/nsqadmin/http_test.go +++ b/nsqadmin/http_test.go @@ -73,7 +73,7 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, []*nsqlookupd.NSQL nsqdOpts.BroadcastAddress = "127.0.0.1" nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()} nsqdOpts.Logger = lgr - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqadmin/nsqadmin_test.go b/nsqadmin/nsqadmin_test.go index e77d51ef2..8de362d60 100644 --- a/nsqadmin/nsqadmin_test.go +++ b/nsqadmin/nsqadmin_test.go @@ -9,7 +9,6 @@ import ( "os" "os/exec" "testing" - "time" "github.com/nsqio/nsq/internal/test" "github.com/nsqio/nsq/nsqd" @@ -103,7 +102,7 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD) opts.HTTPAddress = "127.0.0.1:0" opts.HTTPSAddress = "127.0.0.1:0" if opts.DataPath == "" { - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go index 01e7e9d3b..beaeefe95 100644 --- a/nsqd/diskqueue.go +++ b/nsqd/diskqueue.go @@ -450,7 +450,7 @@ func (d *diskQueue) persistMetaData() error { f.Close() // atomically rename - return atomicRename(tmpFileName, fileName) + return os.Rename(tmpFileName, fileName) } func (d *diskQueue) metaDataFileName() string { @@ -538,14 +538,12 @@ func (d *diskQueue) handleReadError() { badFn := d.fileName(d.readFileNum) badRenameFn := badFn + ".bad" - d.logf( - "NOTICE: diskqueue(%s) jump to next file and saving bad file as %s", + d.logf("NOTICE: diskqueue(%s) jump to next file and saving bad file as %s", d.name, badRenameFn) - err := atomicRename(badFn, badRenameFn) + err := os.Rename(badFn, badRenameFn) if err != nil { - d.logf( - "ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s", + d.logf("ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s", d.name, badFn, badRenameFn) } diff --git a/nsqd/diskqueue_test.go b/nsqd/diskqueue_test.go index f0aead160..ae4076896 100644 --- a/nsqd/diskqueue_test.go +++ b/nsqd/diskqueue_test.go @@ -20,7 +20,7 @@ func TestDiskQueue(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -42,7 +42,7 @@ func TestDiskQueue(t *testing.T) { func TestDiskQueueRoll(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -73,7 +73,7 @@ func assertFileNotExist(t *testing.T, fn string) { func TestDiskQueueEmpty(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -141,7 +141,7 @@ func TestDiskQueueEmpty(t *testing.T) { func TestDiskQueueCorruption(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -219,7 +219,7 @@ func readMetaDataFile(fileName string, retried int) md { func TestDiskQueueSyncAfterRead(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -270,7 +270,7 @@ func TestDiskQueueTorture(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -385,7 +385,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) { b.StopTimer() l := test.NewTestLogger(b) dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -434,7 +434,7 @@ func BenchmarkDiskWrite1048576(b *testing.B) { func benchmarkDiskWrite(size int64, b *testing.B) { b.StopTimer() fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -480,7 +480,7 @@ func BenchmarkDiskWriteBuffered1048576(b *testing.B) { func benchmarkDiskWriteBuffered(size int64, b *testing.B) { b.StopTimer() fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -536,7 +536,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) { b.StopTimer() l := test.NewTestLogger(b) dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/guid.go b/nsqd/guid.go index f86c78b8e..89ba83e9e 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -17,10 +17,10 @@ import ( ) const ( - workerIDBits = uint64(10) + nodeIDBits = uint64(10) sequenceBits = uint64(12) - workerIDShift = sequenceBits - timestampShift = sequenceBits + workerIDBits + nodeIDShift = sequenceBits + timestampShift = sequenceBits + nodeIDBits sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits) // ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20 @@ -36,15 +36,15 @@ type guid int64 type guidFactory struct { sync.Mutex - workerID int64 + nodeID int64 sequence int64 lastTimestamp int64 lastID guid } -func NewGUIDFactory(workerID int64) *guidFactory { +func NewGUIDFactory(nodeID int64) *guidFactory { return &guidFactory{ - workerID: workerID, + nodeID: nodeID, } } @@ -72,7 +72,7 @@ func (f *guidFactory) NewGUID() (guid, error) { f.lastTimestamp = ts id := guid(((ts - twepoch) << timestampShift) | - (f.workerID << workerIDShift) | + (f.nodeID << nodeIDShift) | f.sequence) if id <= f.lastID { diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 018332691..60be46225 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -1,6 +1,7 @@ package nsqd import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -98,7 +99,7 @@ func New(opts *Options) *NSQD { } if opts.ID < 0 || opts.ID >= 1024 { - n.logf("FATAL: --worker-id must be [0,1024)") + n.logf("FATAL: --node-id must be [0,1024)") os.Exit(1) } @@ -267,24 +268,51 @@ type meta struct { } `json:"topics"` } -func (n *NSQD) LoadMetadata() { +func readOrEmpty(fn string) ([]byte, error) { + data, err := ioutil.ReadFile(fn) + if err != nil { + if !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err) + } + } + return data, nil +} + +func (n *NSQD) LoadMetadata() error { atomic.StoreInt32(&n.isLoading, 1) defer atomic.StoreInt32(&n.isLoading, 0) - fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) - data, err := ioutil.ReadFile(fn) + fn := path.Join(n.getOpts().DataPath, "nsqd.dat") + // old metadata filename with ID, maintained in parallel to enable roll-back + fnID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID)) + + data, err := readOrEmpty(fn) if err != nil { - if !os.IsNotExist(err) { - n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err) + return err + } + dataID, errID := readOrEmpty(fnID) + if errID != nil { + return errID + } + + if data == nil && dataID == nil { + return nil // fresh start + } + if data != nil && dataID != nil { + if bytes.Compare(data, dataID) != 0 { + return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID) } - return + } + if data == nil { + // only old metadata file exists, use it + fn = fnID + data = dataID } var m meta err = json.Unmarshal(data, &m) if err != nil { - n.logf("ERROR: failed to parse metadata - %s", err) - return + return fmt.Errorf("failed to parse metadata in %s - %s", fn, err) } for _, t := range m.Topics { @@ -308,12 +336,15 @@ func (n *NSQD) LoadMetadata() { } } } + return nil } func (n *NSQD) PersistMetadata() error { - // persist metadata about what topics/channels we have - // so that upon restart we can get back to the same state - fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) + // persist metadata about what topics/channels we have, across restarts + fileName := path.Join(n.getOpts().DataPath, "nsqd.dat") + // old metadata filename with ID, maintained in parallel to enable roll-back + fileNameID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID)) + n.logf("NSQ: persisting topic/channel metadata to %s", fileName) js := make(map[string]interface{}) @@ -365,11 +396,26 @@ func (n *NSQD) PersistMetadata() error { f.Sync() f.Close() - err = atomicRename(tmpFileName, fileName) + err = os.Rename(tmpFileName, fileName) if err != nil { return err } + stat, err := os.Lstat(fileNameID) + if err != nil || (stat.Mode()&os.ModeSymlink) == 0 { + tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int()) + + err = os.Symlink(fileName, tmpFileNameID) + if err != nil { + return err + } + err = os.Rename(tmpFileNameID, fileNameID) + if err != nil { + return err + } + } + + // technically should fsync DataPath here return nil } diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index dcc3ba501..bee29ab34 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -23,8 +23,15 @@ const ( RequestTimeout = 5 * time.Second ) +func newMetadataFile(opts *Options) string { + return path.Join(opts.DataPath, "nsqd.dat") +} +func oldMetadataFile(opts *Options) string { + return path.Join(opts.DataPath, fmt.Sprintf("nsqd.%d.dat", opts.ID)) +} + func getMetadata(n *NSQD) (*meta, error) { - fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) + fn := newMetadataFile(n.getOpts()) data, err := ioutil.ReadFile(fn) if err != nil { return nil, err @@ -158,6 +165,106 @@ func TestStartup(t *testing.T) { <-doneExitChan } +func TestMetadataMigrate(t *testing.T) { + old_meta := ` + { + "topics": [ + { + "channels": [ + {"name": "c1", "paused": false}, + {"name": "c2", "paused": false} + ], + "name": "t1", + "paused": false + } + ], + "version": "1.0.0-alpha" + }` + + tmpDir, err := ioutil.TempDir("", "nsq-test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + opts := NewOptions() + opts.DataPath = tmpDir + opts.Logger = test.NewTestLogger(t) + + oldFn := oldMetadataFile(opts) + err = ioutil.WriteFile(oldFn, []byte(old_meta), 0600) + if err != nil { + panic(err) + } + + _, _, nsqd := mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.Nil(t, err) + err = nsqd.PersistMetadata() + test.Nil(t, err) + nsqd.Exit() + + oldFi, err := os.Lstat(oldFn) + test.Nil(t, err) + test.Equal(t, oldFi.Mode()&os.ModeType, os.ModeSymlink) + + _, _, nsqd = mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.Nil(t, err) + + t1, err := nsqd.GetExistingTopic("t1") + test.Nil(t, err) + test.NotNil(t, t1) + c2, err := t1.GetExistingChannel("c2") + test.Nil(t, err) + test.NotNil(t, c2) + + nsqd.Exit() +} + +func TestMetadataConflict(t *testing.T) { + old_meta := ` + { + "topics": [{ + "name": "t1", "paused": false, + "channels": [{"name": "c1", "paused": false}] + }], + "version": "1.0.0-alpha" + }` + new_meta := ` + { + "topics": [{ + "name": "t2", "paused": false, + "channels": [{"name": "c2", "paused": false}] + }], + "version": "1.0.0-alpha" + }` + + tmpDir, err := ioutil.TempDir("", "nsq-test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + opts := NewOptions() + opts.DataPath = tmpDir + opts.Logger = test.NewTestLogger(t) + + err = ioutil.WriteFile(oldMetadataFile(opts), []byte(old_meta), 0600) + if err != nil { + panic(err) + } + err = ioutil.WriteFile(newMetadataFile(opts), []byte(new_meta), 0600) + if err != nil { + panic(err) + } + + _, _, nsqd := mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.NotNil(t, err) + nsqd.Exit() +} + func TestEphemeralTopicsAndChannels(t *testing.T) { // ephemeral topics/channels are lazily removed after the last channel/client is removed opts := NewOptions() diff --git a/nsqd/options.go b/nsqd/options.go index 6803a3a6b..02389ef51 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -12,7 +12,7 @@ import ( type Options struct { // basic options - ID int64 `flag:"worker-id" cfg:"id"` + ID int64 `flag:"node-id" cfg:"id"` Verbose bool `flag:"verbose"` TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 4db7c58ff..d4d24ebb3 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -35,7 +35,7 @@ func mustStartNSQD(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQD) { opts.HTTPAddress = "127.0.0.1:0" opts.HTTPSAddress = "127.0.0.1:0" if opts.DataPath == "" { - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/rename.go b/nsqd/rename.go deleted file mode 100644 index 29ad8d0b9..000000000 --- a/nsqd/rename.go +++ /dev/null @@ -1,11 +0,0 @@ -// +build !windows - -package nsqd - -import ( - "os" -) - -func atomicRename(sourceFile, targetFile string) error { - return os.Rename(sourceFile, targetFile) -} diff --git a/nsqd/rename_windows.go b/nsqd/rename_windows.go deleted file mode 100644 index ce3eca856..000000000 --- a/nsqd/rename_windows.go +++ /dev/null @@ -1,42 +0,0 @@ -// +build windows - -package nsqd - -import ( - "syscall" - "unsafe" -) - -var ( - modkernel32 = syscall.NewLazyDLL("kernel32.dll") - procMoveFileExW = modkernel32.NewProc("MoveFileExW") -) - -const ( - MOVEFILE_REPLACE_EXISTING = 1 -) - -func moveFileEx(sourceFile, targetFile *uint16, flags uint32) error { - ret, _, err := procMoveFileExW.Call(uintptr(unsafe.Pointer(sourceFile)), uintptr(unsafe.Pointer(targetFile)), uintptr(flags)) - if ret == 0 { - if err != nil { - return err - } - return syscall.EINVAL - } - return nil -} - -func atomicRename(sourceFile, targetFile string) error { - lpReplacedFileName, err := syscall.UTF16PtrFromString(targetFile) - if err != nil { - return err - } - - lpReplacementFileName, err := syscall.UTF16PtrFromString(sourceFile) - if err != nil { - return err - } - - return moveFileEx(lpReplacementFileName, lpReplacedFileName, MOVEFILE_REPLACE_EXISTING) -} diff --git a/nsqd/rename_windows_test.go b/nsqd/rename_windows_test.go deleted file mode 100644 index 155cbdad4..000000000 --- a/nsqd/rename_windows_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// +build windows - -package nsqd - -import ( - "fmt" - "io/ioutil" - "math/rand" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/nsqio/nsq/internal/util" -) - -const TEST_FILE_COUNT = 500 - -func TestConcurrentRenames(t *testing.T) { - var waitGroup util.WaitGroupWrapper - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - trigger := make(chan struct{}) - testDir := filepath.Join(os.TempDir(), fmt.Sprintf("nsqd_TestConcurrentRenames_%d", r.Int())) - - err := os.MkdirAll(testDir, 644) - if err != nil { - t.Error(err) - } - - fis, err := ioutil.ReadDir(testDir) - if err != nil { - t.Error(err) - } else if len(fis) > 0 { - t.Errorf("Test directory %s unexpectedly has %d items in it!", testDir, len(fis)) - t.FailNow() - } - - // create a bunch of source files and attempt to concurrently rename them all - for i := 1; i <= TEST_FILE_COUNT; i++ { - //First rename doesn't overwrite/replace; no target present - sourcePath1 := filepath.Join(testDir, fmt.Sprintf("source1_%d.txt", i)) - //Second rename will replace - sourcePath2 := filepath.Join(testDir, fmt.Sprintf("source2_%d.txt", i)) - targetPath := filepath.Join(testDir, fmt.Sprintf("target_%d.txt", i)) - err = ioutil.WriteFile(sourcePath1, []byte(sourcePath1), 0644) - if err != nil { - t.Error(err) - } - err = ioutil.WriteFile(sourcePath2, []byte(sourcePath2), 0644) - if err != nil { - t.Error(err) - } - - waitGroup.Wrap(func() { - _, _ = <-trigger - err := atomicRename(sourcePath1, targetPath) - if err != nil { - t.Error(err) - } - err = atomicRename(sourcePath2, targetPath) - if err != nil { - t.Error(err) - } - }) - } - - // start.. they're off to the races! - close(trigger) - - // wait for completion... - waitGroup.Wait() - - // no source files should exist any longer; we should just have 500 target files - fis, err = ioutil.ReadDir(testDir) - if err != nil { - t.Error(err) - } else if len(fis) != TEST_FILE_COUNT { - t.Errorf("Test directory %s unexpectedly has %d items in it!", testDir, len(fis)) - } else { - for _, fi := range fis { - if !strings.HasPrefix(fi.Name(), "target_") { - t.Errorf("Test directory file %s is not expected target file!", fi.Name()) - } - } - } - - // clean up the test directory - err = os.RemoveAll(testDir) - if err != nil { - t.Error(err) - } -}