Skip to content

Commit

Permalink
GetOrCreateTopic, GetOrCreateChannel: return *Obj, error
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Mar 1, 2021
1 parent 2cb1881 commit 9cc5d8b
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 120 deletions.
18 changes: 12 additions & 6 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func TestHTTPChannelGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_channel_get" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -292,7 +293,8 @@ func TestHTTPNodesSingleGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_nodes_single_get" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -419,7 +421,8 @@ func TestHTTPDeleteChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_delete_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -474,7 +477,8 @@ func TestHTTPPauseChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_pause_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -509,7 +513,8 @@ func TestHTTPEmptyTopicPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_empty_topic_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
test.Equal(t, int64(1), topic.Depth())
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -537,7 +542,8 @@ func TestHTTPEmptyChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_empty_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetOrCreateTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
channel := topic.GetOrCreateChannel("ch")
channel.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))

Expand Down
34 changes: 17 additions & 17 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestPutMessage(t *testing.T) {
defer nsqd.Exit()

topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel1 := topic.GetOrCreateChannel("ch")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel1, _ := topic.GetOrCreateChannel("ch")

var id MessageID
msg := NewMessage(id, []byte("test"))
Expand All @@ -42,9 +42,9 @@ func TestPutMessage2Chan(t *testing.T) {
defer nsqd.Exit()

topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel1 := topic.GetOrCreateChannel("ch1")
channel2 := topic.GetOrCreateChannel("ch2")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel1, _ := topic.GetOrCreateChannel("ch1")
channel2, _ := topic.GetOrCreateChannel("ch2")

var id MessageID
msg := NewMessage(id, []byte("test"))
Expand All @@ -71,8 +71,8 @@ func TestInFlightWorker(t *testing.T) {
defer nsqd.Exit()

topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel := topic.GetOrCreateChannel("channel")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel, _ := topic.GetOrCreateChannel("channel")

for i := 0; i < count; i++ {
msg := NewMessage(topic.GenerateID(), []byte("test"))
Expand Down Expand Up @@ -112,8 +112,8 @@ func TestChannelEmpty(t *testing.T) {
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel := topic.GetOrCreateChannel("channel")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel, _ := topic.GetOrCreateChannel("channel")

msgs := make([]*Message, 0, 25)
for i := 0; i < 25; i++ {
Expand Down Expand Up @@ -148,8 +148,8 @@ func TestChannelEmptyConsumer(t *testing.T) {
defer conn.Close()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel := topic.GetOrCreateChannel("channel")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel, _ := topic.GetOrCreateChannel("channel")
client := newClientV2(0, conn, nsqd)
client.SetReadyCount(25)
err := channel.AddClient(client.ID, client)
Expand Down Expand Up @@ -186,8 +186,8 @@ func TestMaxChannelConsumers(t *testing.T) {
defer conn.Close()

topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel := topic.GetOrCreateChannel("channel")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel, _ := topic.GetOrCreateChannel("channel")

client1 := newClientV2(1, conn, nsqd)
client1.SetReadyCount(25)
Expand All @@ -209,9 +209,9 @@ func TestChannelHealth(t *testing.T) {
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topic := nsqd.GetOrCreateTopic("test")
topic, _ := nsqd.GetOrCreateTopic("test")

channel := topic.GetOrCreateChannel("channel")
channel, _ := topic.GetOrCreateChannel("channel")

channel.backend = &errorBackendQueue{}

Expand Down Expand Up @@ -258,8 +258,8 @@ func TestChannelDraining(t *testing.T) {
defer nsqd.Exit()

topicName := "test_drain_channel" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel1 := topic.GetOrCreateChannel("ch")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel1, _ := topic.GetOrCreateChannel("ch")

msg := NewMessage(topic.GenerateID(), []byte("test"))
topic.PutMessage(msg)
Expand Down
8 changes: 4 additions & 4 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e
if !protocol.IsValidTopicName(topicName) {
return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
}
topic := s.nsqd.GetOrCreateTopic(topicName)
if topic == nil {
topic, err := s.nsqd.GetOrCreateTopic(topicName)
if err != nil {
return nil, nil, http_api.Err{503, "EXITING"}
}

Expand Down Expand Up @@ -416,8 +416,8 @@ func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p
if err != nil {
return nil, err
}
ch := topic.GetOrCreateChannel(channelName)
if ch == nil {
_, err = topic.GetOrCreateChannel(channelName)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
}
return nil, nil
Expand Down
24 changes: 12 additions & 12 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestHTTPpub(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_pub" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

buf := bytes.NewBuffer([]byte("test message"))
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
Expand All @@ -69,7 +69,7 @@ func TestHTTPpubEmpty(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_pub_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

buf := bytes.NewBuffer([]byte(""))
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
Expand All @@ -93,7 +93,7 @@ func TestHTTPmpub(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_mpub" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

msg := []byte("test message")
msgs := make([][]byte, 4)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestHTTPmpubEmpty(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_mpub_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

msg := []byte("test message")
msgs := make([][]byte, 4)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestHTTPmpubBinary(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

mpub := make([][]byte, 5)
for i := range mpub {
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestHTTPmpubForNonNormalizedBinaryParam(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

mpub := make([][]byte, 5)
for i := range mpub {
Expand Down Expand Up @@ -211,8 +211,8 @@ func TestHTTPpubDefer(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
ch := topic.GetOrCreateChannel("ch")
topic, _ := nsqd.GetOrCreateTopic(topicName)
ch, _ := topic.GetOrCreateChannel("ch")

buf := bytes.NewBuffer([]byte("test message"))
url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000)
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestHTTPSRequire(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_pub_req" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

buf := bytes.NewBuffer([]byte("test message"))
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestHTTPSRequireVerify(t *testing.T) {

httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
topicName := "test_http_pub_req_verf" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

// no cert
buf := bytes.NewBuffer([]byte("test message"))
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
defer nsqd.Exit()

topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

// no cert
buf := bytes.NewBuffer([]byte("test message"))
Expand Down Expand Up @@ -761,7 +761,7 @@ func TestEmptyChannel(t *testing.T) {
test.Equal(t, 404, resp.StatusCode)
test.HTTPError(t, resp, 404, "TOPIC_NOT_FOUND")

topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)

url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
resp, err = http.Post(url, "application/json", nil)
Expand Down
22 changes: 11 additions & 11 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ func (n *NSQD) LoadMetadata() error {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
topic := n.GetOrCreateTopic(t.Name)
if topic == nil {
topic, err := n.GetOrCreateTopic(t.Name)
if err != nil {
n.logf(LOG_WARN, "skipping creation of topic, nsqd draining %s", t.Name)
continue
}
Expand All @@ -370,8 +370,8 @@ func (n *NSQD) LoadMetadata() error {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
channel := topic.GetOrCreateChannel(c.Name)
if c.Paused && channel != nil {
channel, err := topic.GetOrCreateChannel(c.Name)
if c.Paused && err != nil {
channel.Pause()
}
}
Expand Down Expand Up @@ -487,26 +487,26 @@ func (n *NSQD) Exit() {

// GetOrCreateTopic performs a thread safe operation to get an existing topic or create a new one
//
// The creation might fail if nsqd is draining
func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {
// An error will be returned if nsqd is draining
func (n *NSQD) GetOrCreateTopic(topicName string) (*Topic, error) {
// most likely, we already have this topic, so try read lock first.
n.RLock()
t, ok := n.topicMap[topicName]
n.RUnlock()
if ok {
return t
return t, nil
}

n.Lock()

t, ok = n.topicMap[topicName]
if ok {
n.Unlock()
return t
return t, nil
}
if atomic.LoadInt32(&n.isDraining) == 1 {
// don't create new topics when nsqd is draining
return nil
return nil, errors.New("nsqd draining")
}

deleteCallback := func(t *Topic) {
Expand All @@ -533,7 +533,7 @@ func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {

// if loading metadata at startup, no lookupd connections yet, topic started after load
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
return t, nil
}

// if using lookupd, make a blocking call to get the topics, and immediately create them.
Expand All @@ -556,7 +556,7 @@ func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {

// now that all channels are added, start topic messagePump
t.Start()
return t
return t, nil
}

// GetExistingTopic gets a topic only if it exists
Expand Down
16 changes: 8 additions & 8 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ func TestStartup(t *testing.T) {
atomic.StoreInt32(&nsqd.isLoading, 0)

body := make([]byte, 256)
topic := nsqd.GetOrCreateTopic(topicName)
topic, _ := nsqd.GetOrCreateTopic(topicName)
for i := 0; i < iterations; i++ {
msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
}

t.Logf("pulling from channel")
channel1 := topic.GetOrCreateChannel("ch1")
channel1, _ := topic.GetOrCreateChannel("ch1")

t.Logf("read %d msgs", iterations/2)
for i := 0; i < iterations/2; i++ {
Expand Down Expand Up @@ -124,12 +124,12 @@ func TestStartup(t *testing.T) {
doneExitChan <- 1
}()

topic = nsqd.GetOrCreateTopic(topicName)
topic, _ = nsqd.GetOrCreateTopic(topicName)
// should be empty; channel should have drained everything
count := topic.Depth()
test.Equal(t, int64(0), count)

channel1 = topic.GetOrCreateChannel("ch1")
channel1, _ = topic.GetOrCreateChannel("ch1")

for {
if channel1.Depth() == int64(iterations/2) {
Expand Down Expand Up @@ -176,8 +176,8 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
}()

body := []byte("an_ephemeral_message")
topic := nsqd.GetOrCreateTopic(topicName)
ephemeralChannel := topic.GetOrCreateChannel("ch1#ephemeral")
topic, _ := nsqd.GetOrCreateTopic(topicName)
ephemeralChannel, _ := topic.GetOrCreateChannel("ch1#ephemeral")
client := newClientV2(0, nil, nsqd)
err := ephemeralChannel.AddClient(client.ID, client)
test.Equal(t, err, nil)
Expand Down Expand Up @@ -215,8 +215,8 @@ func TestPauseMetadata(t *testing.T) {
// avoid concurrency issue of async PersistMetadata() calls
atomic.StoreInt32(&nsqd.isLoading, 1)
topicName := "pause_metadata" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetOrCreateTopic(topicName)
channel := topic.GetOrCreateChannel("ch")
topic, _ := nsqd.GetOrCreateTopic(topicName)
channel, _ := topic.GetOrCreateChannel("ch")
atomic.StoreInt32(&nsqd.isLoading, 0)
nsqd.PersistMetadata()

Expand Down
Loading

0 comments on commit 9cc5d8b

Please sign in to comment.