-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsqd: support draining messages / removing nsqd from rotation #1305
base: master
Are you sure you want to change the base?
Conversation
adfc18b
to
4d00173
Compare
RFR @mreiferson @ploxiln - this is ready for a review pass. I'm pretty happy with how this came out, but still have another pass to make to expand some test coverage. |
go.mod
Outdated
@@ -16,4 +18,4 @@ require ( | |||
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect | |||
) | |||
|
|||
go 1.13 | |||
replace github.com/judwhite/go-svc => github.com/jehiah/go-svc v1.1.3-0.20201125205428-33f8faa2d870 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO pending judwhite/go-svc#15
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good so far! 👍
nsqd/nsqd.go
Outdated
@@ -477,8 +489,24 @@ func (n *NSQD) GetTopic(topicName string) *Topic { | |||
n.Unlock() | |||
return t | |||
} | |||
if atomic.LoadInt32(&n.isDraining) == 1 { | |||
// don't create new topics when nsqd is draining | |||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Callers of GetTopic()
previously expected a non-nil response. It seems like this check is to prevent any of the code paths that create topics as a side effect from recreating topics during drain.
Would it be better to just bite-the-bullet and convert this to return an error rather than retrofitting this into a nil
response?
Alternatively, we could add an nsqd.IsTopicDraining()
func and the call sites that create topics as side effects can check it before calling GetTopic()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I generally agree this is the most non-obvious bit of logic - I'll start by renaming these to GetOrCreate....
and docstring that creation may not succeed.
I think despite being atomic we want the isDraining
here in the read lock so this can't race a concurrent nsqd starting to drain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point on the race. Still, we shouldn't abuse nil
here. This is a new edge case where a topic won't be returned and we have to modify all call sites anyway to check for nil
, might as well do it "right" and add a proper error
return value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL - I think the rename to GetOrCreate{Topic,Channel}
helped a lot; I also documented that these may or may not succeed. Given that, checking nil feels an idiomatic way to implement.
We could switch but feels like bike shedding? The rename was helpful in ensuring i've covered all code paths (a few more were updated) - i should have done a more exhaustive check originally.
// If in draining mode and we wrote a message to channels | ||
// check if it was the last message on the topic (there are no more left) | ||
// in which case we start draining each channel | ||
if atomic.LoadInt32(&t.isDraining) == 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth benchmarking... but we've tried hard to minimize the logic in the core message loop. In this case, draining is an irreversible state, meaning we just need to propagate the state change locally into this goroutine (e.g. like paused state) rather than check an atomic for every message published.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to write some benchmarks generally for messagePump
but I couldn't get a consistent benchmark output. (as such i'll probably drop that benchmark from this PR before landing - unless it looks useful). In trying to benchmark i ran into issues with GUID sequences expiring so I switched some benchmarks to use an integer sequence to avoid timing impacts from GUID sequences expiring.
name old time/op new time/op delta
TopicMessagePump-2 3.06µs ±181% 7.04µs ±131% ~ (p=0.113 n=9+10)
I'd assume a refactor which added another 'drainChanchannel for
Topic.messagePumpto watch for draining state updates would be more expensive than
atomic.LoadInt32` but.. i'm not sure - @ploxiln have any intuition here?
It seems like it would be simpler to just dis-allow publishing messages. Allow creating channels like normal, and don't bother deleting any before regular shutdown. Just return error for any message publish request. The only other bit of logic needed is the check on each FIN if this channel is now completely empty, and then if it was the last one. |
@ploxiln that functionality would be similar but i'm not sure it would be easier to implement because you then need a new method of propagating FIN's back up where this piggy backs on the delete functionality. - #1302 (comment) discusses the tradeoffs between the two and this is the functionality i'm interested in (to remove a node from rotation) - Can you think of a compelling use case for the other, or where it becomes materially different? |
I see. I didn't have a need for the other behavior, of keeping the topics/channels until exit, I just think it may be simpler to implement, and sufficient. Fewer global state checks, fewer new error cases. There was some mess with ephemeral topics/channels being deleted ... though it looks like this case may not be so bad. |
@mreiferson PTAL - i think this is the main outstanding discussion to resolve - #1305 (comment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Looks like this needs a rebase)
As @ploxiln mentioned, there are a lot of global state checks. I'm still trying to map it all out, but it feels like we're missing something.
I am also still unhappy with the nil
checks in the revised Get*Topic
paths, especially the implicit knowledge that call sites now have that it means draining (i.e. why not return that error?).
depth, inFlight, deferred := c.Depth(), c.InFlightCount(), c.DeferredCount() | ||
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining. depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred) | ||
// if we are empty delete | ||
if depth+inFlight+deferred == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a race condition b/w this line and line 162 (a concurrent pub that had already passed the isDraining
check but lost the race w/ line 162)
router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1)) | ||
router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1)) | ||
router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1)) | ||
router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1)) | ||
router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1)) | ||
router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1)) | ||
router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1)) | ||
router.Handle("PUT", "/state/drain", http_api.Decorate(s.startDraining, log, http_api.V1)) | ||
router.Handle("PUT", "/state/shutdown", http_api.Decorate(s.shutdown, log, http_api.V1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming bla bla bla... do we need the /state
prefix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From an API layout (and documentation) perspective I felt it was good to have these grouped together, but i'm also not sold on the /state
prefix; Any other naming ideas?
go func() { | ||
// in some cases StartDraining results in an exit immediately | ||
// allow this API call to respond before exiting | ||
time.Sleep(time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's up with these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a HTTP request, there is some desire to allow the HTTP request to complete (i.e. the caller should get a HTTP response back) before the action takes place. If we call immediately an shutdown can (and will in many cases) close this connection and the caller will get a connection error.
I'm open to other ways to solve this (because it's obviously not a generally good pattern to rely on timing), but this seemed reasonable for the scope of this endpoint.
nsqd/nsqd.go
Outdated
@@ -350,7 +357,11 @@ func (n *NSQD) LoadMetadata() error { | |||
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name) | |||
continue | |||
} | |||
topic := n.GetTopic(t.Name) | |||
topic := n.GetOrCreateTopic(t.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need these changes in LoadMetadata
. It is only ever called from Main()
, so it's impossible for it to encounter topics/channels that are being drained.
I was debating the rebase mid-review to pickup upstream go-svc changes 🤷
Open to ideas
ok; updated. |
This adds support for a
nsqd
mode where messages are drained to facilitate removing ansqd
instance from a cluster.New
nsqd
API EndpointsPUT /state/shutdown
PUT /state/drain
POST /topic/drain?topic=...
New
nsqd
CLI options (also settable via config file)--sigterm-mode=shutdown
(default - existing behavior) and--sigterm-mode=drain
(new option)Resolves #1302