Skip to content

Commit

Permalink
ctx refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Nov 28, 2020
1 parent e4c7257 commit 6c6bc00
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (c *Channel) StartDraining() {
depth := c.Depth()
inFlight := int64(c.InFlightCount())
deferred := int64(c.DeferredCount())
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred)
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred)
// if we are empty delete
if depth+inFlight+deferred == 0 {
go c.Delete()
Expand Down
8 changes: 4 additions & 4 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h
func (s *httpServer) doDrainTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

Expand All @@ -417,14 +417,14 @@ func (s *httpServer) doDrainTopic(w http.ResponseWriter, req *http.Request, ps h
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
}

topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
topic, err := s.nsqd.GetExistingTopic(topicName)
if err != nil {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
}

err = topic.StartDraining()
if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
Expand Down Expand Up @@ -775,7 +775,7 @@ func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
}

func (s *httpServer) startDraining(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
err := s.ctx.nsqd.StartDraining()
err := s.nsqd.StartDraining()
if err != nil {
return nil, http_api.Err{500, "TODO"}
}
Expand Down
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (t *Topic) StartDraining() error {
atomic.StoreInt32(&t.isDraining, 1)

msgsLeft := int64(len(t.memoryMsgChan)) + t.backend.Depth()
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): draining. topic depth:%d", t.name, msgsLeft)
t.nsqd.logf(LOG_INFO, "TOPIC(%s): draining. topic depth:%d", t.name, msgsLeft)

// if no outstanding messages, start channel drain
if msgsLeft == 0 {
Expand Down

0 comments on commit 6c6bc00

Please sign in to comment.