From f029850af4a20af7c2c824d25dbc08ef0f991fda Mon Sep 17 00:00:00 2001 From: Elvis58 Date: Wed, 9 Nov 2022 16:14:29 +0800 Subject: [PATCH] fix dq consumer stop bug --- dq/consumer.go | 7 ++++--- example/dq/consumer/consumer.go | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dq/consumer.go b/dq/consumer.go index daf4c1c..5b5dde4 100644 --- a/dq/consumer.go +++ b/dq/consumer.go @@ -22,7 +22,7 @@ type ( Consume func(body []byte) Consumer interface { - Consume(consume Consume) + Consume(consume Consume) *service.ServiceGroup } consumerCluster struct { @@ -42,7 +42,7 @@ func NewConsumer(c DqConf) Consumer { } } -func (c *consumerCluster) Consume(consume Consume) { +func (c *consumerCluster) Consume(consume Consume) *service.ServiceGroup { guardedConsume := func(body []byte) { key := hash.Md5Hex(body) body, ok := c.unwrap(body) @@ -66,7 +66,8 @@ func (c *consumerCluster) Consume(consume Consume) { consume: guardedConsume, }) } - group.Start() + + return group } func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) { diff --git a/example/dq/consumer/consumer.go b/example/dq/consumer/consumer.go index 70760bf..15b429d 100644 --- a/example/dq/consumer/consumer.go +++ b/example/dq/consumer/consumer.go @@ -24,7 +24,10 @@ func main() { Type: redis.NodeType, }, }) - consumer.Consume(func(body []byte) { + group := consumer.Consume(func(body []byte) { fmt.Println(string(body)) }) + + defer group.Stop() + group.Start() }