Skip to content

Commit

Permalink
fix: added adapter stop (#4948)
Browse files Browse the repository at this point in the history
  • Loading branch information
exu authored Jan 25, 2024
1 parent ede8db8 commit 56f6d67
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions pkg/logs/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,15 @@ func (ls *LogsService) handleStop(ctx context.Context) func(msg *nats.Msg) {
wg.Add(1)
stopped++
consumer := c.(Consumer)

go ls.stopConsumer(ctx, &wg, consumer)

// call adapter stop to handle given id
err := adapter.Stop(event.Id)
if err != nil {
l.Errorw("stop error", "adapter", adapter.Name(), "error", err)
continue
}

}

wg.Wait()
Expand All @@ -200,7 +207,6 @@ func (ls *LogsService) handleStop(ctx context.Context) func(msg *nats.Msg) {
} else {
l.Debugw("no consumers found on this pod to stop")
}

}
}

Expand Down Expand Up @@ -231,7 +237,9 @@ func (ls *LogsService) stopConsumer(ctx context.Context, wg *sync.WaitGroup, con

// check if there was some messages processed
if nothingToProcess && messagesDelivered {
// stop nats consumer
consumer.Context.Stop()
// delete nats consumer instance from memory
ls.consumerInstances.Delete(consumer.Name)
l.Infow("stopping and removing consumer", "name", consumer.Name, "consumerSeq", info.Delivered.Consumer, "streamSeq", info.Delivered.Stream, "last", info.Delivered.Last)
return
Expand Down

0 comments on commit 56f6d67

Please sign in to comment.