Skip to content

Commit

Permalink
add zone & region msg channels to flush and empty
Browse files Browse the repository at this point in the history
  • Loading branch information
zoemccormick committed Apr 24, 2024
1 parent 00fcfa9 commit f700139
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func (c *Channel) Empty() error {

for {
select {
case <-c.zoneLocalMsgChan:
case <-c.regionLocalMsgChan:
case <-c.memoryMsgChan:
default:
goto finish
Expand All @@ -227,13 +229,23 @@ finish:
// flush persists all the messages in internal memory buffers to the backend
// it does not drain inflight/deferred because it is only called in Close()
func (c *Channel) flush() error {
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
if len(c.zoneLocalMsgChan) > 0 || len(c.regionLocalMsgChan) > 0 || len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
c.name, len(c.memoryMsgChan)+len(c.zoneLocalMsgChan)+len(c.regionLocalMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}

for {
select {
case msg := <-c.zoneLocalMsgChan:
err := writeMessageToBackend(msg, c.backend)
if err != nil {
c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
}
case msg := <-c.regionLocalMsgChan:
err := writeMessageToBackend(msg, c.backend)
if err != nil {
c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
}
case msg := <-c.memoryMsgChan:
err := writeMessageToBackend(msg, c.backend)
if err != nil {
Expand Down Expand Up @@ -268,7 +280,7 @@ finish:
}

func (c *Channel) Depth() int64 {
return int64(len(c.memoryMsgChan)) + c.backend.Depth()
return int64(len(c.memoryMsgChan)) + int64(len(c.zoneLocalMsgChan)) + int64(len(c.regionLocalMsgChan)) + c.backend.Depth()
}

func (c *Channel) Pause() error {
Expand Down

0 comments on commit f700139

Please sign in to comment.