diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index d47de582..c7708359 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -29,11 +29,10 @@ type MapRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Request *MapRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + // This ID is used to uniquely identify a map request + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *MapRequest) Reset() { @@ -68,39 +67,74 @@ func (*MapRequest) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{0} } -func (x *MapRequest) GetKeys() []string { +func (x *MapRequest) GetRequest() *MapRequest_Request { if x != nil { - return x.Keys + return x.Request } return nil } -func (x *MapRequest) GetValue() []byte { +func (x *MapRequest) GetId() string { if x != nil { - return x.Value + return x.Id } - return nil + return "" } -func (x *MapRequest) GetEventTime() *timestamppb.Timestamp { +func (x *MapRequest) GetHandshake() *Handshake { if x != nil { - return x.EventTime + return x.Handshake } return nil } -func (x *MapRequest) GetWatermark() *timestamppb.Timestamp { - if x != nil { - return x.Watermark +// Handshake message between client and server to indicate the start of transmission. +type Handshake struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field indicating the start of transmission. + Sot bool `protobuf:"varint,1,opt,name=sot,proto3" json:"sot,omitempty"` +} + +func (x *Handshake) Reset() { + *x = Handshake{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return nil } -func (x *MapRequest) GetHeaders() map[string]string { +func (x *Handshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Handshake) ProtoMessage() {} + +func (x *Handshake) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{1} +} + +func (x *Handshake) GetSot() bool { if x != nil { - return x.Headers + return x.Sot } - return nil + return false } // * @@ -111,12 +145,15 @@ type MapResponse struct { unknownFields protoimpl.UnknownFields Results []*MapResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + // This ID is used to refer the responses to the request it corresponds to. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *MapResponse) Reset() { *x = MapResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[1] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -129,7 +166,7 @@ func (x *MapResponse) String() string { func (*MapResponse) ProtoMessage() {} func (x *MapResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[1] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -142,7 +179,7 @@ func (x *MapResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse.ProtoReflect.Descriptor instead. func (*MapResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{1} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} } func (x *MapResponse) GetResults() []*MapResponse_Result { @@ -152,6 +189,20 @@ func (x *MapResponse) GetResults() []*MapResponse_Result { return nil } +func (x *MapResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *MapResponse) GetHandshake() *Handshake { + if x != nil { + return x.Handshake + } + return nil +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -165,7 +216,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -178,7 +229,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -191,7 +242,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3} } func (x *ReadyResponse) GetReady() bool { @@ -201,6 +252,85 @@ func (x *ReadyResponse) GetReady() bool { return false } +type MapRequest_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *MapRequest_Request) Reset() { + *x = MapRequest_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MapRequest_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MapRequest_Request) ProtoMessage() {} + +func (x *MapRequest_Request) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MapRequest_Request.ProtoReflect.Descriptor instead. +func (*MapRequest_Request) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *MapRequest_Request) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *MapRequest_Request) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *MapRequest_Request) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *MapRequest_Request) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *MapRequest_Request) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + type MapResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -214,7 +344,7 @@ type MapResponse_Result struct { func (x *MapResponse_Result) Reset() { *x = MapResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -227,7 +357,7 @@ func (x *MapResponse_Result) String() string { func (*MapResponse_Result) ProtoMessage() {} func (x *MapResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -240,7 +370,7 @@ func (x *MapResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse_Result.ProtoReflect.Descriptor instead. func (*MapResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{1, 0} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2, 0} } func (x *MapResponse_Result) GetKeys() []string { @@ -273,48 +403,65 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa2, 0x02, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, - 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, - 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, - 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x39, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, - 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, - 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc0, 0x03, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, + 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x34, 0x0a, 0x09, + 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, + 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, + 0x01, 0x01, 0x1a, 0xa7, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, + 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, + 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x41, 0x0a, + 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8b, 0x01, 0x0a, - 0x0b, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, - 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x73, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, - 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, - 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, - 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, - 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, - 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, - 0x79, 0x32, 0x71, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x30, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, - 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, - 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x49, 0x73, - 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, - 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, - 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, + 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0xdf, 0x01, 0x0a, 0x0b, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, + 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x34, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, + 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, + 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, + 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, + 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, 0x0a, + 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x52, + 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, + 0x64, 0x79, 0x32, 0x75, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x34, 0x0a, 0x05, 0x4d, 0x61, 0x70, + 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, + 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, + 0x38, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, + 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, + 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -329,30 +476,35 @@ func file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_map_v1_map_proto_rawDescData } -var file_pkg_apis_proto_map_v1_map_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_map_v1_map_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_pkg_apis_proto_map_v1_map_proto_goTypes = []any{ (*MapRequest)(nil), // 0: map.v1.MapRequest - (*MapResponse)(nil), // 1: map.v1.MapResponse - (*ReadyResponse)(nil), // 2: map.v1.ReadyResponse - nil, // 3: map.v1.MapRequest.HeadersEntry - (*MapResponse_Result)(nil), // 4: map.v1.MapResponse.Result - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*Handshake)(nil), // 1: map.v1.Handshake + (*MapResponse)(nil), // 2: map.v1.MapResponse + (*ReadyResponse)(nil), // 3: map.v1.ReadyResponse + (*MapRequest_Request)(nil), // 4: map.v1.MapRequest.Request + nil, // 5: map.v1.MapRequest.Request.HeadersEntry + (*MapResponse_Result)(nil), // 6: map.v1.MapResponse.Result + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 8: google.protobuf.Empty } var file_pkg_apis_proto_map_v1_map_proto_depIdxs = []int32{ - 5, // 0: map.v1.MapRequest.event_time:type_name -> google.protobuf.Timestamp - 5, // 1: map.v1.MapRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: map.v1.MapRequest.headers:type_name -> map.v1.MapRequest.HeadersEntry - 4, // 3: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result - 0, // 4: map.v1.Map.MapFn:input_type -> map.v1.MapRequest - 6, // 5: map.v1.Map.IsReady:input_type -> google.protobuf.Empty - 1, // 6: map.v1.Map.MapFn:output_type -> map.v1.MapResponse - 2, // 7: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse - 6, // [6:8] is the sub-list for method output_type - 4, // [4:6] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 4, // 0: map.v1.MapRequest.request:type_name -> map.v1.MapRequest.Request + 1, // 1: map.v1.MapRequest.handshake:type_name -> map.v1.Handshake + 6, // 2: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result + 1, // 3: map.v1.MapResponse.handshake:type_name -> map.v1.Handshake + 7, // 4: map.v1.MapRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 7, // 5: map.v1.MapRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 5, // 6: map.v1.MapRequest.Request.headers:type_name -> map.v1.MapRequest.Request.HeadersEntry + 0, // 7: map.v1.Map.MapFn:input_type -> map.v1.MapRequest + 8, // 8: map.v1.Map.IsReady:input_type -> google.protobuf.Empty + 2, // 9: map.v1.Map.MapFn:output_type -> map.v1.MapResponse + 3, // 10: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse + 9, // [9:11] is the sub-list for method output_type + 7, // [7:9] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_pkg_apis_proto_map_v1_map_proto_init() } @@ -374,7 +526,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*MapResponse); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -386,6 +538,18 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*MapResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_map_v1_map_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*ReadyResponse); i { case 0: return &v.state @@ -398,6 +562,18 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*MapRequest_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_map_v1_map_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*MapResponse_Result); i { case 0: return &v.state @@ -410,13 +586,15 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } } + file_pkg_apis_proto_map_v1_map_proto_msgTypes[0].OneofWrappers = []any{} + file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_map_v1_map_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/map/v1/map.proto b/pkg/apis/proto/map/v1/map.proto index e93a9b01..76f8788d 100644 --- a/pkg/apis/proto/map/v1/map.proto +++ b/pkg/apis/proto/map/v1/map.proto @@ -9,7 +9,7 @@ package map.v1; service Map { // MapFn applies a function to each map request element. - rpc MapFn(MapRequest) returns (MapResponse); + rpc MapFn(stream MapRequest) returns (stream MapResponse); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); @@ -19,11 +19,25 @@ service Map { * MapRequest represents a request element. */ message MapRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; + message Request { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + map headers = 5; + } + Request request = 1; + // This ID is used to uniquely identify a map request + string id = 2; + optional Handshake handshake = 3; +} + +/* + * Handshake message between client and server to indicate the start of transmission. + */ +message Handshake { + // Required field indicating the start of transmission. + bool sot = 1; } /** @@ -36,6 +50,9 @@ message MapResponse { repeated string tags = 3; } repeated Result results = 1; + // This ID is used to refer the responses to the request it corresponds to. + string id = 2; + optional Handshake handshake = 3; } /** diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index d8552f56..0ee687b7 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -29,7 +29,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MapClient interface { // MapFn applies a function to each map request element. - MapFn(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) + MapFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapFnClient, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -42,14 +42,36 @@ func NewMapClient(cc grpc.ClientConnInterface) MapClient { return &mapClient{cc} } -func (c *mapClient) MapFn(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) { +func (c *mapClient) MapFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapFnClient, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(MapResponse) - err := c.cc.Invoke(ctx, Map_MapFn_FullMethodName, in, out, cOpts...) + stream, err := c.cc.NewStream(ctx, &Map_ServiceDesc.Streams[0], Map_MapFn_FullMethodName, cOpts...) if err != nil { return nil, err } - return out, nil + x := &mapMapFnClient{ClientStream: stream} + return x, nil +} + +type Map_MapFnClient interface { + Send(*MapRequest) error + Recv() (*MapResponse, error) + grpc.ClientStream +} + +type mapMapFnClient struct { + grpc.ClientStream +} + +func (x *mapMapFnClient) Send(m *MapRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *mapMapFnClient) Recv() (*MapResponse, error) { + m := new(MapResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *mapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { @@ -67,7 +89,7 @@ func (c *mapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc // for forward compatibility type MapServer interface { // MapFn applies a function to each map request element. - MapFn(context.Context, *MapRequest) (*MapResponse, error) + MapFn(Map_MapFnServer) error // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedMapServer() @@ -77,8 +99,8 @@ type MapServer interface { type UnimplementedMapServer struct { } -func (UnimplementedMapServer) MapFn(context.Context, *MapRequest) (*MapResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method MapFn not implemented") +func (UnimplementedMapServer) MapFn(Map_MapFnServer) error { + return status.Errorf(codes.Unimplemented, "method MapFn not implemented") } func (UnimplementedMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") @@ -96,22 +118,30 @@ func RegisterMapServer(s grpc.ServiceRegistrar, srv MapServer) { s.RegisterService(&Map_ServiceDesc, srv) } -func _Map_MapFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(MapRequest) - if err := dec(in); err != nil { +func _Map_MapFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MapServer).MapFn(&mapMapFnServer{ServerStream: stream}) +} + +type Map_MapFnServer interface { + Send(*MapResponse) error + Recv() (*MapRequest, error) + grpc.ServerStream +} + +type mapMapFnServer struct { + grpc.ServerStream +} + +func (x *mapMapFnServer) Send(m *MapResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *mapMapFnServer) Recv() (*MapRequest, error) { + m := new(MapRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(MapServer).MapFn(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Map_MapFn_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MapServer).MapFn(ctx, req.(*MapRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } func _Map_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { @@ -139,15 +169,18 @@ var Map_ServiceDesc = grpc.ServiceDesc{ ServiceName: "map.v1.Map", HandlerType: (*MapServer)(nil), Methods: []grpc.MethodDesc{ - { - MethodName: "MapFn", - Handler: _Map_MapFn_Handler, - }, { MethodName: "IsReady", Handler: _Map_IsReady_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "MapFn", + Handler: _Map_MapFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "pkg/apis/proto/map/v1/map.proto", } diff --git a/pkg/mapper/examples/even_odd/go.mod b/pkg/mapper/examples/even_odd/go.mod index 3c53ab1b..5d4b69a8 100644 --- a/pkg/mapper/examples/even_odd/go.mod +++ b/pkg/mapper/examples/even_odd/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/even_odd/go.sum b/pkg/mapper/examples/even_odd/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/even_odd/go.sum +++ b/pkg/mapper/examples/even_odd/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/flatmap/go.mod b/pkg/mapper/examples/flatmap/go.mod index 24b28bb1..e4cdd7b8 100644 --- a/pkg/mapper/examples/flatmap/go.mod +++ b/pkg/mapper/examples/flatmap/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/flatmap/go.sum b/pkg/mapper/examples/flatmap/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/flatmap/go.sum +++ b/pkg/mapper/examples/flatmap/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/forward_message/go.mod b/pkg/mapper/examples/forward_message/go.mod index 2b0abb6d..c1dc3be7 100644 --- a/pkg/mapper/examples/forward_message/go.mod +++ b/pkg/mapper/examples/forward_message/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/forward_message/go.sum b/pkg/mapper/examples/forward_message/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/forward_message/go.sum +++ b/pkg/mapper/examples/forward_message/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/retry/go.mod b/pkg/mapper/examples/retry/go.mod index 9612714e..41a9d813 100644 --- a/pkg/mapper/examples/retry/go.mod +++ b/pkg/mapper/examples/retry/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/retry/go.sum b/pkg/mapper/examples/retry/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/retry/go.sum +++ b/pkg/mapper/examples/retry/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/tickgen/go.mod b/pkg/mapper/examples/tickgen/go.mod index 0be651a1..988c6b88 100644 --- a/pkg/mapper/examples/tickgen/go.mod +++ b/pkg/mapper/examples/tickgen/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/tickgen/go.sum b/pkg/mapper/examples/tickgen/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/tickgen/go.sum +++ b/pkg/mapper/examples/tickgen/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/server_test.go b/pkg/mapper/server_test.go deleted file mode 100644 index baa94d4a..00000000 --- a/pkg/mapper/server_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package mapper - -import ( - "context" - "os" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" -) - -func TestMapServer_Start(t *testing.T) { - socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") - defer func() { - _ = os.RemoveAll(socketFile.Name()) - }() - - serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") - defer func() { - _ = os.RemoveAll(serverInfoFile.Name()) - }() - - var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages { - msg := d.Value() - return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) - }) - // note: using actual uds connection - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) - assert.NoError(t, err) -} - -// tests the case where the server is shutdown gracefully when a panic occurs in the map handler -func TestMapServer_GracefulShutdown(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - dir := t.TempDir() - socketFile, _ := os.Create(dir + "/test.sock") - defer func() { - _ = os.RemoveAll(socketFile.Name()) - }() - - serverInfoFile, _ := os.Create(dir + "/numaflow-test-info") - defer func() { - _ = os.RemoveAll(serverInfoFile.Name()) - }() - - var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages { - msg := d.Value() - if keys[0] == "key2" { - time.Sleep(20 * time.Millisecond) - panic("panic test") - } - time.Sleep(50 * time.Millisecond) - return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) - }) - - done := make(chan struct{}) - go func() { - err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(socketFile.Name())).Start(ctx) - assert.NoError(t, err) - close(done) - }() - - // wait for the server to start - time.Sleep(10 * time.Millisecond) - - // create a client - conn, err := grpc.NewClient( - "unix://"+socketFile.Name(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - t.Fatalf("Failed to dial server: %v", err) - } - defer conn.Close() - - client := mappb.NewMapClient(conn) - // send two map requests with key1 and key2 as keys simultaneously - keys := []string{"key1", "key2"} - var wg sync.WaitGroup - for _, key := range keys { - wg.Add(1) - go func(key string) { - defer wg.Done() - req := &mappb.MapRequest{ - Keys: []string{key}, - } - - resp, err := client.MapFn(ctx, req) - // since there is a panic in the map handler for key2, we should get an error - // other requests should be successful - if key == "key2" { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, resp) - } - }(key) - } - - wg.Wait() - // wait for the server to shutdown gracefully because of the panic - select { - case <-ctx.Done(): - t.Fatal("server did not shutdown gracefully") - case <-done: - } -} diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 235e372b..e9a537de 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -2,9 +2,12 @@ package mapper import ( "context" + "fmt" + "io" "log" "runtime/debug" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -33,20 +36,111 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons } // MapFn applies a user defined function to each request element and returns a list of results. -func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.MapResponse, err error) { - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders()) - var elements []*mappb.MapResponse_Result +func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { + // perform handshake with client before processing requests + if err := fs.performHandshake(stream); err != nil { + return err + } + + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + + // Use error group to manage goroutines, the grpCtx is cancelled when any of the + // goroutines return an error for the first time or the first time the wait returns. + g, grpCtx := errgroup.WithContext(ctx) + + // Channel to collect responses + responseCh := make(chan *mappb.MapResponse, 500) // FIXME: identify the right buffer size + defer close(responseCh) + + // Dedicated goroutine to send responses to the stream + g.Go(func() error { + for { + select { + case resp := <-responseCh: + if err := stream.Send(resp); err != nil { + log.Printf("failed to send response: %v", err) + return err + } + case <-grpCtx.Done(): + return grpCtx.Err() + } + } + }) - // Use defer and recover to handle panic + var readErr error + // Read requests from the stream and process them +outer: + for { + select { + case <-grpCtx.Done(): + break outer + default: + } + req, err := stream.Recv() + if err == io.EOF { + break outer + } + if err != nil { + log.Printf("Failed to receive request: %v", err) + readErr = err + // read loop is not part of the error group, so we need to cancel the context + // to signal the other goroutines to stop processing. + cancel() + break outer + } + g.Go(func() error { + return fs.handleRequest(grpCtx, req, responseCh) + }) + } + + // wait for all goroutines to finish + if err := g.Wait(); err != nil { + fs.shutdownCh <- struct{}{} + return status.Errorf(codes.Internal, "error processing requests: %v", err) + } + + // check if there was an error while reading from the stream + if readErr != nil { + return status.Errorf(codes.Internal, readErr.Error()) + } + + return nil +} + +// performHandshake handles the handshake logic at the start of the stream. +func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.Internal, "failed to receive handshake: %v", err) + } + if req.GetHandshake() == nil || !req.GetHandshake().GetSot() { + return status.Errorf(codes.InvalidArgument, "invalid handshake") + } + handshakeResponse := &mappb.MapResponse{ + Handshake: &mappb.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + } + return nil +} + +// handleRequest processes each request and sends the response to the response channel. +func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, responseCh chan<- *mappb.MapResponse) (err error) { defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} // Send shutdown signal - err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r) + err = status.Errorf(codes.Internal, "panic inside map handler: %v", r) } }() - messages := fs.Mapper.Map(ctx, d.GetKeys(), hd) + request := req.GetRequest() + hd := NewHandlerDatum(request.GetValue(), request.GetEventTime().AsTime(), request.GetWatermark().AsTime(), request.GetHeaders()) + messages := fs.Mapper.Map(ctx, request.GetKeys(), hd) + var elements []*mappb.MapResponse_Result for _, m := range messages.Items() { elements = append(elements, &mappb.MapResponse_Result{ Keys: m.Keys(), @@ -54,8 +148,14 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.Map Tags: m.Tags(), }) } - datumList := &mappb.MapResponse{ + resp := &mappb.MapResponse{ Results: elements, + Id: req.GetId(), + } + select { + case responseCh <- resp: + case <-ctx.Done(): + return ctx.Err() } - return datumList, err + return nil } diff --git a/pkg/mapper/service_test.go b/pkg/mapper/service_test.go index 48e5a290..7ec939d1 100644 --- a/pkg/mapper/service_test.go +++ b/pkg/mapper/service_test.go @@ -2,26 +2,82 @@ package mapper import ( "context" - "reflect" + "errors" + "fmt" + "net" "testing" "time" - "google.golang.org/protobuf/types/known/timestamppb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + proto "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" - mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" + "google.golang.org/protobuf/types/known/timestamppb" ) -func TestService_MapFn(t *testing.T) { +func newTestServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { + lis := bufconn.Listen(1024 * 1024) + t.Cleanup(func() { + _ = lis.Close() + }) + + server := grpc.NewServer() + t.Cleanup(func() { + server.Stop() + }) + + register(server) + + errChan := make(chan error, 1) + go func() { + // t.Fatal should only be called from the goroutine running the test + if err := server.Serve(lis); err != nil { + errChan <- err + } + }() + + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.NewClient("passthrough://", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { + _ = conn.Close() + }) + if err != nil { + t.Fatalf("Creating new gRPC client connection: %v", err) + } + + var grpcServerErr error + select { + case grpcServerErr = <-errChan: + case <-time.After(500 * time.Millisecond): + grpcServerErr = errors.New("gRPC server didn't start in 500ms") + } + if err != nil { + t.Fatalf("Failed to start gRPC server: %v", grpcServerErr) + } + + return conn +} + +func TestService_mapFn(t *testing.T) { type args struct { ctx context.Context - d *mappb.MapRequest + d *proto.MapRequest } + tests := []struct { name string handler Mapper args args - want *mappb.MapResponse - wantErr bool + want *proto.MapResponse }{ { name: "map_fn_forward_msg", @@ -31,22 +87,23 @@ func TestService_MapFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &mappb.MapRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &mappb.MapResponse{ - Results: []*mappb.MapResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ { Keys: []string{"client_test"}, Value: []byte(`test`), }, }, }, - wantErr: false, }, { name: "map_fn_forward_msg_forward_to_all", @@ -56,21 +113,22 @@ func TestService_MapFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &mappb.MapRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &mappb.MapResponse{ - Results: []*mappb.MapResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ { Value: []byte(`test`), }, }, }, - wantErr: false, }, { name: "map_fn_forward_msg_drop_msg", @@ -79,41 +137,157 @@ func TestService_MapFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &mappb.MapRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &mappb.MapResponse{ - Results: []*mappb.MapResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ { Tags: []string{DROP}, - Value: []byte{}, + Value: nil, }, }, }, - wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fs := &Service{ + svc := &Service{ Mapper: tt.handler, } - // here's a trick for testing: - // because we are not using gRPC, we directly set a new incoming ctx - // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() - got, err := fs.MapFn(ctx, tt.args.d) - if (err != nil) != tt.wantErr { - t.Errorf("MapFn() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("MapFn() got = %v, want %v", got, tt.want) - } + + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + err = stream.Send(tt.args.d) + require.NoError(t, err, "Sending message over the stream") + + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + + assert.Equal(t, got.Results, tt.want.Results) }) } } + +func doHandshake(t *testing.T, stream proto.Map_MapFnClient) { + t.Helper() + handshakeReq := &proto.MapRequest{ + Handshake: &proto.Handshake{Sot: true}, + } + err := stream.Send(handshakeReq) + require.NoError(t, err, "Sending handshake request to the stream") + + handshakeResp, err := stream.Recv() + require.NoError(t, err, "Receiving handshake response") + + require.Empty(t, handshakeResp.Results, "Invalid handshake response") + require.Empty(t, handshakeResp.Id, "Invalid handshake response") + require.NotNil(t, handshakeResp.Handshake, "Invalid handshake response") + require.True(t, handshakeResp.Handshake.Sot, "Invalid handshake response") +} + +func TestService_MapFn_Multiple_Messages(t *testing.T) { + svc := &Service{ + Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { + msg := datum.Value() + return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) + }), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + const msgCount = 10 + for i := 0; i < msgCount; i++ { + msg := proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + } + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + + expectedResults := make([][]*proto.MapResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + expectedResults[i] = []*proto.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + }, + } + } + + results := make([][]*proto.MapResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + results[i] = got.Results + } + require.ElementsMatch(t, results, expectedResults) +} + +func TestService_MapFn_Panic(t *testing.T) { + svc := &Service{ + Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { + panic("map failed") + }), + // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. + // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. + // Since we don't listen for shutdown signal in the tests, we use buffered channel to unblock the server function. + shutdownCh: make(chan<- struct{}, 1), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + msg := proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte("test"), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + _, err = stream.Recv() + require.Error(t, err, "Expected error while receiving message from the stream") + gotStatus, _ := status.FromError(err) + expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed")) + require.Equal(t, expectedStatus, gotStatus) +} diff --git a/pkg/sideinput/examples/map_sideinput/udf/go.mod b/pkg/sideinput/examples/map_sideinput/udf/go.mod index d783a749..78ff598e 100644 --- a/pkg/sideinput/examples/map_sideinput/udf/go.mod +++ b/pkg/sideinput/examples/map_sideinput/udf/go.mod @@ -11,6 +11,7 @@ require ( require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sideinput/examples/map_sideinput/udf/go.sum b/pkg/sideinput/examples/map_sideinput/udf/go.sum index 8ba4f4c5..249ac9a2 100644 --- a/pkg/sideinput/examples/map_sideinput/udf/go.sum +++ b/pkg/sideinput/examples/map_sideinput/udf/go.sum @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/sideinput/examples/simple_sideinput/udf/go.mod b/pkg/sideinput/examples/simple_sideinput/udf/go.mod index bbf5f7b3..8eef3bde 100644 --- a/pkg/sideinput/examples/simple_sideinput/udf/go.mod +++ b/pkg/sideinput/examples/simple_sideinput/udf/go.mod @@ -11,6 +11,7 @@ require ( require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sideinput/examples/simple_sideinput/udf/go.sum b/pkg/sideinput/examples/simple_sideinput/udf/go.sum index 8ba4f4c5..249ac9a2 100644 --- a/pkg/sideinput/examples/simple_sideinput/udf/go.sum +++ b/pkg/sideinput/examples/simple_sideinput/udf/go.sum @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index bbd91f19..c7212320 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -44,112 +44,123 @@ var errTransformerPanic = errors.New("transformer function panicked") // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFnServer) error { - // handle panic - defer func() { - if r := recover(); r != nil { - log.Printf("panic inside sourcetransform handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} - } - }() - - req, err := stream.Recv() - if err != nil { - return fmt.Errorf("reading handshake message from stream: %w", err) - } - if req.Handshake == nil || !req.Handshake.Sot { - return fmt.Errorf("invalid handshake message: %+v", req) + // perform handshake with client before processing requests + if err := fs.performHandshake(stream); err != nil { + return err } - handshakeResponse := &v1.SourceTransformResponse{ - Handshake: &v1.Handshake{ - Sot: true, - }, - } - if err := stream.Send(handshakeResponse); err != nil { - return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) - } + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() - ctx := stream.Context() - // We depend on grpContext to cancel all goroutines, since it will be automatically closed when the first function returns a non-nil error. - // This error will be caught later with grp.Wait() + // Use error group to manage goroutines, the grpCtx is cancelled when any of the + // goroutines return an error for the first time or the first time the wait returns. grp, grpCtx := errgroup.WithContext(ctx) senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size - // goroutine to send the response to the stream + // goroutine to send the responses back to the client grp.Go(func() error { for { - var resp *v1.SourceTransformResponse select { case <-grpCtx.Done(): return grpCtx.Err() - case resp = <-senderCh: - } - if err := stream.Send(resp); err != nil { - return fmt.Errorf("failed to send response to client: %w", err) + case resp := <-senderCh: + if err := stream.Send(resp); err != nil { + return fmt.Errorf("failed to send response to client: %w", err) + } } } }) + var readErr error outer: for { - // Stop reading new messages when we are shutting down select { - case <-grpCtx.Done(): - // If the context was cancelled while this loop is running, it will be caught and returned in one of the errgroup's goroutines. + + case <-grpCtx.Done(): // Stop reading new messages when we are shutting down break outer default: } - d, err := stream.Recv() + if err == io.EOF { + break outer + } if err != nil { - if errors.Is(err, io.EOF) { - break - } - return err + log.Printf("Failed to receive request: %v", err) + readErr = err + // read loop is not part of the error group, so we need to cancel the context + // to signal the other goroutines to stop processing. + cancel() + break outer } - - req := d.Request grp.Go(func() (err error) { - defer func() { - if r := recover(); r != nil { - log.Printf("Panic inside source transform handler: %v %v", r, string(debug.Stack())) - // We only listen for 1 message on the shutdown channel. If multiple requests panic, only the first one will succeed. - // The one that succeds returns the errTransformerPanic. This causes grpCtx to be cancelled. - select { - case fs.shutdownCh <- struct{}{}: - case <-grpCtx.Done(): - } - err = errTransformerPanic - } - }() - var hd = NewHandlerDatum(req.GetValue(), req.EventTime.AsTime(), req.Watermark.AsTime(), req.Headers) - messageTs := fs.Transformer.Transform(grpCtx, req.GetKeys(), hd) - var results []*v1.SourceTransformResponse_Result - for _, m := range messageTs.Items() { - results = append(results, &v1.SourceTransformResponse_Result{ - EventTime: timestamppb.New(m.EventTime()), - Keys: m.Keys(), - Value: m.Value(), - Tags: m.Tags(), - }) - } - resp := &v1.SourceTransformResponse{ - Results: results, - Id: req.GetId(), - } - select { - case senderCh <- resp: - case <-grpCtx.Done(): - return grpCtx.Err() - } - return nil + return fs.handleRequest(grpCtx, d, senderCh) }) } + // wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately. if err := grp.Wait(); err != nil { + fs.shutdownCh <- struct{}{} statusErr := status.Errorf(codes.Internal, err.Error()) return statusErr } + + // check if there was an error while reading from the stream + if readErr != nil { + return status.Errorf(codes.Internal, readErr.Error()) + } + return nil +} + +// performHandshake handles the handshake logic at the start of the stream. +func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.Internal, "failed to receive handshake: %v", err) + } + if req.GetHandshake() == nil || !req.GetHandshake().GetSot() { + return status.Errorf(codes.InvalidArgument, "invalid handshake") + } + handshakeResponse := &v1.SourceTransformResponse{ + Handshake: &v1.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + } + return nil +} + +// handleRequest processes each request and sends the response to the response channel. +func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformRequest, responseCh chan<- *v1.SourceTransformResponse) (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside handler: %v %v", r, string(debug.Stack())) + err = errTransformerPanic + } + }() + + request := req.GetRequest() + hd := NewHandlerDatum(request.GetValue(), request.GetEventTime().AsTime(), request.GetWatermark().AsTime(), request.GetHeaders()) + messages := fs.Transformer.Transform(ctx, request.GetKeys(), hd) + var elements []*v1.SourceTransformResponse_Result + for _, m := range messages.Items() { + elements = append(elements, &v1.SourceTransformResponse_Result{ + Keys: m.Keys(), + Value: m.Value(), + Tags: m.Tags(), + EventTime: timestamppb.New(m.EventTime()), + }) + } + resp := &v1.SourceTransformResponse{ + Results: elements, + Id: req.GetRequest().GetId(), + } + select { + case responseCh <- resp: + case <-ctx.Done(): + return ctx.Err() + } return nil }