Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 8, 2024
1 parent 9179ac5 commit b32d0e7
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/mapper/examples/even_odd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/examples/even_odd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/examples/flatmap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/examples/flatmap/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/examples/forward_message/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/examples/forward_message/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/examples/retry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/examples/retry/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/examples/tickgen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/examples/tickgen/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
30 changes: 15 additions & 15 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ outer:
case <-grpCtx.Done():
break outer
default:
req, err := stream.Recv()
if err == io.EOF {
break outer
}
if err != nil {
log.Printf("failed to receive request: %v", err)
readErr = err
// read loop is not part of the error group, so we need to cancel the context
// to signal the other goroutines to stop processing.
cancel()
break outer
}
g.Go(func() error {
return fs.handleRequest(grpCtx, req, responseCh)
})
}
req, err := stream.Recv()
if err == io.EOF {
break outer
}
if err != nil {
log.Printf("Failed to receive request: %v", err)
readErr = err
// read loop is not part of the error group, so we need to cancel the context
// to signal the other goroutines to stop processing.
cancel()
break outer
}
g.Go(func() error {
return fs.handleRequest(grpCtx, req, responseCh)
})
}

// wait for all goroutines to finish
Expand Down
6 changes: 3 additions & 3 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestService_mapFn(t *testing.T) {
want *proto.MapResponse
}{
{
name: "sourceTransform_fn_forward_msg",
name: "map_fn_forward_msg",
handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
msg := datum.Value()
return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"}))
Expand All @@ -106,7 +106,7 @@ func TestService_mapFn(t *testing.T) {
},
},
{
name: "sourceTransform_fn_forward_msg_forward_to_all",
name: "map_fn_forward_msg_forward_to_all",
handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
msg := datum.Value()
return MessagesBuilder().Append(NewMessage(msg))
Expand All @@ -131,7 +131,7 @@ func TestService_mapFn(t *testing.T) {
},
},
{
name: "sourceTransform_fn_forward_msg_drop_msg",
name: "map_fn_forward_msg_drop_msg",
handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
return MessagesBuilder().Append(MessageToDrop())
}),
Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/examples/simple_sideinput/udf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/sideinput/examples/simple_sideinput/udf/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
30 changes: 15 additions & 15 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,22 @@ outer:
case <-grpCtx.Done(): // Stop reading new messages when we are shutting down
break outer
default:
d, err := stream.Recv()
if err == io.EOF {
break outer
}
if err != nil {
log.Printf("failed to receive request: %v", err)
readErr = err
// read loop is not part of the error group, so we need to cancel the context
// to signal the other goroutines to stop processing.
cancel()
break outer
}
grp.Go(func() (err error) {
return fs.handleRequest(grpCtx, d, senderCh)
})
}
d, err := stream.Recv()
if err == io.EOF {
break outer
}
if err != nil {
log.Printf("Failed to receive request: %v", err)
readErr = err
// read loop is not part of the error group, so we need to cancel the context
// to signal the other goroutines to stop processing.
cancel()
break outer
}
grp.Go(func() (err error) {
return fs.handleRequest(grpCtx, d, senderCh)
})
}

// wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately.
Expand Down

0 comments on commit b32d0e7

Please sign in to comment.