From 0fd3b6dc892ce67a3d352b312743ee773d81e66e Mon Sep 17 00:00:00 2001 From: Ravi Atluri Date: Mon, 18 Nov 2024 09:49:39 +0530 Subject: [PATCH] Fix async consumer to support graceful shutdown by introducing stopOffset flag --- examples/xkafka/basic.go | 2 +- examples/xkafka/batch.go | 14 ++++++++------ examples/xkafka/producer.go | 4 +++- examples/xkafka/state.go | 13 +++++++++---- xkafka/batch_consumer.go | 10 +++++++++- xkafka/consumer.go | 9 ++++++++- 6 files changed, 38 insertions(+), 14 deletions(-) diff --git a/examples/xkafka/basic.go b/examples/xkafka/basic.go index 8497f81..c5a963c 100644 --- a/examples/xkafka/basic.go +++ b/examples/xkafka/basic.go @@ -54,7 +54,7 @@ func runBasic(c *cli.Context) error { func basicHandler(tracker *Tracker) xkafka.HandlerFunc { return func(ctx context.Context, msg *xkafka.Message) error { - err := tracker.SimulateWork() + err := tracker.SimulateWork(msg) if err != nil { msg.AckFail(err) diff --git a/examples/xkafka/batch.go b/examples/xkafka/batch.go index fbc6012..dcb5c57 100644 --- a/examples/xkafka/batch.go +++ b/examples/xkafka/batch.go @@ -5,6 +5,7 @@ import ( "time" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "github.com/gojekfarm/xrun" @@ -59,12 +60,12 @@ func runBatch(c *cli.Context) error { func batchHandler(tracker *Tracker) xkafka.BatchHandlerFunc { return func(ctx context.Context, batch *xkafka.Batch) error { - err := tracker.SimulateWork() - if err != nil { - return batch.AckFail(err) - } - for _, msg := range batch.Messages { + err := tracker.SimulateWork(msg) + if err != nil { + return batch.AckFail(err) + } + tracker.Ack(msg) } @@ -77,7 +78,6 @@ func batchHandler(tracker *Tracker) xkafka.BatchHandlerFunc { } func runBatchConsumers(ctx context.Context, tracker *Tracker, pods int, opts ...xkafka.ConsumerOption) { - log := zerolog.Ctx(ctx) handler := batchHandler(tracker) for { @@ -102,6 +102,8 @@ func runBatchConsumers(ctx context.Context, tracker *Tracker, pods int, opts ... components = append(components, bc) } + log.Info().Msg("Running consumers") + err := xrun.All(xrun.NoTimeout, components...).Run(ctx) if err != nil { log.Error().Err(err).Msg("Error running consumers") diff --git a/examples/xkafka/producer.go b/examples/xkafka/producer.go index 4356ba1..a1ef07e 100644 --- a/examples/xkafka/producer.go +++ b/examples/xkafka/producer.go @@ -24,8 +24,10 @@ func publishMessages(messages []*xkafka.Message) { defer producer.Close() + ctx := context.Background() + for _, msg := range messages { - if err := producer.Publish(context.Background(), msg); err != nil { + if err := producer.AsyncPublish(ctx, msg); err != nil { panic(err) } } diff --git a/examples/xkafka/state.go b/examples/xkafka/state.go index c3d0d87..cb6171c 100644 --- a/examples/xkafka/state.go +++ b/examples/xkafka/state.go @@ -1,7 +1,7 @@ package main import ( - "errors" + "fmt" "sync" "time" @@ -41,10 +41,12 @@ func (t *Tracker) Ack(msg *xkafka.Message) { t.received[string(msg.Key)] = msg t.order = append(t.order, string(msg.Key)) + + log.Info().Msgf("[TRACKER] %d/%d", len(t.received), len(t.expect)) } -func (t *Tracker) SimulateWork() error { - <-time.After(time.Duration(rand.Int63n(200)) * time.Millisecond) +func (t *Tracker) SimulateWork(msg *xkafka.Message) error { + <-time.After(time.Duration(rand.Int63n(50)) * time.Millisecond) t.mu.Lock() defer t.mu.Unlock() @@ -55,7 +57,10 @@ func (t *Tracker) SimulateWork() error { if len(t.received) >= after && !t.simulateError { t.simulateError = true - return errors.New("simulated error") + return fmt.Errorf( + "simulated error. partition %d, offset %d", + msg.Partition, msg.Offset, + ) } return nil diff --git a/xkafka/batch_consumer.go b/xkafka/batch_consumer.go index f47895e..e8faef7 100644 --- a/xkafka/batch_consumer.go +++ b/xkafka/batch_consumer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "strings" + "sync/atomic" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -18,6 +19,7 @@ type BatchConsumer struct { handler BatchHandler middlewares []BatchMiddlewarer config *consumerConfig + stopOffset atomic.Bool } // NewBatchConsumer creates a new BatchConsumer instance. @@ -229,7 +231,9 @@ func (c *BatchConsumer) processBatchAsync( if ferr := c.config.errorHandler(err); ferr != nil { cancel(ferr) - return func() {} + return func() { + c.stopOffset.Store(true) + } } return func() { @@ -245,6 +249,10 @@ func (c *BatchConsumer) storeBatch(batch *Batch) error { return nil } + if c.stopOffset.Load() { + return nil + } + tps := batch.GroupMaxOffset() _, err := c.kafka.StoreOffsets(tps) diff --git a/xkafka/consumer.go b/xkafka/consumer.go index 1506751..7423a56 100644 --- a/xkafka/consumer.go +++ b/xkafka/consumer.go @@ -20,6 +20,7 @@ type Consumer struct { middlewares []Middlewarer config *consumerConfig cancelCtx atomic.Pointer[context.CancelFunc] + stopOffset atomic.Bool } // NewConsumer creates a new Consumer instance. @@ -207,7 +208,9 @@ func (c *Consumer) runAsync(ctx context.Context) error { if ferr := c.config.errorHandler(err); ferr != nil { cancel(ferr) - return func() {} + return func() { + c.stopOffset.Store(true) + } } return func() { @@ -225,6 +228,10 @@ func (c *Consumer) storeMessage(msg *Message) error { return nil } + if c.stopOffset.Load() { + return nil + } + // similar to StoreMessage in confluent-kafka-go/consumer.go // msg.Offset + 1 it ensures that the consumer starts with // next message when it restarts