Skip to content

Commit

Permalink
Fix async consumer to support graceful shutdown by introducing stopOffset flag
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Nov 18, 2024
1 parent 38bd50e commit 0fd3b6d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/xkafka/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func runBasic(c *cli.Context) error {

func basicHandler(tracker *Tracker) xkafka.HandlerFunc {
return func(ctx context.Context, msg *xkafka.Message) error {
err := tracker.SimulateWork()
err := tracker.SimulateWork(msg)
if err != nil {
msg.AckFail(err)

Expand Down
14 changes: 8 additions & 6 deletions examples/xkafka/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"

"github.com/gojekfarm/xrun"
Expand Down Expand Up @@ -59,12 +60,12 @@ func runBatch(c *cli.Context) error {

func batchHandler(tracker *Tracker) xkafka.BatchHandlerFunc {
return func(ctx context.Context, batch *xkafka.Batch) error {
err := tracker.SimulateWork()
if err != nil {
return batch.AckFail(err)
}

for _, msg := range batch.Messages {
err := tracker.SimulateWork(msg)
if err != nil {
return batch.AckFail(err)
}

tracker.Ack(msg)
}

Expand All @@ -77,7 +78,6 @@ func batchHandler(tracker *Tracker) xkafka.BatchHandlerFunc {
}

func runBatchConsumers(ctx context.Context, tracker *Tracker, pods int, opts ...xkafka.ConsumerOption) {
log := zerolog.Ctx(ctx)
handler := batchHandler(tracker)

for {
Expand All @@ -102,6 +102,8 @@ func runBatchConsumers(ctx context.Context, tracker *Tracker, pods int, opts ...
components = append(components, bc)
}

log.Info().Msg("Running consumers")

err := xrun.All(xrun.NoTimeout, components...).Run(ctx)
if err != nil {
log.Error().Err(err).Msg("Error running consumers")
Expand Down
4 changes: 3 additions & 1 deletion examples/xkafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ func publishMessages(messages []*xkafka.Message) {

defer producer.Close()

ctx := context.Background()

for _, msg := range messages {
if err := producer.Publish(context.Background(), msg); err != nil {
if err := producer.AsyncPublish(ctx, msg); err != nil {
panic(err)
}
}
Expand Down
13 changes: 9 additions & 4 deletions examples/xkafka/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package main

import (
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -41,10 +41,12 @@ func (t *Tracker) Ack(msg *xkafka.Message) {

t.received[string(msg.Key)] = msg
t.order = append(t.order, string(msg.Key))

log.Info().Msgf("[TRACKER] %d/%d", len(t.received), len(t.expect))
}

func (t *Tracker) SimulateWork() error {
<-time.After(time.Duration(rand.Int63n(200)) * time.Millisecond)
func (t *Tracker) SimulateWork(msg *xkafka.Message) error {
<-time.After(time.Duration(rand.Int63n(50)) * time.Millisecond)

t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -55,7 +57,10 @@ func (t *Tracker) SimulateWork() error {
if len(t.received) >= after && !t.simulateError {
t.simulateError = true

return errors.New("simulated error")
return fmt.Errorf(
"simulated error. partition %d, offset %d",
msg.Partition, msg.Offset,
)
}

return nil
Expand Down
10 changes: 9 additions & 1 deletion xkafka/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"strings"
"sync/atomic"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -18,6 +19,7 @@ type BatchConsumer struct {
handler BatchHandler
middlewares []BatchMiddlewarer
config *consumerConfig
stopOffset atomic.Bool
}

// NewBatchConsumer creates a new BatchConsumer instance.
Expand Down Expand Up @@ -229,7 +231,9 @@ func (c *BatchConsumer) processBatchAsync(
if ferr := c.config.errorHandler(err); ferr != nil {
cancel(ferr)

return func() {}
return func() {
c.stopOffset.Store(true)
}
}

return func() {
Expand All @@ -245,6 +249,10 @@ func (c *BatchConsumer) storeBatch(batch *Batch) error {
return nil
}

if c.stopOffset.Load() {
return nil
}

tps := batch.GroupMaxOffset()

_, err := c.kafka.StoreOffsets(tps)
Expand Down
9 changes: 8 additions & 1 deletion xkafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Consumer struct {
middlewares []Middlewarer
config *consumerConfig
cancelCtx atomic.Pointer[context.CancelFunc]
stopOffset atomic.Bool
}

// NewConsumer creates a new Consumer instance.
Expand Down Expand Up @@ -207,7 +208,9 @@ func (c *Consumer) runAsync(ctx context.Context) error {
if ferr := c.config.errorHandler(err); ferr != nil {
cancel(ferr)

return func() {}
return func() {
c.stopOffset.Store(true)
}
}

return func() {
Expand All @@ -225,6 +228,10 @@ func (c *Consumer) storeMessage(msg *Message) error {
return nil
}

if c.stopOffset.Load() {
return nil
}

// similar to StoreMessage in confluent-kafka-go/consumer.go
// msg.Offset + 1 it ensures that the consumer starts with
// next message when it restarts
Expand Down

0 comments on commit 0fd3b6d

Please sign in to comment.