Skip to content

Commit

Permalink
send eot from response as well
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 13, 2024
1 parent 4beb518 commit a150023
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 149 deletions.
293 changes: 153 additions & 140 deletions pkg/apis/proto/map/v1/map.pb.go

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions pkg/apis/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ message MapRequest {
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}
message Status {
bool eot = 1;
}
Request request = 1;
// This ID is used to uniquely identify a map request
string id = 2;
Expand All @@ -44,6 +41,13 @@ message Handshake {
bool sot = 1;
}

/*
* Status message to indicate the status of the message.
*/
message Status {
bool eot = 1;
}

/**
* MapResponse represents a response element.
*/
Expand All @@ -57,6 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
}

/**
Expand Down
1 change: 0 additions & 1 deletion pkg/batchmapper/examples/batchmap_flatmap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ replace github.com/numaproj/numaflow-go => ../../../..
require github.com/numaproj/numaflow-go v0.8.1

require (
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions pkg/batchmapper/examples/batchmap_flatmap/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
Expand Down
24 changes: 22 additions & 2 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
return &mappb.ReadyResponse{Ready: true}, nil
}

// BatchMapFn applies a user defined function to a stream of request elements and streams back the responses for them.
func (fs *Service) BatchMapFn(stream mappb.Map_MapFnServer) error {
// MapFn applies a user defined function to a stream of request elements and streams back the responses for them.
func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
ctx := stream.Context()

// Perform handshake before entering the main loop
Expand Down Expand Up @@ -153,10 +153,30 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
Results: elements,
Id: batchResp.Id(),
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := stream.Send(singleRequestResp); err != nil {
log.Println("BatchMapFn: Got an error while Send() on stream", err)
return err
}
}

select {
case <-ctx.Done():
return ctx.Err()
default:
// send the end of transmission message
eot := &mappb.MapResponse{
Status: &mappb.Status{
Eot: true,
},
}
if err := stream.Send(eot); err != nil {
log.Println("BatchMapFn: Got an error while Send() on stream", err)
}
}
return nil
}
18 changes: 17 additions & 1 deletion pkg/batchmapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func TestService_BatchMapFn(t *testing.T) {
},
Id: "test2",
},
{
Request: &mappb.MapRequest_Request{},
Status: &mappb.Status{
Eot: true,
},
},
},
expected: []*mappb.MapResponse{
{
Expand All @@ -156,6 +162,11 @@ func TestService_BatchMapFn(t *testing.T) {
},
Id: "test2",
},
{
Status: &mappb.Status{
Eot: true,
},
},
},
expectedErr: false,
},
Expand Down Expand Up @@ -219,6 +230,11 @@ func TestService_BatchMapFn(t *testing.T) {
},
Id: "test2",
},
{
Status: &mappb.Status{
Eot: true,
},
},
},
expectedErr: true,
},
Expand Down Expand Up @@ -250,7 +266,7 @@ func TestService_BatchMapFn(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err = fs.BatchMapFn(udfBatchMapFnStream)
err = fs.MapFn(udfBatchMapFnStream)
close(outputCh)
}()

Expand Down

0 comments on commit a150023

Please sign in to comment.