Skip to content

Commit

Permalink
rename to transmission status
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 14, 2024
1 parent d393a61 commit e12c1b5
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 89 deletions.
164 changes: 83 additions & 81 deletions pkg/apis/proto/map/v1/map.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/apis/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/*
Expand All @@ -44,7 +44,7 @@ message Handshake {
/*
* Status message to indicate the status of the message.
*/
message Status {
message TransmissionStatus {
bool eot = 1;
}

Expand All @@ -61,7 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
default:
// send the end of transmission message
eot := &mappb.MapResponse{
Status: &mappb.Status{
Status: &mappb.TransmissionStatus{
Eot: true,
},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/batchmapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestService_BatchMapFn(t *testing.T) {
},
{
Request: &mappb.MapRequest_Request{},
Status: &mappb.Status{
Status: &mappb.TransmissionStatus{
Eot: true,
},
},
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestService_BatchMapFn(t *testing.T) {
Id: "test2",
},
{
Status: &mappb.Status{
Status: &mappb.TransmissionStatus{
Eot: true,
},
},
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestService_BatchMapFn(t *testing.T) {
Id: "test2",
},
{
Status: &mappb.Status{
Status: &mappb.TransmissionStatus{
Eot: true,
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/examples/flatmap_stream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type FlatMap struct {
}

func (f *FlatMap) MapStream(ctx context.Context, keys []string, d mapstreamer.Datum, messageCh chan<- mapstreamer.Message) {
// we have to close to indicate the end of the stream, otherwise the client will wait forever.
defer close(messageCh)
msg := d.Value()
_ = d.EventTime() // Event time is available
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (fs *Service) writeResponseToClient(ctx context.Context, stream mappb.Map_M
if !ok {
// Send EOT message since we are done processing the request.
eotMessage := &mappb.MapResponse{
Status: &mappb.Status{Eot: true},
Status: &mappb.TransmissionStatus{Eot: true},
Id: reqID,
}
if err := stream.Send(eotMessage); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/mapstreamer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestService_MapFn(t *testing.T) {
{
name: "map_fn_forward_msg",
handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
defer close(messageCh)
msg := datum.Value()
messageCh <- NewMessage(msg).WithKeys([]string{keys[0] + "_test"})
}),
Expand All @@ -108,6 +109,7 @@ func TestService_MapFn(t *testing.T) {
{
name: "map_fn_forward_msg_forward_to_all",
handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
defer close(messageCh)
msg := datum.Value()
messageCh <- NewMessage(msg)
}),
Expand All @@ -133,6 +135,7 @@ func TestService_MapFn(t *testing.T) {
{
name: "map_fn_forward_msg_drop_msg",
handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
defer close(messageCh)
messageCh <- MessageToDrop()
}),
args: args{
Expand Down Expand Up @@ -204,6 +207,7 @@ func doHandshake(t *testing.T, stream proto.Map_MapFnClient) {
func TestService_MapFn_SingleMessage_MultipleResponses(t *testing.T) {
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
defer close(messageCh)
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("response_%d", i)
messageCh <- NewMessage([]byte(msg)).WithKeys([]string{keys[0] + "_test"})
Expand Down Expand Up @@ -257,6 +261,7 @@ func TestService_MapFn_SingleMessage_MultipleResponses(t *testing.T) {
func TestService_MapFn_Multiple_Messages(t *testing.T) {
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
defer close(messageCh)
msg := datum.Value()
messageCh <- NewMessage(msg).WithKeys([]string{keys[0] + "_test"})
}),
Expand Down

0 comments on commit e12c1b5

Please sign in to comment.