From d037958371b67b5d0317b45cf9e0a9032ac8fc2f Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Thu, 26 Sep 2024 17:12:23 -0700 Subject: [PATCH 1/2] fix: remove unwanted go routines Signed-off-by: Vigith Maurice --- pkg/sourcer/service.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index d3f0fe7c..98c36735 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -9,7 +9,6 @@ import ( "runtime/debug" "time" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -167,14 +166,10 @@ func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { } for { - g, ctx := errgroup.WithContext(ctx) - - g.Go(func() error { - return fs.receiveAckRequests(ctx, stream) - }) + err := fs.receiveAckRequests(ctx, stream) // Wait for the goroutines to finish - if err := g.Wait(); err != nil { + if err != nil { if errors.Is(err, io.EOF) { return nil } From d5f50a6026e01c7f82e53d83eb8e2b17b4797ab0 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Thu, 26 Sep 2024 19:03:36 -0700 Subject: [PATCH 2/2] chore: fix stale comment Signed-off-by: Vigith Maurice --- pkg/sourcer/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 98c36735..90cb63d2 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -168,7 +168,6 @@ func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { for { err := fs.receiveAckRequests(ctx, stream) - // Wait for the goroutines to finish if err != nil { if errors.Is(err, io.EOF) { return nil