From 56f6d67a66eac57845104c098f695f3d7b51d6f7 Mon Sep 17 00:00:00 2001 From: Jacek Wysocki Date: Thu, 25 Jan 2024 15:48:26 +0100 Subject: [PATCH] fix: added adapter stop (#4948) --- pkg/logs/events.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/logs/events.go b/pkg/logs/events.go index 91bb1c59fe4..c8d8ab7512a 100644 --- a/pkg/logs/events.go +++ b/pkg/logs/events.go @@ -187,8 +187,15 @@ func (ls *LogsService) handleStop(ctx context.Context) func(msg *nats.Msg) { wg.Add(1) stopped++ consumer := c.(Consumer) - go ls.stopConsumer(ctx, &wg, consumer) + + // call adapter stop to handle given id + err := adapter.Stop(event.Id) + if err != nil { + l.Errorw("stop error", "adapter", adapter.Name(), "error", err) + continue + } + } wg.Wait() @@ -200,7 +207,6 @@ func (ls *LogsService) handleStop(ctx context.Context) func(msg *nats.Msg) { } else { l.Debugw("no consumers found on this pod to stop") } - } } @@ -231,7 +237,9 @@ func (ls *LogsService) stopConsumer(ctx context.Context, wg *sync.WaitGroup, con // check if there was some messages processed if nothingToProcess && messagesDelivered { + // stop nats consumer consumer.Context.Stop() + // delete nats consumer instance from memory ls.consumerInstances.Delete(consumer.Name) l.Infow("stopping and removing consumer", "name", consumer.Name, "consumerSeq", info.Delivered.Consumer, "streamSeq", info.Delivered.Stream, "last", info.Delivered.Last) return