From 6c6bc002293ce4b303efdb7cb5c65044406960ae Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Thu, 26 Nov 2020 23:15:05 -0500 Subject: [PATCH] ctx refactor --- nsqd/channel.go | 2 +- nsqd/http.go | 8 ++++---- nsqd/topic.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index cf6506849..753f86993 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -160,7 +160,7 @@ func (c *Channel) StartDraining() { depth := c.Depth() inFlight := int64(c.InFlightCount()) deferred := int64(c.DeferredCount()) - c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred) + c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred) // if we are empty delete if depth+inFlight+deferred == 0 { go c.Delete() diff --git a/nsqd/http.go b/nsqd/http.go index 347a99941..c3fc10048 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -408,7 +408,7 @@ func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h func (s *httpServer) doDrainTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, http_api.Err{400, "INVALID_REQUEST"} } @@ -417,14 +417,14 @@ func (s *httpServer) doDrainTopic(w http.ResponseWriter, req *http.Request, ps h return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - topic, err := s.ctx.nsqd.GetExistingTopic(topicName) + topic, err := s.nsqd.GetExistingTopic(topicName) if err != nil { return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} } err = topic.StartDraining() if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) + s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) return nil, http_api.Err{500, "INTERNAL_ERROR"} } return nil, nil @@ -775,7 +775,7 @@ func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { } func (s *httpServer) startDraining(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { - err := s.ctx.nsqd.StartDraining() + err := s.nsqd.StartDraining() if err != nil { return nil, http_api.Err{500, "TODO"} } diff --git a/nsqd/topic.go b/nsqd/topic.go index 4f86b2412..70d07b745 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -112,7 +112,7 @@ func (t *Topic) StartDraining() error { atomic.StoreInt32(&t.isDraining, 1) msgsLeft := int64(len(t.memoryMsgChan)) + t.backend.Depth() - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): draining. topic depth:%d", t.name, msgsLeft) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): draining. topic depth:%d", t.name, msgsLeft) // if no outstanding messages, start channel drain if msgsLeft == 0 {