From b7c6e55a389acb89976fbb5b08d6a11f3e626f07 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 13 Oct 2024 17:04:48 +0530 Subject: [PATCH] feat: Unify Map Stream and Unary Map Operations Using a Shared gRPC Protocol Signed-off-by: Yashash H L --- pkg/apis/proto/mapstream/v1/mapstream.pb.go | 437 ------------------ pkg/apis/proto/mapstream/v1/mapstream.proto | 47 -- .../proto/mapstream/v1/mapstream_grpc.pb.go | 181 -------- .../v1/mapstreammock/mapstreammock.go | 202 -------- pkg/apis/proto/mapstream/v1/mockgen.go | 3 - pkg/mapstreamer/server.go | 4 +- pkg/mapstreamer/service.go | 144 ++++-- pkg/mapstreamer/service_test.go | 423 +++++++++++------ 8 files changed, 379 insertions(+), 1062 deletions(-) delete mode 100644 pkg/apis/proto/mapstream/v1/mapstream.pb.go delete mode 100644 pkg/apis/proto/mapstream/v1/mapstream.proto delete mode 100644 pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go delete mode 100644 pkg/apis/proto/mapstream/v1/mapstreammock/mapstreammock.go delete mode 100644 pkg/apis/proto/mapstream/v1/mockgen.go diff --git a/pkg/apis/proto/mapstream/v1/mapstream.pb.go b/pkg/apis/proto/mapstream/v1/mapstream.pb.go deleted file mode 100644 index 42619189..00000000 --- a/pkg/apis/proto/mapstream/v1/mapstream.pb.go +++ /dev/null @@ -1,437 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.34.2 -// protoc v4.25.1 -// source: pkg/apis/proto/mapstream/v1/mapstream.proto - -package v1 - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// * -// MapStreamRequest represents a request element. -type MapStreamRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` -} - -func (x *MapStreamRequest) Reset() { - *x = MapStreamRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MapStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MapStreamRequest) ProtoMessage() {} - -func (x *MapStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MapStreamRequest.ProtoReflect.Descriptor instead. -func (*MapStreamRequest) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescGZIP(), []int{0} -} - -func (x *MapStreamRequest) GetKeys() []string { - if x != nil { - return x.Keys - } - return nil -} - -func (x *MapStreamRequest) GetValue() []byte { - if x != nil { - return x.Value - } - return nil -} - -func (x *MapStreamRequest) GetEventTime() *timestamppb.Timestamp { - if x != nil { - return x.EventTime - } - return nil -} - -func (x *MapStreamRequest) GetWatermark() *timestamppb.Timestamp { - if x != nil { - return x.Watermark - } - return nil -} - -func (x *MapStreamRequest) GetHeaders() map[string]string { - if x != nil { - return x.Headers - } - return nil -} - -// * -// MapStreamResponse represents a response element. -type MapStreamResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Result *MapStreamResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` -} - -func (x *MapStreamResponse) Reset() { - *x = MapStreamResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MapStreamResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MapStreamResponse) ProtoMessage() {} - -func (x *MapStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MapStreamResponse.ProtoReflect.Descriptor instead. -func (*MapStreamResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescGZIP(), []int{1} -} - -func (x *MapStreamResponse) GetResult() *MapStreamResponse_Result { - if x != nil { - return x.Result - } - return nil -} - -// * -// ReadyResponse is the health check result. -type ReadyResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Ready bool `protobuf:"varint,1,opt,name=ready,proto3" json:"ready,omitempty"` -} - -func (x *ReadyResponse) Reset() { - *x = ReadyResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReadyResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReadyResponse) ProtoMessage() {} - -func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. -func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescGZIP(), []int{2} -} - -func (x *ReadyResponse) GetReady() bool { - if x != nil { - return x.Ready - } - return false -} - -type MapStreamResponse_Result struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` -} - -func (x *MapStreamResponse_Result) Reset() { - *x = MapStreamResponse_Result{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MapStreamResponse_Result) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MapStreamResponse_Result) ProtoMessage() {} - -func (x *MapStreamResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MapStreamResponse_Result.ProtoReflect.Descriptor instead. -func (*MapStreamResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescGZIP(), []int{1, 0} -} - -func (x *MapStreamResponse_Result) GetKeys() []string { - if x != nil { - return x.Keys - } - return nil -} - -func (x *MapStreamResponse_Result) GetValue() []byte { - if x != nil { - return x.Value - } - return nil -} - -func (x *MapStreamResponse_Result) GetTags() []string { - if x != nil { - return x.Tags - } - return nil -} - -var File_pkg_apis_proto_mapstream_v1_mapstream_proto protoreflect.FileDescriptor - -var file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDesc = []byte{ - 0x0a, 0x2b, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x61, - 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x6d, - 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, - 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x02, 0x0a, 0x10, 0x4d, 0x61, - 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, - 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, - 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x45, 0x0a, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, - 0x2e, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, - 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, - 0x22, 0x9b, 0x01, 0x0a, 0x11, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, - 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, - 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, - 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, - 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, - 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x9d, 0x01, 0x0a, 0x09, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x12, 0x50, 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x46, 0x6e, 0x12, 0x1e, 0x2e, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, - 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, - 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x3e, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, - 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1b, 0x2e, 0x6d, 0x61, 0x70, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, - 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, - 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescOnce sync.Once - file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescData = file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDesc -) - -func file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescGZIP() []byte { - file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescOnce.Do(func() { - file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescData) - }) - return file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDescData -} - -var file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes = make([]protoimpl.MessageInfo, 5) -var file_pkg_apis_proto_mapstream_v1_mapstream_proto_goTypes = []any{ - (*MapStreamRequest)(nil), // 0: mapstream.v1.MapStreamRequest - (*MapStreamResponse)(nil), // 1: mapstream.v1.MapStreamResponse - (*ReadyResponse)(nil), // 2: mapstream.v1.ReadyResponse - nil, // 3: mapstream.v1.MapStreamRequest.HeadersEntry - (*MapStreamResponse_Result)(nil), // 4: mapstream.v1.MapStreamResponse.Result - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty -} -var file_pkg_apis_proto_mapstream_v1_mapstream_proto_depIdxs = []int32{ - 5, // 0: mapstream.v1.MapStreamRequest.event_time:type_name -> google.protobuf.Timestamp - 5, // 1: mapstream.v1.MapStreamRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: mapstream.v1.MapStreamRequest.headers:type_name -> mapstream.v1.MapStreamRequest.HeadersEntry - 4, // 3: mapstream.v1.MapStreamResponse.result:type_name -> mapstream.v1.MapStreamResponse.Result - 0, // 4: mapstream.v1.MapStream.MapStreamFn:input_type -> mapstream.v1.MapStreamRequest - 6, // 5: mapstream.v1.MapStream.IsReady:input_type -> google.protobuf.Empty - 1, // 6: mapstream.v1.MapStream.MapStreamFn:output_type -> mapstream.v1.MapStreamResponse - 2, // 7: mapstream.v1.MapStream.IsReady:output_type -> mapstream.v1.ReadyResponse - 6, // [6:8] is the sub-list for method output_type - 4, // [4:6] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_pkg_apis_proto_mapstream_v1_mapstream_proto_init() } -func file_pkg_apis_proto_mapstream_v1_mapstream_proto_init() { - if File_pkg_apis_proto_mapstream_v1_mapstream_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*MapStreamRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*MapStreamResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*ReadyResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*MapStreamResponse_Result); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDesc, - NumEnums: 0, - NumMessages: 5, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_pkg_apis_proto_mapstream_v1_mapstream_proto_goTypes, - DependencyIndexes: file_pkg_apis_proto_mapstream_v1_mapstream_proto_depIdxs, - MessageInfos: file_pkg_apis_proto_mapstream_v1_mapstream_proto_msgTypes, - }.Build() - File_pkg_apis_proto_mapstream_v1_mapstream_proto = out.File - file_pkg_apis_proto_mapstream_v1_mapstream_proto_rawDesc = nil - file_pkg_apis_proto_mapstream_v1_mapstream_proto_goTypes = nil - file_pkg_apis_proto_mapstream_v1_mapstream_proto_depIdxs = nil -} diff --git a/pkg/apis/proto/mapstream/v1/mapstream.proto b/pkg/apis/proto/mapstream/v1/mapstream.proto deleted file mode 100644 index 43f85f60..00000000 --- a/pkg/apis/proto/mapstream/v1/mapstream.proto +++ /dev/null @@ -1,47 +0,0 @@ -syntax = "proto3"; - -option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"; - -import "google/protobuf/empty.proto"; -import "google/protobuf/timestamp.proto"; - - -package mapstream.v1; - -service MapStream { - // MapStreamFn applies a function to each request element and returns a stream. - rpc MapStreamFn(MapStreamRequest) returns (stream MapStreamResponse); - - // IsReady is the heartbeat endpoint for gRPC. - rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); -} - -/** - * MapStreamRequest represents a request element. - */ -message MapStreamRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; -} - -/** - * MapStreamResponse represents a response element. - */ -message MapStreamResponse { - message Result { - repeated string keys = 1; - bytes value = 2; - repeated string tags = 3; - } - Result result = 1; -} - -/** - * ReadyResponse is the health check result. - */ -message ReadyResponse { - bool ready = 1; -} \ No newline at end of file diff --git a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go deleted file mode 100644 index 9e9f3ac8..00000000 --- a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go +++ /dev/null @@ -1,181 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.4.0 -// - protoc v4.25.1 -// source: pkg/apis/proto/mapstream/v1/mapstream.proto - -package v1 - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 - -const ( - MapStream_MapStreamFn_FullMethodName = "/mapstream.v1.MapStream/MapStreamFn" - MapStream_IsReady_FullMethodName = "/mapstream.v1.MapStream/IsReady" -) - -// MapStreamClient is the client API for MapStream service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type MapStreamClient interface { - // MapStreamFn applies a function to each request element and returns a stream. - MapStreamFn(ctx context.Context, in *MapStreamRequest, opts ...grpc.CallOption) (MapStream_MapStreamFnClient, error) - // IsReady is the heartbeat endpoint for gRPC. - IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) -} - -type mapStreamClient struct { - cc grpc.ClientConnInterface -} - -func NewMapStreamClient(cc grpc.ClientConnInterface) MapStreamClient { - return &mapStreamClient{cc} -} - -func (c *mapStreamClient) MapStreamFn(ctx context.Context, in *MapStreamRequest, opts ...grpc.CallOption) (MapStream_MapStreamFnClient, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &MapStream_ServiceDesc.Streams[0], MapStream_MapStreamFn_FullMethodName, cOpts...) - if err != nil { - return nil, err - } - x := &mapStreamMapStreamFnClient{ClientStream: stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type MapStream_MapStreamFnClient interface { - Recv() (*MapStreamResponse, error) - grpc.ClientStream -} - -type mapStreamMapStreamFnClient struct { - grpc.ClientStream -} - -func (x *mapStreamMapStreamFnClient) Recv() (*MapStreamResponse, error) { - m := new(MapStreamResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *mapStreamClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(ReadyResponse) - err := c.cc.Invoke(ctx, MapStream_IsReady_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -// MapStreamServer is the server API for MapStream service. -// All implementations must embed UnimplementedMapStreamServer -// for forward compatibility -type MapStreamServer interface { - // MapStreamFn applies a function to each request element and returns a stream. - MapStreamFn(*MapStreamRequest, MapStream_MapStreamFnServer) error - // IsReady is the heartbeat endpoint for gRPC. - IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) - mustEmbedUnimplementedMapStreamServer() -} - -// UnimplementedMapStreamServer must be embedded to have forward compatible implementations. -type UnimplementedMapStreamServer struct { -} - -func (UnimplementedMapStreamServer) MapStreamFn(*MapStreamRequest, MapStream_MapStreamFnServer) error { - return status.Errorf(codes.Unimplemented, "method MapStreamFn not implemented") -} -func (UnimplementedMapStreamServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") -} -func (UnimplementedMapStreamServer) mustEmbedUnimplementedMapStreamServer() {} - -// UnsafeMapStreamServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to MapStreamServer will -// result in compilation errors. -type UnsafeMapStreamServer interface { - mustEmbedUnimplementedMapStreamServer() -} - -func RegisterMapStreamServer(s grpc.ServiceRegistrar, srv MapStreamServer) { - s.RegisterService(&MapStream_ServiceDesc, srv) -} - -func _MapStream_MapStreamFn_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(MapStreamRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(MapStreamServer).MapStreamFn(m, &mapStreamMapStreamFnServer{ServerStream: stream}) -} - -type MapStream_MapStreamFnServer interface { - Send(*MapStreamResponse) error - grpc.ServerStream -} - -type mapStreamMapStreamFnServer struct { - grpc.ServerStream -} - -func (x *mapStreamMapStreamFnServer) Send(m *MapStreamResponse) error { - return x.ServerStream.SendMsg(m) -} - -func _MapStream_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(MapStreamServer).IsReady(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: MapStream_IsReady_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MapStreamServer).IsReady(ctx, req.(*emptypb.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -// MapStream_ServiceDesc is the grpc.ServiceDesc for MapStream service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var MapStream_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "mapstream.v1.MapStream", - HandlerType: (*MapStreamServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "IsReady", - Handler: _MapStream_IsReady_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "MapStreamFn", - Handler: _MapStream_MapStreamFn_Handler, - ServerStreams: true, - }, - }, - Metadata: "pkg/apis/proto/mapstream/v1/mapstream.proto", -} diff --git a/pkg/apis/proto/mapstream/v1/mapstreammock/mapstreammock.go b/pkg/apis/proto/mapstream/v1/mapstreammock/mapstreammock.go deleted file mode 100644 index 025080bb..00000000 --- a/pkg/apis/proto/mapstream/v1/mapstreammock/mapstreammock.go +++ /dev/null @@ -1,202 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1 (interfaces: MapStreamClient,MapStream_MapStreamFnClient) - -// Package mapstreammock is a generated GoMock package. -package mapstreammock - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" - grpc "google.golang.org/grpc" - metadata "google.golang.org/grpc/metadata" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// MockMapStreamClient is a mock of MapStreamClient interface. -type MockMapStreamClient struct { - ctrl *gomock.Controller - recorder *MockMapStreamClientMockRecorder -} - -// MockMapStreamClientMockRecorder is the mock recorder for MockMapStreamClient. -type MockMapStreamClientMockRecorder struct { - mock *MockMapStreamClient -} - -// NewMockMapStreamClient creates a new mock instance. -func NewMockMapStreamClient(ctrl *gomock.Controller) *MockMapStreamClient { - mock := &MockMapStreamClient{ctrl: ctrl} - mock.recorder = &MockMapStreamClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockMapStreamClient) EXPECT() *MockMapStreamClientMockRecorder { - return m.recorder -} - -// IsReady mocks base method. -func (m *MockMapStreamClient) IsReady(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.ReadyResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "IsReady", varargs...) - ret0, _ := ret[0].(*v1.ReadyResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// IsReady indicates an expected call of IsReady. -func (mr *MockMapStreamClientMockRecorder) IsReady(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockMapStreamClient)(nil).IsReady), varargs...) -} - -// MapStreamFn mocks base method. -func (m *MockMapStreamClient) MapStreamFn(arg0 context.Context, arg1 *v1.MapStreamRequest, arg2 ...grpc.CallOption) (v1.MapStream_MapStreamFnClient, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "MapStreamFn", varargs...) - ret0, _ := ret[0].(v1.MapStream_MapStreamFnClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// MapStreamFn indicates an expected call of MapStreamFn. -func (mr *MockMapStreamClientMockRecorder) MapStreamFn(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MapStreamFn", reflect.TypeOf((*MockMapStreamClient)(nil).MapStreamFn), varargs...) -} - -// MockMapStream_MapStreamFnClient is a mock of MapStream_MapStreamFnClient interface. -type MockMapStream_MapStreamFnClient struct { - ctrl *gomock.Controller - recorder *MockMapStream_MapStreamFnClientMockRecorder -} - -// MockMapStream_MapStreamFnClientMockRecorder is the mock recorder for MockMapStream_MapStreamFnClient. -type MockMapStream_MapStreamFnClientMockRecorder struct { - mock *MockMapStream_MapStreamFnClient -} - -// NewMockMapStream_MapStreamFnClient creates a new mock instance. -func NewMockMapStream_MapStreamFnClient(ctrl *gomock.Controller) *MockMapStream_MapStreamFnClient { - mock := &MockMapStream_MapStreamFnClient{ctrl: ctrl} - mock.recorder = &MockMapStream_MapStreamFnClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockMapStream_MapStreamFnClient) EXPECT() *MockMapStream_MapStreamFnClientMockRecorder { - return m.recorder -} - -// CloseSend mocks base method. -func (m *MockMapStream_MapStreamFnClient) CloseSend() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CloseSend") - ret0, _ := ret[0].(error) - return ret0 -} - -// CloseSend indicates an expected call of CloseSend. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) CloseSend() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).CloseSend)) -} - -// Context mocks base method. -func (m *MockMapStream_MapStreamFnClient) Context() context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Context") - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// Context indicates an expected call of Context. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) Context() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).Context)) -} - -// Header mocks base method. -func (m *MockMapStream_MapStreamFnClient) Header() (metadata.MD, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Header") - ret0, _ := ret[0].(metadata.MD) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Header indicates an expected call of Header. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) Header() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).Header)) -} - -// Recv mocks base method. -func (m *MockMapStream_MapStreamFnClient) Recv() (*v1.MapStreamResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Recv") - ret0, _ := ret[0].(*v1.MapStreamResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Recv indicates an expected call of Recv. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) Recv() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).Recv)) -} - -// RecvMsg mocks base method. -func (m *MockMapStream_MapStreamFnClient) RecvMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RecvMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// RecvMsg indicates an expected call of RecvMsg. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).RecvMsg), arg0) -} - -// SendMsg mocks base method. -func (m *MockMapStream_MapStreamFnClient) SendMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMsg indicates an expected call of SendMsg. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).SendMsg), arg0) -} - -// Trailer mocks base method. -func (m *MockMapStream_MapStreamFnClient) Trailer() metadata.MD { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trailer") - ret0, _ := ret[0].(metadata.MD) - return ret0 -} - -// Trailer indicates an expected call of Trailer. -func (mr *MockMapStream_MapStreamFnClientMockRecorder) Trailer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockMapStream_MapStreamFnClient)(nil).Trailer)) -} diff --git a/pkg/apis/proto/mapstream/v1/mockgen.go b/pkg/apis/proto/mapstream/v1/mockgen.go deleted file mode 100644 index f75e0855..00000000 --- a/pkg/apis/proto/mapstream/v1/mockgen.go +++ /dev/null @@ -1,3 +0,0 @@ -package v1 - -//go:generate mockgen -destination mapstreammock/mapstreammock.go -package mapstreammock github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1 MapStreamClient,MapStream_MapStreamFnClient diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index 5426c6cc..42bdcd82 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc" "github.com/numaproj/numaflow-go/pkg" - mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -68,7 +68,7 @@ func (m *server) Start(ctx context.Context) error { m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the map stream service - mapstreampb.RegisterMapStreamServer(m.grpcServer, m.svc) + mappb.RegisterMapServer(m.grpcServer, m.svc) // start a go routine to stop the server gracefully when the context is done // or a shutdown signal is received from the service diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 2b883316..5a19ab71 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -2,12 +2,17 @@ package mapstreamer import ( "context" + "fmt" + "io" "log" "runtime/debug" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" ) const ( @@ -20,63 +25,128 @@ const ( // Service implements the proto gen server interface and contains the map // streaming function. type Service struct { - mapstreampb.UnimplementedMapStreamServer + mappb.UnimplementedMapServer shutdownCh chan<- struct{} MapperStream MapStreamer } // IsReady returns true to indicate the gRPC connection is ready. -func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mapstreampb.ReadyResponse, error) { - return &mapstreampb.ReadyResponse{Ready: true}, nil +func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyResponse, error) { + return &mappb.ReadyResponse{Ready: true}, nil } -// MapStreamFn applies a function to each request element and streams the results back. -func (fs *Service) MapStreamFn(d *mapstreampb.MapStreamRequest, stream mapstreampb.MapStream_MapStreamFnServer) error { - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders()) +// MapFn applies a function to each request element and streams the results back. +func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { + // perform handshake with client before processing requests + if err := fs.performHandshake(stream); err != nil { + return err + } + ctx := stream.Context() - messageCh := make(chan Message) - - done := make(chan bool) - go func() { - // handle panic - defer func() { - if r := recover(); r != nil { - log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} + + for { + req, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Printf("Failed to receive request: %v", err) + return err + } + + messageCh := make(chan Message) + g, groupCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + return fs.invokeHandler(groupCtx, req, messageCh) + }) + + g.Go(func() error { + return fs.writeResponseToClient(groupCtx, stream, req.GetId(), messageCh) + }) + + // Wait for the error group to finish + if err := g.Wait(); err != nil { + log.Printf("error processing requests: %v", err) + if err == io.EOF { + return nil } - }() - fs.MapperStream.MapStream(ctx, d.GetKeys(), hd, messageCh) - done <- true + fs.shutdownCh <- struct{}{} + return status.Errorf(codes.Internal, "error processing requests: %v", err) + } + } + + return nil +} + +// invokeHandler handles the map stream invocation. +func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, messageCh chan<- Message) (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) + err = fmt.Errorf("panic inside mapStream handler: %v", r) + return + } + // close the message channel after the handler is done processing the request + close(messageCh) }() + streamReq := req.GetRequest() + hd := NewHandlerDatum(streamReq.GetValue(), streamReq.GetEventTime().AsTime(), streamReq.GetWatermark().AsTime(), streamReq.GetHeaders()) + fs.MapperStream.MapStream(ctx, req.GetRequest().GetKeys(), hd, messageCh) + return nil +} - finished := false +// writeResponseToClient writes the response back to the client. +func (fs *Service) writeResponseToClient(ctx context.Context, stream mappb.Map_MapFnServer, reqID string, messageCh <-chan Message) error { for { select { - case <-done: - finished = true case message, ok := <-messageCh: if !ok { - // Channel already closed, not closing again. + // Send EOT message since we are done processing the request. + eotMessage := &mappb.MapResponse{ + Status: &mappb.Status{Eot: true}, + Id: reqID, + } + if err := stream.Send(eotMessage); err != nil { + return err + } return nil } - element := &mapstreampb.MapStreamResponse{ - Result: &mapstreampb.MapStreamResponse_Result{ - Keys: message.Keys(), - Value: message.Value(), - Tags: message.Tags(), + element := &mappb.MapResponse{ + Results: []*mappb.MapResponse_Result{ + { + Keys: message.Keys(), + Value: message.Value(), + Tags: message.Tags(), + }, }, + Id: reqID, } - err := stream.Send(element) - // the error here is returned by stream.Send() which is already a gRPC error - if err != nil { - // Channel may or may not be closed, as we are not sure leave it to GC. + if err := stream.Send(element); err != nil { return err } - default: - if finished { - close(messageCh) - return nil - } + case <-ctx.Done(): + return ctx.Err() } } } + +// performHandshake handles the handshake logic at the start of the stream. +func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.Internal, "failed to receive handshake: %v", err) + } + if req.GetHandshake() == nil || !req.GetHandshake().GetSot() { + return status.Errorf(codes.InvalidArgument, "invalid handshake") + } + handshakeResponse := &mappb.MapResponse{ + Handshake: &mappb.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + } + return nil +} diff --git a/pkg/mapstreamer/service_test.go b/pkg/mapstreamer/service_test.go index 6b8f00a4..ba6b746f 100644 --- a/pkg/mapstreamer/service_test.go +++ b/pkg/mapstreamer/service_test.go @@ -2,229 +2,346 @@ package mapstreamer import ( "context" + "errors" "fmt" - "reflect" - "sync" + "net" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + proto "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" - mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" + "google.golang.org/protobuf/types/known/timestamppb" ) -type MapStreamFnServerTest struct { - ctx context.Context - outputCh chan mapstreampb.MapStreamResponse - grpc.ServerStream -} +func newTestServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { + lis := bufconn.Listen(1024 * 1024) + t.Cleanup(func() { + _ = lis.Close() + }) -func NewMapStreamFnServerTest( - ctx context.Context, - outputCh chan mapstreampb.MapStreamResponse, -) *MapStreamFnServerTest { - return &MapStreamFnServerTest{ - ctx: ctx, - outputCh: outputCh, - } -} + server := grpc.NewServer() + t.Cleanup(func() { + server.Stop() + }) -func (u *MapStreamFnServerTest) Send(d *mapstreampb.MapStreamResponse) error { - u.outputCh <- *d - return nil -} + register(server) -func (u *MapStreamFnServerTest) Context() context.Context { - return u.ctx -} + errChan := make(chan error, 1) + go func() { + // t.Fatal should only be called from the goroutine running the test + if err := server.Serve(lis); err != nil { + errChan <- err + } + }() -type MapStreamFnServerErrTest struct { - ctx context.Context - grpc.ServerStream -} + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } -func NewMapStreamFnServerErrTest( - ctx context.Context, + conn, err := grpc.NewClient("passthrough://", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { + _ = conn.Close() + }) + if err != nil { + t.Fatalf("Creating new gRPC client connection: %v", err) + } -) *MapStreamFnServerErrTest { - return &MapStreamFnServerErrTest{ - ctx: ctx, + var grpcServerErr error + select { + case grpcServerErr = <-errChan: + case <-time.After(500 * time.Millisecond): + grpcServerErr = errors.New("gRPC server didn't start in 500ms") + } + if err != nil { + t.Fatalf("Failed to start gRPC server: %v", grpcServerErr) } -} -func (u *MapStreamFnServerErrTest) Send(_ *mapstreampb.MapStreamResponse) error { - return fmt.Errorf("send error") + return conn } -func (u *MapStreamFnServerErrTest) Context() context.Context { - return u.ctx -} +func TestService_MapFn(t *testing.T) { + type args struct { + ctx context.Context + d *proto.MapRequest + } -func TestService_MapFnStream(t *testing.T) { tests := []struct { - name string - handler MapStreamer - input *mapstreampb.MapStreamRequest - expected []*mapstreampb.MapStreamResponse - expectedErr bool - streamErr bool + name string + handler MapStreamer + args args + want *proto.MapResponse }{ { - name: "map_stream_fn_forward_msg", + name: "map_fn_forward_msg", handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { msg := datum.Value() messageCh <- NewMessage(msg).WithKeys([]string{keys[0] + "_test"}) - close(messageCh) }), - input: &mapstreampb.MapStreamRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - }, - expected: []*mapstreampb.MapStreamResponse{ - { - Result: &mapstreampb.MapStreamResponse_Result{ - Keys: []string{"client_test"}, - Value: []byte(`test`), + args: args{ + ctx: context.Background(), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), }, }, }, - expectedErr: false, - }, - { - name: "map_stream_fn_forward_msg_without_close_stream", - handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { - msg := datum.Value() - messageCh <- NewMessage(msg).WithKeys([]string{keys[0] + "_test"}) - }), - input: &mapstreampb.MapStreamRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - }, - expected: []*mapstreampb.MapStreamResponse{ - { - Result: &mapstreampb.MapStreamResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ + { Keys: []string{"client_test"}, Value: []byte(`test`), }, }, }, - expectedErr: false, }, { - name: "map_stream_fn_forward_msg_forward_to_all", + name: "map_fn_forward_msg_forward_to_all", handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { msg := datum.Value() messageCh <- NewMessage(msg) - close(messageCh) }), - input: &mapstreampb.MapStreamRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + args: args{ + ctx: context.Background(), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + }, }, - expected: []*mapstreampb.MapStreamResponse{ - { - Result: &mapstreampb.MapStreamResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ + { Value: []byte(`test`), }, }, }, - expectedErr: false, }, { - name: "map_stream_fn_forward_msg_drop_msg", + name: "map_fn_forward_msg_drop_msg", handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { messageCh <- MessageToDrop() - close(messageCh) }), - input: &mapstreampb.MapStreamRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - }, - expected: []*mapstreampb.MapStreamResponse{ - { - Result: &mapstreampb.MapStreamResponse_Result{ - Tags: []string{DROP}, - Value: []byte{}, + args: args{ + ctx: context.Background(), + d: &proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), }, }, }, - expectedErr: false, - }, - { - name: "map_stream_fn_forward_err", - handler: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { - messageCh <- MessageToDrop() - close(messageCh) - }), - input: &mapstreampb.MapStreamRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - }, - expected: []*mapstreampb.MapStreamResponse{ - { - Result: &mapstreampb.MapStreamResponse_Result{ + want: &proto.MapResponse{ + Results: []*proto.MapResponse_Result{ + { Tags: []string{DROP}, - Value: []byte{}, + Value: nil, }, }, }, - expectedErr: true, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fs := &Service{ + svc := &Service{ MapperStream: tt.handler, } - // here's a trick for testing: - // because we are not using gRPC, we directly set a new incoming ctx - // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() - outputCh := make(chan mapstreampb.MapStreamResponse) - result := make([]*mapstreampb.MapStreamResponse, 0) - - var udfMapStreamFnStream mapstreampb.MapStream_MapStreamFnServer - if tt.streamErr { - udfMapStreamFnStream = NewMapStreamFnServerErrTest(ctx) - } else { - udfMapStreamFnStream = NewMapStreamFnServerTest(ctx, outputCh) - } - var wg sync.WaitGroup + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) - wg.Add(1) - go func() { - defer wg.Done() - for msg := range outputCh { - result = append(result, &msg) - } - }() + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") - err := fs.MapStreamFn(tt.input, udfMapStreamFnStream) - close(outputCh) - wg.Wait() + doHandshake(t, stream) - if err != nil { - assert.True(t, tt.expectedErr, "MapStreamFn() error = %v, expectedErr %v", err, tt.expectedErr) - return - } + err = stream.Send(tt.args.d) + require.NoError(t, err, "Sending message over the stream") - if !reflect.DeepEqual(result, tt.expected) { - t.Errorf("MapStreamFn() got = %v, want %v", result, tt.expected) - } + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + assert.Equal(t, got.Results, tt.want.Results) }) } } + +func doHandshake(t *testing.T, stream proto.Map_MapFnClient) { + t.Helper() + handshakeReq := &proto.MapRequest{ + Handshake: &proto.Handshake{Sot: true}, + } + err := stream.Send(handshakeReq) + require.NoError(t, err, "Sending handshake request to the stream") + + handshakeResp, err := stream.Recv() + require.NoError(t, err, "Receiving handshake response") + + require.Empty(t, handshakeResp.Results, "Invalid handshake response") + require.Empty(t, handshakeResp.Id, "Invalid handshake response") + require.NotNil(t, handshakeResp.Handshake, "Invalid handshake response") + require.True(t, handshakeResp.Handshake.Sot, "Invalid handshake response") +} + +func TestService_MapFn_SingleMessage_MultipleResponses(t *testing.T) { + svc := &Service{ + MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("response_%d", i) + messageCh <- NewMessage([]byte(msg)).WithKeys([]string{keys[0] + "_test"}) + } + }), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + msg := proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte("test"), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + + expectedResults := make([][]*proto.MapResponse_Result, 10) + for i := 0; i < 10; i++ { + expectedResults[i] = []*proto.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(fmt.Sprintf("response_%d", i)), + }, + } + } + + results := make([][]*proto.MapResponse_Result, 0) + for i := 0; i < 10; i++ { + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + results = append(results, got.Results) + } + + require.ElementsMatch(t, results, expectedResults) + +} + +func TestService_MapFn_Multiple_Messages(t *testing.T) { + svc := &Service{ + MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { + msg := datum.Value() + messageCh <- NewMessage(msg).WithKeys([]string{keys[0] + "_test"}) + }), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + const msgCount = 10 + for i := 0; i < msgCount; i++ { + msg := proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + } + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + + expectedResults := make([][]*proto.MapResponse_Result, msgCount) + results := make([][]*proto.MapResponse_Result, 0) + + for i := 0; i < msgCount; i++ { + expectedResults[i] = []*proto.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + }, + } + + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + results = append(results, got.Results) + + eot, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + require.True(t, eot.Status.Eot, "Expected EOT message") + } + + require.ElementsMatch(t, results, expectedResults) +} + +func TestService_MapFn_Panic(t *testing.T) { + svc := &Service{ + MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { + panic("map failed") + }), + shutdownCh: make(chan<- struct{}, 1), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterMapServer(server, svc) + }) + + client := proto.NewMapClient(conn) + stream, err := client.MapFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + msg := proto.MapRequest{ + Request: &proto.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte("test"), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + _, err = stream.Recv() + require.Error(t, err, "Expected error while receiving message from the stream") + gotStatus, _ := status.FromError(err) + expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed")) + require.Equal(t, expectedStatus, gotStatus) +}