From d9e6ecc30dffde4f7d0702d635b37b7e288ce1cf Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sat, 7 Jan 2017 03:20:00 -0500 Subject: [PATCH] nsqd: rename --worker-id to --node-id, no ID in metadata filename --node-id is a new command-line flag to do what --worker-id did --worker-id is left as a bool to error out and give a hint (The config file key was already just "ID", no change needed there.) new metadata filename without ID symlink old metadata filename to new when loading, if both exist, ensure they match this makes rollback possible without losing messages (rolling back and then forward again will abort, if no manual intervention) Also: tests for nsqd metadata migration to new filename no longer need atomicRename() for windows os.Rename() now does the same thing on Windows that atomicRename() did (since go 1.5) ioutil.TempDir() adds a pseudo-random suffix, no need to add our own --- apps/nsqd/nsqd.go | 10 +++- nsqadmin/http_test.go | 2 +- nsqadmin/nsqadmin_test.go | 3 +- nsqd/diskqueue.go | 10 ++-- nsqd/diskqueue_test.go | 20 +++---- nsqd/guid.go | 14 ++--- nsqd/nsqd.go | 72 +++++++++++++++++++----- nsqd/nsqd_test.go | 109 +++++++++++++++++++++++++++++++++++- nsqd/options.go | 2 +- nsqd/protocol_v2_test.go | 2 +- nsqd/rename.go | 11 ---- nsqd/rename_windows.go | 42 -------------- nsqd/rename_windows_test.go | 94 ------------------------------- 13 files changed, 199 insertions(+), 192 deletions(-) delete mode 100644 nsqd/rename.go delete mode 100644 nsqd/rename_windows.go delete mode 100644 nsqd/rename_windows_test.go diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 60bc26b6f..b2606a2ca 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -80,7 +80,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Bool("version", false, "print version string") flagSet.Bool("verbose", false, "enable verbose logging") flagSet.String("config", "", "path to config file") - flagSet.Int64("worker-id", opts.ID, "unique seed for message ID generation (int) in range [0,4096) (will default to a hash of hostname)") + flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)") + flagSet.Bool("worker-id", false, "do NOT use this, use --node-id") flagSet.String("https-address", opts.HTTPSAddress, ": to listen on for HTTPS clients") flagSet.String("http-address", opts.HTTPAddress, ": to listen on for HTTP clients") @@ -217,8 +218,11 @@ func (p *program) Start() error { options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) - nsqd.LoadMetadata() - err := nsqd.PersistMetadata() + err := nsqd.LoadMetadata() + if err != nil { + log.Fatalf("ERROR: %s", err.Error()) + } + err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } diff --git a/nsqadmin/http_test.go b/nsqadmin/http_test.go index de5ffc9f4..b45e22fa3 100644 --- a/nsqadmin/http_test.go +++ b/nsqadmin/http_test.go @@ -73,7 +73,7 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, []*nsqlookupd.NSQL nsqdOpts.BroadcastAddress = "127.0.0.1" nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()} nsqdOpts.Logger = lgr - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqadmin/nsqadmin_test.go b/nsqadmin/nsqadmin_test.go index e77d51ef2..8de362d60 100644 --- a/nsqadmin/nsqadmin_test.go +++ b/nsqadmin/nsqadmin_test.go @@ -9,7 +9,6 @@ import ( "os" "os/exec" "testing" - "time" "github.com/nsqio/nsq/internal/test" "github.com/nsqio/nsq/nsqd" @@ -103,7 +102,7 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD) opts.HTTPAddress = "127.0.0.1:0" opts.HTTPSAddress = "127.0.0.1:0" if opts.DataPath == "" { - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go index 01e7e9d3b..beaeefe95 100644 --- a/nsqd/diskqueue.go +++ b/nsqd/diskqueue.go @@ -450,7 +450,7 @@ func (d *diskQueue) persistMetaData() error { f.Close() // atomically rename - return atomicRename(tmpFileName, fileName) + return os.Rename(tmpFileName, fileName) } func (d *diskQueue) metaDataFileName() string { @@ -538,14 +538,12 @@ func (d *diskQueue) handleReadError() { badFn := d.fileName(d.readFileNum) badRenameFn := badFn + ".bad" - d.logf( - "NOTICE: diskqueue(%s) jump to next file and saving bad file as %s", + d.logf("NOTICE: diskqueue(%s) jump to next file and saving bad file as %s", d.name, badRenameFn) - err := atomicRename(badFn, badRenameFn) + err := os.Rename(badFn, badRenameFn) if err != nil { - d.logf( - "ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s", + d.logf("ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s", d.name, badFn, badRenameFn) } diff --git a/nsqd/diskqueue_test.go b/nsqd/diskqueue_test.go index f0aead160..ae4076896 100644 --- a/nsqd/diskqueue_test.go +++ b/nsqd/diskqueue_test.go @@ -20,7 +20,7 @@ func TestDiskQueue(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -42,7 +42,7 @@ func TestDiskQueue(t *testing.T) { func TestDiskQueueRoll(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -73,7 +73,7 @@ func assertFileNotExist(t *testing.T, fn string) { func TestDiskQueueEmpty(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -141,7 +141,7 @@ func TestDiskQueueEmpty(t *testing.T) { func TestDiskQueueCorruption(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -219,7 +219,7 @@ func readMetaDataFile(fileName string, retried int) md { func TestDiskQueueSyncAfterRead(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -270,7 +270,7 @@ func TestDiskQueueTorture(t *testing.T) { l := test.NewTestLogger(t) dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -385,7 +385,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) { b.StopTimer() l := test.NewTestLogger(b) dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -434,7 +434,7 @@ func BenchmarkDiskWrite1048576(b *testing.B) { func benchmarkDiskWrite(size int64, b *testing.B) { b.StopTimer() fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -480,7 +480,7 @@ func BenchmarkDiskWriteBuffered1048576(b *testing.B) { func benchmarkDiskWriteBuffered(size int64, b *testing.B) { b.StopTimer() fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } @@ -536,7 +536,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) { b.StopTimer() l := test.NewTestLogger(b) dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/guid.go b/nsqd/guid.go index f86c78b8e..89ba83e9e 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -17,10 +17,10 @@ import ( ) const ( - workerIDBits = uint64(10) + nodeIDBits = uint64(10) sequenceBits = uint64(12) - workerIDShift = sequenceBits - timestampShift = sequenceBits + workerIDBits + nodeIDShift = sequenceBits + timestampShift = sequenceBits + nodeIDBits sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits) // ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20 @@ -36,15 +36,15 @@ type guid int64 type guidFactory struct { sync.Mutex - workerID int64 + nodeID int64 sequence int64 lastTimestamp int64 lastID guid } -func NewGUIDFactory(workerID int64) *guidFactory { +func NewGUIDFactory(nodeID int64) *guidFactory { return &guidFactory{ - workerID: workerID, + nodeID: nodeID, } } @@ -72,7 +72,7 @@ func (f *guidFactory) NewGUID() (guid, error) { f.lastTimestamp = ts id := guid(((ts - twepoch) << timestampShift) | - (f.workerID << workerIDShift) | + (f.nodeID << nodeIDShift) | f.sequence) if id <= f.lastID { diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 018332691..60be46225 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -1,6 +1,7 @@ package nsqd import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -98,7 +99,7 @@ func New(opts *Options) *NSQD { } if opts.ID < 0 || opts.ID >= 1024 { - n.logf("FATAL: --worker-id must be [0,1024)") + n.logf("FATAL: --node-id must be [0,1024)") os.Exit(1) } @@ -267,24 +268,51 @@ type meta struct { } `json:"topics"` } -func (n *NSQD) LoadMetadata() { +func readOrEmpty(fn string) ([]byte, error) { + data, err := ioutil.ReadFile(fn) + if err != nil { + if !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err) + } + } + return data, nil +} + +func (n *NSQD) LoadMetadata() error { atomic.StoreInt32(&n.isLoading, 1) defer atomic.StoreInt32(&n.isLoading, 0) - fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) - data, err := ioutil.ReadFile(fn) + fn := path.Join(n.getOpts().DataPath, "nsqd.dat") + // old metadata filename with ID, maintained in parallel to enable roll-back + fnID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID)) + + data, err := readOrEmpty(fn) if err != nil { - if !os.IsNotExist(err) { - n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err) + return err + } + dataID, errID := readOrEmpty(fnID) + if errID != nil { + return errID + } + + if data == nil && dataID == nil { + return nil // fresh start + } + if data != nil && dataID != nil { + if bytes.Compare(data, dataID) != 0 { + return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID) } - return + } + if data == nil { + // only old metadata file exists, use it + fn = fnID + data = dataID } var m meta err = json.Unmarshal(data, &m) if err != nil { - n.logf("ERROR: failed to parse metadata - %s", err) - return + return fmt.Errorf("failed to parse metadata in %s - %s", fn, err) } for _, t := range m.Topics { @@ -308,12 +336,15 @@ func (n *NSQD) LoadMetadata() { } } } + return nil } func (n *NSQD) PersistMetadata() error { - // persist metadata about what topics/channels we have - // so that upon restart we can get back to the same state - fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) + // persist metadata about what topics/channels we have, across restarts + fileName := path.Join(n.getOpts().DataPath, "nsqd.dat") + // old metadata filename with ID, maintained in parallel to enable roll-back + fileNameID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID)) + n.logf("NSQ: persisting topic/channel metadata to %s", fileName) js := make(map[string]interface{}) @@ -365,11 +396,26 @@ func (n *NSQD) PersistMetadata() error { f.Sync() f.Close() - err = atomicRename(tmpFileName, fileName) + err = os.Rename(tmpFileName, fileName) if err != nil { return err } + stat, err := os.Lstat(fileNameID) + if err != nil || (stat.Mode()&os.ModeSymlink) == 0 { + tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int()) + + err = os.Symlink(fileName, tmpFileNameID) + if err != nil { + return err + } + err = os.Rename(tmpFileNameID, fileNameID) + if err != nil { + return err + } + } + + // technically should fsync DataPath here return nil } diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index dcc3ba501..bee29ab34 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -23,8 +23,15 @@ const ( RequestTimeout = 5 * time.Second ) +func newMetadataFile(opts *Options) string { + return path.Join(opts.DataPath, "nsqd.dat") +} +func oldMetadataFile(opts *Options) string { + return path.Join(opts.DataPath, fmt.Sprintf("nsqd.%d.dat", opts.ID)) +} + func getMetadata(n *NSQD) (*meta, error) { - fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) + fn := newMetadataFile(n.getOpts()) data, err := ioutil.ReadFile(fn) if err != nil { return nil, err @@ -158,6 +165,106 @@ func TestStartup(t *testing.T) { <-doneExitChan } +func TestMetadataMigrate(t *testing.T) { + old_meta := ` + { + "topics": [ + { + "channels": [ + {"name": "c1", "paused": false}, + {"name": "c2", "paused": false} + ], + "name": "t1", + "paused": false + } + ], + "version": "1.0.0-alpha" + }` + + tmpDir, err := ioutil.TempDir("", "nsq-test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + opts := NewOptions() + opts.DataPath = tmpDir + opts.Logger = test.NewTestLogger(t) + + oldFn := oldMetadataFile(opts) + err = ioutil.WriteFile(oldFn, []byte(old_meta), 0600) + if err != nil { + panic(err) + } + + _, _, nsqd := mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.Nil(t, err) + err = nsqd.PersistMetadata() + test.Nil(t, err) + nsqd.Exit() + + oldFi, err := os.Lstat(oldFn) + test.Nil(t, err) + test.Equal(t, oldFi.Mode()&os.ModeType, os.ModeSymlink) + + _, _, nsqd = mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.Nil(t, err) + + t1, err := nsqd.GetExistingTopic("t1") + test.Nil(t, err) + test.NotNil(t, t1) + c2, err := t1.GetExistingChannel("c2") + test.Nil(t, err) + test.NotNil(t, c2) + + nsqd.Exit() +} + +func TestMetadataConflict(t *testing.T) { + old_meta := ` + { + "topics": [{ + "name": "t1", "paused": false, + "channels": [{"name": "c1", "paused": false}] + }], + "version": "1.0.0-alpha" + }` + new_meta := ` + { + "topics": [{ + "name": "t2", "paused": false, + "channels": [{"name": "c2", "paused": false}] + }], + "version": "1.0.0-alpha" + }` + + tmpDir, err := ioutil.TempDir("", "nsq-test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + opts := NewOptions() + opts.DataPath = tmpDir + opts.Logger = test.NewTestLogger(t) + + err = ioutil.WriteFile(oldMetadataFile(opts), []byte(old_meta), 0600) + if err != nil { + panic(err) + } + err = ioutil.WriteFile(newMetadataFile(opts), []byte(new_meta), 0600) + if err != nil { + panic(err) + } + + _, _, nsqd := mustStartNSQD(opts) + err = nsqd.LoadMetadata() + test.NotNil(t, err) + nsqd.Exit() +} + func TestEphemeralTopicsAndChannels(t *testing.T) { // ephemeral topics/channels are lazily removed after the last channel/client is removed opts := NewOptions() diff --git a/nsqd/options.go b/nsqd/options.go index 6803a3a6b..02389ef51 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -12,7 +12,7 @@ import ( type Options struct { // basic options - ID int64 `flag:"worker-id" cfg:"id"` + ID int64 `flag:"node-id" cfg:"id"` Verbose bool `flag:"verbose"` TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 4db7c58ff..d4d24ebb3 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -35,7 +35,7 @@ func mustStartNSQD(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQD) { opts.HTTPAddress = "127.0.0.1:0" opts.HTTPSAddress = "127.0.0.1:0" if opts.DataPath == "" { - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + tmpDir, err := ioutil.TempDir("", "nsq-test-") if err != nil { panic(err) } diff --git a/nsqd/rename.go b/nsqd/rename.go deleted file mode 100644 index 29ad8d0b9..000000000 --- a/nsqd/rename.go +++ /dev/null @@ -1,11 +0,0 @@ -// +build !windows - -package nsqd - -import ( - "os" -) - -func atomicRename(sourceFile, targetFile string) error { - return os.Rename(sourceFile, targetFile) -} diff --git a/nsqd/rename_windows.go b/nsqd/rename_windows.go deleted file mode 100644 index ce3eca856..000000000 --- a/nsqd/rename_windows.go +++ /dev/null @@ -1,42 +0,0 @@ -// +build windows - -package nsqd - -import ( - "syscall" - "unsafe" -) - -var ( - modkernel32 = syscall.NewLazyDLL("kernel32.dll") - procMoveFileExW = modkernel32.NewProc("MoveFileExW") -) - -const ( - MOVEFILE_REPLACE_EXISTING = 1 -) - -func moveFileEx(sourceFile, targetFile *uint16, flags uint32) error { - ret, _, err := procMoveFileExW.Call(uintptr(unsafe.Pointer(sourceFile)), uintptr(unsafe.Pointer(targetFile)), uintptr(flags)) - if ret == 0 { - if err != nil { - return err - } - return syscall.EINVAL - } - return nil -} - -func atomicRename(sourceFile, targetFile string) error { - lpReplacedFileName, err := syscall.UTF16PtrFromString(targetFile) - if err != nil { - return err - } - - lpReplacementFileName, err := syscall.UTF16PtrFromString(sourceFile) - if err != nil { - return err - } - - return moveFileEx(lpReplacementFileName, lpReplacedFileName, MOVEFILE_REPLACE_EXISTING) -} diff --git a/nsqd/rename_windows_test.go b/nsqd/rename_windows_test.go deleted file mode 100644 index 155cbdad4..000000000 --- a/nsqd/rename_windows_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// +build windows - -package nsqd - -import ( - "fmt" - "io/ioutil" - "math/rand" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/nsqio/nsq/internal/util" -) - -const TEST_FILE_COUNT = 500 - -func TestConcurrentRenames(t *testing.T) { - var waitGroup util.WaitGroupWrapper - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - trigger := make(chan struct{}) - testDir := filepath.Join(os.TempDir(), fmt.Sprintf("nsqd_TestConcurrentRenames_%d", r.Int())) - - err := os.MkdirAll(testDir, 644) - if err != nil { - t.Error(err) - } - - fis, err := ioutil.ReadDir(testDir) - if err != nil { - t.Error(err) - } else if len(fis) > 0 { - t.Errorf("Test directory %s unexpectedly has %d items in it!", testDir, len(fis)) - t.FailNow() - } - - // create a bunch of source files and attempt to concurrently rename them all - for i := 1; i <= TEST_FILE_COUNT; i++ { - //First rename doesn't overwrite/replace; no target present - sourcePath1 := filepath.Join(testDir, fmt.Sprintf("source1_%d.txt", i)) - //Second rename will replace - sourcePath2 := filepath.Join(testDir, fmt.Sprintf("source2_%d.txt", i)) - targetPath := filepath.Join(testDir, fmt.Sprintf("target_%d.txt", i)) - err = ioutil.WriteFile(sourcePath1, []byte(sourcePath1), 0644) - if err != nil { - t.Error(err) - } - err = ioutil.WriteFile(sourcePath2, []byte(sourcePath2), 0644) - if err != nil { - t.Error(err) - } - - waitGroup.Wrap(func() { - _, _ = <-trigger - err := atomicRename(sourcePath1, targetPath) - if err != nil { - t.Error(err) - } - err = atomicRename(sourcePath2, targetPath) - if err != nil { - t.Error(err) - } - }) - } - - // start.. they're off to the races! - close(trigger) - - // wait for completion... - waitGroup.Wait() - - // no source files should exist any longer; we should just have 500 target files - fis, err = ioutil.ReadDir(testDir) - if err != nil { - t.Error(err) - } else if len(fis) != TEST_FILE_COUNT { - t.Errorf("Test directory %s unexpectedly has %d items in it!", testDir, len(fis)) - } else { - for _, fi := range fis { - if !strings.HasPrefix(fi.Name(), "target_") { - t.Errorf("Test directory file %s is not expected target file!", fi.Name()) - } - } - } - - // clean up the test directory - err = os.RemoveAll(testDir) - if err != nil { - t.Error(err) - } -}