Skip to content

Commit

Permalink
nsqd: rename --worker-id to --node-id, no ID in metadata filename
Browse files Browse the repository at this point in the history
--node-id is a new command-line flag to do what --worker-id did
--worker-id is left as a bool to error out and give a hint
(The config file key was already just "ID", no change needed there.)

new metadata filename without ID
symlink old metadata filename to new
when loading, if both exist, ensure they match
this makes rollback possible without losing messages
(rolling back and then forward again will abort unless someone intervenes manually)

Also:

tests for nsqd metadata migration to new filename

no longer need atomicRename() for Windows
os.Rename() now does the same thing on Windows
that atomicRename() did (since Go 1.5)

ioutil.TempDir() adds a pseudo-random suffix, no need to add our own
  • Loading branch information
ploxiln committed Jan 12, 2017
1 parent ecfb30f commit d9e6ecc
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 192 deletions.
10 changes: 7 additions & 3 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Bool("version", false, "print version string")
flagSet.Bool("verbose", false, "enable verbose logging")
flagSet.String("config", "", "path to config file")
flagSet.Int64("worker-id", opts.ID, "unique seed for message ID generation (int) in range [0,4096) (will default to a hash of hostname)")
flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)")
flagSet.Bool("worker-id", false, "do NOT use this, use --node-id")

flagSet.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
Expand Down Expand Up @@ -217,8 +218,11 @@ func (p *program) Start() error {
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, []*nsqlookupd.NSQL
nsqdOpts.BroadcastAddress = "127.0.0.1"
nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()}
nsqdOpts.Logger = lgr
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions nsqadmin/nsqadmin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"os/exec"
"testing"
"time"

"github.com/nsqio/nsq/internal/test"
"github.com/nsqio/nsq/nsqd"
Expand Down Expand Up @@ -103,7 +102,7 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD)
opts.HTTPAddress = "127.0.0.1:0"
opts.HTTPSAddress = "127.0.0.1:0"
if opts.DataPath == "" {
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
10 changes: 4 additions & 6 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (d *diskQueue) persistMetaData() error {
f.Close()

// atomically rename
return atomicRename(tmpFileName, fileName)
return os.Rename(tmpFileName, fileName)
}

func (d *diskQueue) metaDataFileName() string {
Expand Down Expand Up @@ -538,14 +538,12 @@ func (d *diskQueue) handleReadError() {
badFn := d.fileName(d.readFileNum)
badRenameFn := badFn + ".bad"

d.logf(
"NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
d.logf("NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
d.name, badRenameFn)

err := atomicRename(badFn, badRenameFn)
err := os.Rename(badFn, badRenameFn)
if err != nil {
d.logf(
"ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
d.logf("ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
d.name, badFn, badRenameFn)
}

Expand Down
20 changes: 10 additions & 10 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestDiskQueue(t *testing.T) {
l := test.NewTestLogger(t)

dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand All @@ -42,7 +42,7 @@ func TestDiskQueue(t *testing.T) {
func TestDiskQueueRoll(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func assertFileNotExist(t *testing.T, fn string) {
func TestDiskQueueEmpty(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestDiskQueueEmpty(t *testing.T) {
func TestDiskQueueCorruption(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func readMetaDataFile(fileName string, retried int) md {
func TestDiskQueueSyncAfterRead(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestDiskQueueTorture(t *testing.T) {

l := test.NewTestLogger(t)
dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) {
b.StopTimer()
l := test.NewTestLogger(b)
dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func BenchmarkDiskWrite1048576(b *testing.B) {
func benchmarkDiskWrite(size int64, b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func BenchmarkDiskWriteBuffered1048576(b *testing.B) {
func benchmarkDiskWriteBuffered(size int64, b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
b.StopTimer()
l := test.NewTestLogger(b)
dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
14 changes: 7 additions & 7 deletions nsqd/guid.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
)

const (
workerIDBits = uint64(10)
nodeIDBits = uint64(10)
sequenceBits = uint64(12)
workerIDShift = sequenceBits
timestampShift = sequenceBits + workerIDBits
nodeIDShift = sequenceBits
timestampShift = sequenceBits + nodeIDBits
sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits)

// ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20
Expand All @@ -36,15 +36,15 @@ type guid int64
type guidFactory struct {
sync.Mutex

workerID int64
nodeID int64
sequence int64
lastTimestamp int64
lastID guid
}

func NewGUIDFactory(workerID int64) *guidFactory {
func NewGUIDFactory(nodeID int64) *guidFactory {
return &guidFactory{
workerID: workerID,
nodeID: nodeID,
}
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func (f *guidFactory) NewGUID() (guid, error) {
f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
(f.workerID << workerIDShift) |
(f.nodeID << nodeIDShift) |
f.sequence)

if id <= f.lastID {
Expand Down
72 changes: 59 additions & 13 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -98,7 +99,7 @@ func New(opts *Options) *NSQD {
}

if opts.ID < 0 || opts.ID >= 1024 {
n.logf("FATAL: --worker-id must be [0,1024)")
n.logf("FATAL: --node-id must be [0,1024)")
os.Exit(1)
}

Expand Down Expand Up @@ -267,24 +268,51 @@ type meta struct {
} `json:"topics"`
}

func (n *NSQD) LoadMetadata() {
// readOrEmpty reads the whole file named fn and returns its contents.
//
// A file that does not exist is not an error: the function returns a nil
// error together with whatever ioutil.ReadFile handed back for the missing
// file. Any other read failure yields nil data and an error that names fn.
func readOrEmpty(fn string) ([]byte, error) {
	contents, readErr := ioutil.ReadFile(fn)
	if readErr == nil || os.IsNotExist(readErr) {
		// Either the read succeeded or there is simply no file to read;
		// both count as success for the caller.
		return contents, nil
	}
	return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, readErr)
}

func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)

fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
data, err := ioutil.ReadFile(fn)
fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID))

data, err := readOrEmpty(fn)
if err != nil {
if !os.IsNotExist(err) {
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}

if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
return
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
}

var m meta
err = json.Unmarshal(data, &m)
if err != nil {
n.logf("ERROR: failed to parse metadata - %s", err)
return
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}

for _, t := range m.Topics {
Expand All @@ -308,12 +336,15 @@ func (n *NSQD) LoadMetadata() {
}
}
}
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have
// so that upon restart we can get back to the same state
fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// persist metadata about what topics/channels we have, across restarts
fileName := path.Join(n.getOpts().DataPath, "nsqd.dat")
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID))

n.logf("NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
Expand Down Expand Up @@ -365,11 +396,26 @@ func (n *NSQD) PersistMetadata() error {
f.Sync()
f.Close()

err = atomicRename(tmpFileName, fileName)
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}

stat, err := os.Lstat(fileNameID)
if err != nil || (stat.Mode()&os.ModeSymlink) == 0 {
tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

err = os.Symlink(fileName, tmpFileNameID)
if err != nil {
return err
}
err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
}

// technically should fsync DataPath here
return nil
}

Expand Down
Loading

0 comments on commit d9e6ecc

Please sign in to comment.