Skip to content

Commit

Permalink
nsqd: metadata file back-compat only, no rollback-compat
Browse files Browse the repository at this point in the history
  • Loading branch information
ploxiln committed Jan 29, 2017
1 parent 3b4b6a5 commit 7678df0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 92 deletions.
54 changes: 7 additions & 47 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -13,7 +12,6 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -307,30 +305,21 @@ func (n *NSQD) LoadMetadata() error {
defer atomic.StoreInt32(&n.isLoading, 0)

fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())

data, err := readOrEmpty(fn)
if err != nil {
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}

if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
if data == nil {
// try fallback to old metadata filename (with ID)
fnID := oldMetadataFile(n.getOpts())
data, err = readOrEmpty(fnID)
if err != nil {
return err
}
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
return nil // fresh start
}

var m meta
Expand Down Expand Up @@ -366,8 +355,6 @@ func (n *NSQD) LoadMetadata() error {
func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := oldMetadataFile(n.getOpts())

n.logf("NSQ: persisting topic/channel metadata to %s", fileName)

Expand Down Expand Up @@ -418,33 +405,6 @@ func (n *NSQD) PersistMetadata() error {
}
// technically should fsync DataPath here

stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}

// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort

tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
return err
}

err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here

return nil
}

Expand Down
47 changes: 2 additions & 45 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ func TestMetadataMigrate(t *testing.T) {
test.Nil(t, err)
nsqd.Exit()

oldFi, err := os.Lstat(oldFn)
newFn := newMetadataFile(opts)
_, err = os.Stat(newFn)
test.Nil(t, err)
test.Equal(t, oldFi.Mode()&os.ModeType, os.ModeSymlink)

_, _, nsqd = mustStartNSQD(opts)
err = nsqd.LoadMetadata()
Expand All @@ -214,49 +214,6 @@ func TestMetadataMigrate(t *testing.T) {
nsqd.Exit()
}

func TestMetadataConflict(t *testing.T) {
old_meta := `
{
"topics": [{
"name": "t1", "paused": false,
"channels": [{"name": "c1", "paused": false}]
}],
"version": "1.0.0-alpha"
}`
new_meta := `
{
"topics": [{
"name": "t2", "paused": false,
"channels": [{"name": "c2", "paused": false}]
}],
"version": "1.0.0-alpha"
}`

tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.DataPath = tmpDir
opts.Logger = test.NewTestLogger(t)

err = ioutil.WriteFile(oldMetadataFile(opts), []byte(old_meta), 0600)
if err != nil {
panic(err)
}
err = ioutil.WriteFile(newMetadataFile(opts), []byte(new_meta), 0600)
if err != nil {
panic(err)
}

_, _, nsqd := mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.NotNil(t, err)
nsqd.Exit()
}

func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewOptions()
Expand Down

0 comments on commit 7678df0

Please sign in to comment.