diff --git a/nsqd/channel.go b/nsqd/channel.go index 3c106ddb7..a9919a836 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -214,6 +214,8 @@ func (c *Channel) Empty() error { for { select { + case <-c.zoneLocalMsgChan: + case <-c.regionLocalMsgChan: case <-c.memoryMsgChan: default: goto finish @@ -227,13 +229,23 @@ finish: // flush persists all the messages in internal memory buffers to the backend // it does not drain inflight/deferred because it is only called in Close() func (c *Channel) flush() error { - if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { + if len(c.zoneLocalMsgChan) > 0 || len(c.regionLocalMsgChan) > 0 || len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", - c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) + c.name, len(c.memoryMsgChan)+len(c.zoneLocalMsgChan)+len(c.regionLocalMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) } for { select { + case msg := <-c.zoneLocalMsgChan: + err := writeMessageToBackend(msg, c.backend) + if err != nil { + c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) + } + case msg := <-c.regionLocalMsgChan: + err := writeMessageToBackend(msg, c.backend) + if err != nil { + c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) + } case msg := <-c.memoryMsgChan: err := writeMessageToBackend(msg, c.backend) if err != nil { @@ -268,7 +280,7 @@ finish: } func (c *Channel) Depth() int64 { - return int64(len(c.memoryMsgChan)) + c.backend.Depth() + return int64(len(c.memoryMsgChan)) + int64(len(c.zoneLocalMsgChan)) + int64(len(c.regionLocalMsgChan)) + c.backend.Depth() } func (c *Channel) Pause() error {