Skip to content

Commit

Permalink
fix unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 3, 2024
1 parent 96451e7 commit 9179ac5
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
7 changes: 4 additions & 3 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
return err
}
case <-grpCtx.Done():
return nil
return grpCtx.Err()
}
}
})
Expand Down Expand Up @@ -129,10 +129,11 @@ func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error {
}

// handleRequest processes each request and sends the response to the response channel.
func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, responseCh chan<- *mappb.MapResponse) error {
func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, responseCh chan<- *mappb.MapResponse) (err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = status.Errorf(codes.Internal, "panic inside map handler: %v", r)
}
}()

Expand All @@ -154,7 +155,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
select {
case responseCh <- resp:
case <-ctx.Done():
return nil
return ctx.Err()
}
return nil
}
4 changes: 2 additions & 2 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
func TestService_MapFn_Panic(t *testing.T) {
svc := &Service{
Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
panic("transformer panicked")
panic("map failed")
}),
// panic in the transformer causes the server to send a shutdown signal to shutdownCh channel.
// The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else.
Expand Down Expand Up @@ -288,6 +288,6 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "panic"))
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed"))
require.Equal(t, expectedStatus, gotStatus)
}
6 changes: 3 additions & 3 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
for {
select {
case <-grpCtx.Done():
return nil
return grpCtx.Err()
case resp := <-senderCh:
if err := stream.Send(resp); err != nil {
return fmt.Errorf("failed to send response to client: %w", err)
Expand All @@ -87,7 +87,7 @@ outer:
if err != nil {
log.Printf("failed to receive request: %v", err)
readErr = err
// read loop is not part of the errgroup, so we need to cancel the context
// read loop is not part of the error group, so we need to cancel the context
// to signal the other goroutines to stop processing.
cancel()
break outer
Expand Down Expand Up @@ -160,7 +160,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq
select {
case responseCh <- resp:
case <-ctx.Done():
return nil
return ctx.Err()
}
return nil
}

0 comments on commit 9179ac5

Please sign in to comment.