diff --git a/pkg/apis/proto/batchmap/v1/batchmap.pb.go b/pkg/apis/proto/batchmap/v1/batchmap.pb.go deleted file mode 100644 index 5b5c9bd5..00000000 --- a/pkg/apis/proto/batchmap/v1/batchmap.pb.go +++ /dev/null @@ -1,456 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.34.2 -// protoc v4.25.1 -// source: pkg/apis/proto/batchmap/v1/batchmap.proto - -package v1 - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// * -// BatchMapRequest represents a request element. -type BatchMapRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // This ID is used uniquely identify a map request - Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` -} - -func (x *BatchMapRequest) Reset() { - *x = BatchMapRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *BatchMapRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*BatchMapRequest) ProtoMessage() {} - -func (x *BatchMapRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use BatchMapRequest.ProtoReflect.Descriptor instead. -func (*BatchMapRequest) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{0} -} - -func (x *BatchMapRequest) GetKeys() []string { - if x != nil { - return x.Keys - } - return nil -} - -func (x *BatchMapRequest) GetValue() []byte { - if x != nil { - return x.Value - } - return nil -} - -func (x *BatchMapRequest) GetEventTime() *timestamppb.Timestamp { - if x != nil { - return x.EventTime - } - return nil -} - -func (x *BatchMapRequest) GetWatermark() *timestamppb.Timestamp { - if x != nil { - return x.Watermark - } - return nil -} - -func (x *BatchMapRequest) GetHeaders() map[string]string { - if x != nil { - return x.Headers - } - return nil -} - -func (x *BatchMapRequest) GetId() string { - if x != nil { - return x.Id - } - return "" -} - -// * -// BatchMapResponse represents a response element. -type BatchMapResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Results []*BatchMapResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` - // This ID is used to refer the responses to the request it corresponds to. - Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` -} - -func (x *BatchMapResponse) Reset() { - *x = BatchMapResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *BatchMapResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*BatchMapResponse) ProtoMessage() {} - -func (x *BatchMapResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use BatchMapResponse.ProtoReflect.Descriptor instead. -func (*BatchMapResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1} -} - -func (x *BatchMapResponse) GetResults() []*BatchMapResponse_Result { - if x != nil { - return x.Results - } - return nil -} - -func (x *BatchMapResponse) GetId() string { - if x != nil { - return x.Id - } - return "" -} - -// * -// ReadyResponse is the health check result. -type ReadyResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Ready bool `protobuf:"varint,1,opt,name=ready,proto3" json:"ready,omitempty"` -} - -func (x *ReadyResponse) Reset() { - *x = ReadyResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReadyResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReadyResponse) ProtoMessage() {} - -func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. -func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{2} -} - -func (x *ReadyResponse) GetReady() bool { - if x != nil { - return x.Ready - } - return false -} - -type BatchMapResponse_Result struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` -} - -func (x *BatchMapResponse_Result) Reset() { - *x = BatchMapResponse_Result{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *BatchMapResponse_Result) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*BatchMapResponse_Result) ProtoMessage() {} - -func (x *BatchMapResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use BatchMapResponse_Result.ProtoReflect.Descriptor instead. -func (*BatchMapResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1, 0} -} - -func (x *BatchMapResponse_Result) GetKeys() []string { - if x != nil { - return x.Keys - } - return nil -} - -func (x *BatchMapResponse_Result) GetValue() []byte { - if x != nil { - return x.Value - } - return nil -} - -func (x *BatchMapResponse_Result) GetTags() []string { - if x != nil { - return x.Tags - } - return nil -} - -var File_pkg_apis_proto_batchmap_v1_batchmap_proto protoreflect.FileDescriptor - -var file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = []byte{ - 0x0a, 0x29, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x2f, 0x62, 0x61, 0x74, - 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x62, 0x61, 0x74, - 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc1, 0x02, 0x0a, 0x0f, 0x42, 0x61, 0x74, 0x63, 0x68, - 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, - 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, - 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, - 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x43, 0x0a, 0x07, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x62, 0x61, 0x74, - 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, - 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, - 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, - 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, - 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xaa, 0x01, 0x0a, 0x10, 0x42, - 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x3e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x24, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, - 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, - 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x98, - 0x01, 0x0a, 0x08, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x12, 0x3d, 0x0a, 0x07, 0x49, - 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, - 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, - 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0a, 0x42, 0x61, - 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x1c, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, - 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, - 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, - 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, - 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x74, 0x63, - 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce sync.Once - file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc -) - -func file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP() []byte { - file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce.Do(func() { - file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData) - }) - return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData -} - -var file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes = make([]protoimpl.MessageInfo, 5) -var file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = []any{ - (*BatchMapRequest)(nil), // 0: batchmap.v1.BatchMapRequest - (*BatchMapResponse)(nil), // 1: batchmap.v1.BatchMapResponse - (*ReadyResponse)(nil), // 2: batchmap.v1.ReadyResponse - nil, // 3: batchmap.v1.BatchMapRequest.HeadersEntry - (*BatchMapResponse_Result)(nil), // 4: batchmap.v1.BatchMapResponse.Result - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty -} -var file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = []int32{ - 5, // 0: batchmap.v1.BatchMapRequest.event_time:type_name -> google.protobuf.Timestamp - 5, // 1: batchmap.v1.BatchMapRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: batchmap.v1.BatchMapRequest.headers:type_name -> batchmap.v1.BatchMapRequest.HeadersEntry - 4, // 3: batchmap.v1.BatchMapResponse.results:type_name -> batchmap.v1.BatchMapResponse.Result - 6, // 4: batchmap.v1.BatchMap.IsReady:input_type -> google.protobuf.Empty - 0, // 5: batchmap.v1.BatchMap.BatchMapFn:input_type -> batchmap.v1.BatchMapRequest - 2, // 6: batchmap.v1.BatchMap.IsReady:output_type -> batchmap.v1.ReadyResponse - 1, // 7: batchmap.v1.BatchMap.BatchMapFn:output_type -> batchmap.v1.BatchMapResponse - 6, // [6:8] is the sub-list for method output_type - 4, // [4:6] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() } -func file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() { - if File_pkg_apis_proto_batchmap_v1_batchmap_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*BatchMapRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*BatchMapResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*ReadyResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*BatchMapResponse_Result); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc, - NumEnums: 0, - NumMessages: 5, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes, - DependencyIndexes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs, - MessageInfos: file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes, - }.Build() - File_pkg_apis_proto_batchmap_v1_batchmap_proto = out.File - file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = nil - file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = nil - file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = nil -} diff --git a/pkg/apis/proto/batchmap/v1/batchmap.proto b/pkg/apis/proto/batchmap/v1/batchmap.proto deleted file mode 100644 index f7f4d10e..00000000 --- a/pkg/apis/proto/batchmap/v1/batchmap.proto +++ /dev/null @@ -1,52 +0,0 @@ -syntax = "proto3"; - -option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"; - -import "google/protobuf/empty.proto"; -import "google/protobuf/timestamp.proto"; - -package batchmap.v1; - -service BatchMap { - // IsReady is the heartbeat endpoint for gRPC. - rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); - - // BatchMapFn is a bi-directional streaming rpc which applies a - // map function on each BatchMapRequest element of the stream and then streams - // back BatchMapResponse elements. - rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse); -} - -/** - * BatchMapRequest represents a request element. - */ -message BatchMapRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; - // This ID is used uniquely identify a map request - string id = 6; -} - -/** - * BatchMapResponse represents a response element. - */ -message BatchMapResponse { - message Result { - repeated string keys = 1; - bytes value = 2; - repeated string tags = 3; - } - repeated Result results = 1; - // This ID is used to refer the responses to the request it corresponds to. - string id = 2; -} - -/** - * ReadyResponse is the health check result. - */ -message ReadyResponse { - bool ready = 1; -} \ No newline at end of file diff --git a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go deleted file mode 100644 index d6d7abf1..00000000 --- a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go +++ /dev/null @@ -1,190 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.4.0 -// - protoc v4.25.1 -// source: pkg/apis/proto/batchmap/v1/batchmap.proto - -package v1 - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 - -const ( - BatchMap_IsReady_FullMethodName = "/batchmap.v1.BatchMap/IsReady" - BatchMap_BatchMapFn_FullMethodName = "/batchmap.v1.BatchMap/BatchMapFn" -) - -// BatchMapClient is the client API for BatchMap service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type BatchMapClient interface { - // IsReady is the heartbeat endpoint for gRPC. - IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) - // BatchMapFn is a bi-directional streaming rpc which applies a - // map function on each BatchMapRequest element of the stream and then streams - // back BatchMapResponse elements. - BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) -} - -type batchMapClient struct { - cc grpc.ClientConnInterface -} - -func NewBatchMapClient(cc grpc.ClientConnInterface) BatchMapClient { - return &batchMapClient{cc} -} - -func (c *batchMapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(ReadyResponse) - err := c.cc.Invoke(ctx, BatchMap_IsReady_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *batchMapClient) BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &BatchMap_ServiceDesc.Streams[0], BatchMap_BatchMapFn_FullMethodName, cOpts...) - if err != nil { - return nil, err - } - x := &batchMapBatchMapFnClient{ClientStream: stream} - return x, nil -} - -type BatchMap_BatchMapFnClient interface { - Send(*BatchMapRequest) error - Recv() (*BatchMapResponse, error) - grpc.ClientStream -} - -type batchMapBatchMapFnClient struct { - grpc.ClientStream -} - -func (x *batchMapBatchMapFnClient) Send(m *BatchMapRequest) error { - return x.ClientStream.SendMsg(m) -} - -func (x *batchMapBatchMapFnClient) Recv() (*BatchMapResponse, error) { - m := new(BatchMapResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// BatchMapServer is the server API for BatchMap service. -// All implementations must embed UnimplementedBatchMapServer -// for forward compatibility -type BatchMapServer interface { - // IsReady is the heartbeat endpoint for gRPC. - IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) - // BatchMapFn is a bi-directional streaming rpc which applies a - // map function on each BatchMapRequest element of the stream and then streams - // back BatchMapResponse elements. - BatchMapFn(BatchMap_BatchMapFnServer) error - mustEmbedUnimplementedBatchMapServer() -} - -// UnimplementedBatchMapServer must be embedded to have forward compatible implementations. -type UnimplementedBatchMapServer struct { -} - -func (UnimplementedBatchMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") -} -func (UnimplementedBatchMapServer) BatchMapFn(BatchMap_BatchMapFnServer) error { - return status.Errorf(codes.Unimplemented, "method BatchMapFn not implemented") -} -func (UnimplementedBatchMapServer) mustEmbedUnimplementedBatchMapServer() {} - -// UnsafeBatchMapServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to BatchMapServer will -// result in compilation errors. -type UnsafeBatchMapServer interface { - mustEmbedUnimplementedBatchMapServer() -} - -func RegisterBatchMapServer(s grpc.ServiceRegistrar, srv BatchMapServer) { - s.RegisterService(&BatchMap_ServiceDesc, srv) -} - -func _BatchMap_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BatchMapServer).IsReady(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: BatchMap_IsReady_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BatchMapServer).IsReady(ctx, req.(*emptypb.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _BatchMap_BatchMapFn_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(BatchMapServer).BatchMapFn(&batchMapBatchMapFnServer{ServerStream: stream}) -} - -type BatchMap_BatchMapFnServer interface { - Send(*BatchMapResponse) error - Recv() (*BatchMapRequest, error) - grpc.ServerStream -} - -type batchMapBatchMapFnServer struct { - grpc.ServerStream -} - -func (x *batchMapBatchMapFnServer) Send(m *BatchMapResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *batchMapBatchMapFnServer) Recv() (*BatchMapRequest, error) { - m := new(BatchMapRequest) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// BatchMap_ServiceDesc is the grpc.ServiceDesc for BatchMap service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var BatchMap_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "batchmap.v1.BatchMap", - HandlerType: (*BatchMapServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "IsReady", - Handler: _BatchMap_IsReady_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "BatchMapFn", - Handler: _BatchMap_BatchMapFn_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "pkg/apis/proto/batchmap/v1/batchmap.proto", -} diff --git a/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go b/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go deleted file mode 100644 index 43f0315b..00000000 --- a/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go +++ /dev/null @@ -1,216 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 (interfaces: BatchMapClient,BatchMap_BatchMapFnClient) - -// Package batchmapmock is a generated GoMock package. -package batchmapmock - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" - grpc "google.golang.org/grpc" - metadata "google.golang.org/grpc/metadata" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// MockBatchMapClient is a mock of BatchMapClient interface. -type MockBatchMapClient struct { - ctrl *gomock.Controller - recorder *MockBatchMapClientMockRecorder -} - -// MockBatchMapClientMockRecorder is the mock recorder for MockBatchMapClient. -type MockBatchMapClientMockRecorder struct { - mock *MockBatchMapClient -} - -// NewMockBatchMapClient creates a new mock instance. -func NewMockBatchMapClient(ctrl *gomock.Controller) *MockBatchMapClient { - mock := &MockBatchMapClient{ctrl: ctrl} - mock.recorder = &MockBatchMapClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockBatchMapClient) EXPECT() *MockBatchMapClientMockRecorder { - return m.recorder -} - -// BatchMapFn mocks base method. -func (m *MockBatchMapClient) BatchMapFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.BatchMap_BatchMapFnClient, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "BatchMapFn", varargs...) - ret0, _ := ret[0].(v1.BatchMap_BatchMapFnClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchMapFn indicates an expected call of BatchMapFn. -func (mr *MockBatchMapClientMockRecorder) BatchMapFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchMapFn", reflect.TypeOf((*MockBatchMapClient)(nil).BatchMapFn), varargs...) -} - -// IsReady mocks base method. -func (m *MockBatchMapClient) IsReady(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.ReadyResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "IsReady", varargs...) - ret0, _ := ret[0].(*v1.ReadyResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// IsReady indicates an expected call of IsReady. -func (mr *MockBatchMapClientMockRecorder) IsReady(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockBatchMapClient)(nil).IsReady), varargs...) -} - -// MockBatchMap_BatchMapFnClient is a mock of BatchMap_BatchMapFnClient interface. -type MockBatchMap_BatchMapFnClient struct { - ctrl *gomock.Controller - recorder *MockBatchMap_BatchMapFnClientMockRecorder -} - -// MockBatchMap_BatchMapFnClientMockRecorder is the mock recorder for MockBatchMap_BatchMapFnClient. -type MockBatchMap_BatchMapFnClientMockRecorder struct { - mock *MockBatchMap_BatchMapFnClient -} - -// NewMockBatchMap_BatchMapFnClient creates a new mock instance. -func NewMockBatchMap_BatchMapFnClient(ctrl *gomock.Controller) *MockBatchMap_BatchMapFnClient { - mock := &MockBatchMap_BatchMapFnClient{ctrl: ctrl} - mock.recorder = &MockBatchMap_BatchMapFnClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockBatchMap_BatchMapFnClient) EXPECT() *MockBatchMap_BatchMapFnClientMockRecorder { - return m.recorder -} - -// CloseSend mocks base method. -func (m *MockBatchMap_BatchMapFnClient) CloseSend() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CloseSend") - ret0, _ := ret[0].(error) - return ret0 -} - -// CloseSend indicates an expected call of CloseSend. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) CloseSend() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).CloseSend)) -} - -// Context mocks base method. -func (m *MockBatchMap_BatchMapFnClient) Context() context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Context") - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// Context indicates an expected call of Context. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Context() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Context)) -} - -// Header mocks base method. -func (m *MockBatchMap_BatchMapFnClient) Header() (metadata.MD, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Header") - ret0, _ := ret[0].(metadata.MD) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Header indicates an expected call of Header. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Header() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Header)) -} - -// Recv mocks base method. -func (m *MockBatchMap_BatchMapFnClient) Recv() (*v1.BatchMapResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Recv") - ret0, _ := ret[0].(*v1.BatchMapResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Recv indicates an expected call of Recv. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Recv() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Recv)) -} - -// RecvMsg mocks base method. -func (m *MockBatchMap_BatchMapFnClient) RecvMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RecvMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// RecvMsg indicates an expected call of RecvMsg. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).RecvMsg), arg0) -} - -// Send mocks base method. -func (m *MockBatchMap_BatchMapFnClient) Send(arg0 *v1.BatchMapRequest) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Send", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Send indicates an expected call of Send. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Send(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Send), arg0) -} - -// SendMsg mocks base method. -func (m *MockBatchMap_BatchMapFnClient) SendMsg(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMsg", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMsg indicates an expected call of SendMsg. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).SendMsg), arg0) -} - -// Trailer mocks base method. -func (m *MockBatchMap_BatchMapFnClient) Trailer() metadata.MD { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trailer") - ret0, _ := ret[0].(metadata.MD) - return ret0 -} - -// Trailer indicates an expected call of Trailer. -func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Trailer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Trailer)) -} diff --git a/pkg/apis/proto/batchmap/v1/mockgen.go b/pkg/apis/proto/batchmap/v1/mockgen.go deleted file mode 100644 index 04982aeb..00000000 --- a/pkg/apis/proto/batchmap/v1/mockgen.go +++ /dev/null @@ -1,3 +0,0 @@ -package v1 - -//go:generate mockgen -destination batchmapmock/batchmapmock.go -package batchmapmock github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 BatchMapClient,BatchMap_BatchMapFnClient diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index c7708359..f638b4e7 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -33,6 +33,7 @@ type MapRequest struct { // This ID is used to uniquely identify a map request Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Status *Status `protobuf:"bytes,4,opt,name=status,proto3,oneof" json:"status,omitempty"` } func (x *MapRequest) Reset() { @@ -88,6 +89,13 @@ func (x *MapRequest) GetHandshake() *Handshake { return nil } +func (x *MapRequest) GetStatus() *Status { + if x != nil { + return x.Status + } + return nil +} + // Handshake message between client and server to indicate the start of transmission. type Handshake struct { state protoimpl.MessageState @@ -137,6 +145,54 @@ func (x *Handshake) GetSot() bool { return false } +// Status message to indicate the status of the message. +type Status struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` +} + +func (x *Status) Reset() { + *x = Status{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Status) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Status) ProtoMessage() {} + +func (x *Status) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Status.ProtoReflect.Descriptor instead. +func (*Status) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} +} + +func (x *Status) GetEot() bool { + if x != nil { + return x.Eot + } + return false +} + // * // MapResponse represents a response element. type MapResponse struct { @@ -148,12 +204,13 @@ type MapResponse struct { // This ID is used to refer the responses to the request it corresponds to. Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Status *Status `protobuf:"bytes,4,opt,name=status,proto3,oneof" json:"status,omitempty"` } func (x *MapResponse) Reset() { *x = MapResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -166,7 +223,7 @@ func (x *MapResponse) String() string { func (*MapResponse) ProtoMessage() {} func (x *MapResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -179,7 +236,7 @@ func (x *MapResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse.ProtoReflect.Descriptor instead. func (*MapResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3} } func (x *MapResponse) GetResults() []*MapResponse_Result { @@ -203,6 +260,13 @@ func (x *MapResponse) GetHandshake() *Handshake { return nil } +func (x *MapResponse) GetStatus() *Status { + if x != nil { + return x.Status + } + return nil +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -216,7 +280,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -229,7 +293,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -242,7 +306,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{4} } func (x *ReadyResponse) GetReady() bool { @@ -267,7 +331,7 @@ type MapRequest_Request struct { func (x *MapRequest_Request) Reset() { *x = MapRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -280,7 +344,7 @@ func (x *MapRequest_Request) String() string { func (*MapRequest_Request) ProtoMessage() {} func (x *MapRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -344,7 +408,7 @@ type MapResponse_Result struct { func (x *MapResponse_Result) Reset() { *x = MapResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[6] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -357,7 +421,7 @@ func (x *MapResponse_Result) String() string { func (*MapResponse_Result) ProtoMessage() {} func (x *MapResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[6] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -370,7 +434,7 @@ func (x *MapResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse_Result.ProtoReflect.Descriptor instead. func (*MapResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2, 0} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3, 0} } func (x *MapResponse_Result) GetKeys() []string { @@ -403,7 +467,7 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc0, 0x03, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xf8, 0x03, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, @@ -412,56 +476,65 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, - 0x01, 0x01, 0x1a, 0xa7, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, - 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, - 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x41, 0x0a, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, + 0x01, 0x01, 0x12, 0x2b, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, + 0xa7, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, + 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x41, 0x0a, 0x07, 0x68, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x61, + 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, + 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, + 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, + 0x74, 0x22, 0x1a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, + 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x22, 0x97, 0x02, + 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, + 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x34, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, + 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, + 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x12, 0x2b, 0x0a, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d, 0x61, 0x70, 0x2e, + 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, + 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, + 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, + 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, + 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x75, + 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x34, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, - 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, - 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, - 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0xdf, 0x01, 0x0a, 0x0b, 0x4d, 0x61, - 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, - 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, - 0x34, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, - 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, - 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, - 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, - 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, - 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, 0x0a, - 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x52, - 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, - 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, - 0x64, 0x79, 0x32, 0x75, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x34, 0x0a, 0x05, 0x4d, 0x61, 0x70, - 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, - 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, - 0x38, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, - 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, - 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, - 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, - 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x38, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, + 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -476,35 +549,38 @@ func file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_map_v1_map_proto_rawDescData } -var file_pkg_apis_proto_map_v1_map_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_pkg_apis_proto_map_v1_map_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_pkg_apis_proto_map_v1_map_proto_goTypes = []any{ (*MapRequest)(nil), // 0: map.v1.MapRequest (*Handshake)(nil), // 1: map.v1.Handshake - (*MapResponse)(nil), // 2: map.v1.MapResponse - (*ReadyResponse)(nil), // 3: map.v1.ReadyResponse - (*MapRequest_Request)(nil), // 4: map.v1.MapRequest.Request - nil, // 5: map.v1.MapRequest.Request.HeadersEntry - (*MapResponse_Result)(nil), // 6: map.v1.MapResponse.Result - (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 8: google.protobuf.Empty + (*Status)(nil), // 2: map.v1.Status + (*MapResponse)(nil), // 3: map.v1.MapResponse + (*ReadyResponse)(nil), // 4: map.v1.ReadyResponse + (*MapRequest_Request)(nil), // 5: map.v1.MapRequest.Request + nil, // 6: map.v1.MapRequest.Request.HeadersEntry + (*MapResponse_Result)(nil), // 7: map.v1.MapResponse.Result + (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_pkg_apis_proto_map_v1_map_proto_depIdxs = []int32{ - 4, // 0: map.v1.MapRequest.request:type_name -> map.v1.MapRequest.Request - 1, // 1: map.v1.MapRequest.handshake:type_name -> map.v1.Handshake - 6, // 2: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result - 1, // 3: map.v1.MapResponse.handshake:type_name -> map.v1.Handshake - 7, // 4: map.v1.MapRequest.Request.event_time:type_name -> google.protobuf.Timestamp - 7, // 5: map.v1.MapRequest.Request.watermark:type_name -> google.protobuf.Timestamp - 5, // 6: map.v1.MapRequest.Request.headers:type_name -> map.v1.MapRequest.Request.HeadersEntry - 0, // 7: map.v1.Map.MapFn:input_type -> map.v1.MapRequest - 8, // 8: map.v1.Map.IsReady:input_type -> google.protobuf.Empty - 2, // 9: map.v1.Map.MapFn:output_type -> map.v1.MapResponse - 3, // 10: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse - 9, // [9:11] is the sub-list for method output_type - 7, // [7:9] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 5, // 0: map.v1.MapRequest.request:type_name -> map.v1.MapRequest.Request + 1, // 1: map.v1.MapRequest.handshake:type_name -> map.v1.Handshake + 2, // 2: map.v1.MapRequest.status:type_name -> map.v1.Status + 7, // 3: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result + 1, // 4: map.v1.MapResponse.handshake:type_name -> map.v1.Handshake + 2, // 5: map.v1.MapResponse.status:type_name -> map.v1.Status + 8, // 6: map.v1.MapRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 8, // 7: map.v1.MapRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 6, // 8: map.v1.MapRequest.Request.headers:type_name -> map.v1.MapRequest.Request.HeadersEntry + 0, // 9: map.v1.Map.MapFn:input_type -> map.v1.MapRequest + 9, // 10: map.v1.Map.IsReady:input_type -> google.protobuf.Empty + 3, // 11: map.v1.Map.MapFn:output_type -> map.v1.MapResponse + 4, // 12: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse + 11, // [11:13] is the sub-list for method output_type + 9, // [9:11] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_pkg_apis_proto_map_v1_map_proto_init() } @@ -538,7 +614,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*MapResponse); i { + switch v := v.(*Status); i { case 0: return &v.state case 1: @@ -550,7 +626,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*ReadyResponse); i { + switch v := v.(*MapResponse); i { case 0: return &v.state case 1: @@ -562,6 +638,18 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_map_v1_map_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*MapRequest_Request); i { case 0: return &v.state @@ -573,7 +661,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { return nil } } - file_pkg_apis_proto_map_v1_map_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_pkg_apis_proto_map_v1_map_proto_msgTypes[7].Exporter = func(v any, i int) any { switch v := v.(*MapResponse_Result); i { case 0: return &v.state @@ -587,14 +675,14 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[0].OneofWrappers = []any{} - file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].OneofWrappers = []any{} + file_pkg_apis_proto_map_v1_map_proto_msgTypes[3].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_map_v1_map_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/map/v1/map.proto b/pkg/apis/proto/map/v1/map.proto index 76f8788d..f34cc4e9 100644 --- a/pkg/apis/proto/map/v1/map.proto +++ b/pkg/apis/proto/map/v1/map.proto @@ -30,6 +30,7 @@ message MapRequest { // This ID is used to uniquely identify a map request string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /* @@ -40,6 +41,13 @@ message Handshake { bool sot = 1; } +/* + * Status message to indicate the status of the message. + */ +message Status { + bool eot = 1; +} + /** * MapResponse represents a response element. */ @@ -53,6 +61,7 @@ message MapResponse { // This ID is used to refer the responses to the request it corresponds to. string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /** diff --git a/pkg/batchmapper/examples/batchmap_flatmap/go.mod b/pkg/batchmapper/examples/batchmap_flatmap/go.mod index 91c38a70..fa00e50d 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/go.mod +++ b/pkg/batchmapper/examples/batchmap_flatmap/go.mod @@ -7,7 +7,6 @@ replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.1 require ( - go.uber.org/atomic v1.11.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect diff --git a/pkg/batchmapper/examples/batchmap_flatmap/go.sum b/pkg/batchmapper/examples/batchmap_flatmap/go.sum index 38feccd7..36997a49 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/go.sum +++ b/pkg/batchmapper/examples/batchmap_flatmap/go.sum @@ -6,8 +6,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index c6fe51e4..701aff63 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc" "github.com/numaproj/numaflow-go/pkg" - batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -67,7 +67,7 @@ func (m *server) Start(ctx context.Context) error { m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the batch map service - batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc) + mappb.RegisterMapServer(m.grpcServer, m.svc) // start a go routine to stop the server gracefully when the context is done // or a shutdown signal is received from the service diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 06b4a3cf..b81cc217 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -2,18 +2,16 @@ package batchmapper import ( "context" + "errors" "fmt" "io" "log" "runtime/debug" - "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" ) const ( @@ -23,113 +21,162 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -// Service implements the proto gen server interface and contains the map operation -// handler. +// Service implements the proto gen server interface and contains the map operation handler. type Service struct { - batchmappb.UnimplementedBatchMapServer + mappb.UnimplementedMapServer BatchMapper BatchMapper shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. -func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*batchmappb.ReadyResponse, error) { - return &batchmappb.ReadyResponse{Ready: true}, nil +func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyResponse, error) { + return &mappb.ReadyResponse{Ready: true}, nil } -// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them. -func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error { +// MapFn applies a user defined function to a stream of request elements and streams back the responses for them. +func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { ctx := stream.Context() - var g errgroup.Group - - // totalRequests is a counter for keeping a track of the number of datum requests - // that were received on the stream. We use an atomic int as this needs to be synchronized - // between the request/response go routines. - totalRequests := atomic.NewInt32(0) - - // datumStreamCh is used to stream messages to the user code interface - // As the BatchMap interface expects a list of request elements - // we read all the requests coming on the stream and keep streaming them to the user code on this channel. - datumStreamCh := make(chan Datum) - - // go routine to invoke the user handler function, and process the responses. - g.Go(func() error { - // handle panic - defer func() { - if r := recover(); r != nil { - log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} - } - }() - // Apply the user BatchMap implementation function - responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh) - - // If the number of responses received does not align with the request batch size, - // we will not be able to process the data correctly. - // This should be marked as an error and the container is restarted. - // As this is a user error, we restart the container to mitigate any transient error otherwise, this - // crash should indicate to the user that there is some issue. - if len(responses.Items()) != int(totalRequests.Load()) { - errMsg := fmt.Sprintf("batchMapFn: mismatch between length of batch requests and responses, "+ - "expected:%d, got:%d", int(totalRequests.Load()), len(responses.Items())) - log.Panic(errMsg) - } - // iterate over the responses received and covert to the required proto format - for _, batchResp := range responses.Items() { - var elements []*batchmappb.BatchMapResponse_Result - for _, resp := range batchResp.Items() { - elements = append(elements, &batchmappb.BatchMapResponse_Result{ - Keys: resp.Keys(), - Value: resp.Value(), - Tags: resp.Tags(), - }) - } - singleRequestResp := &batchmappb.BatchMapResponse{ - Results: elements, - Id: batchResp.Id(), - } - // We stream back the result for a single request ID - // this would contain all the responses for that request. - err := stream.Send(singleRequestResp) - if err != nil { - log.Println("BatchMapFn: Got an error while Send() on stream", err) - return err + // Perform handshake before entering the main loop + if err := fs.performHandshake(stream); err != nil { + return err + } + + for { + datumStreamCh := make(chan Datum) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return fs.receiveRequests(ctx, stream, datumStreamCh) + }) + + g.Go(func() error { + return fs.processData(ctx, stream, datumStreamCh) + }) + + // Wait for the goroutines to finish + if err := g.Wait(); err != nil { + if errors.Is(err, io.EOF) { + log.Printf("Stopping the BatchMapFn") + return nil } + log.Printf("Stopping the BatchMapFn with err, %s", err) + fs.shutdownCh <- struct{}{} + return err } - return nil - }) + } +} + +// performHandshake performs the handshake with the client. +func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error { + req, err := stream.Recv() + if err != nil { + log.Printf("error receiving handshake from stream: %v", err) + return err + } + + if req.Handshake == nil || !req.Handshake.Sot { + return fmt.Errorf("expected handshake message") + } + + handshakeResponse := &mappb.MapResponse{ + Handshake: &mappb.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return err + } + + return nil +} + +// receiveRequests receives the requests from the client and writes them to the datumStreamCh channel. +func (fs *Service) receiveRequests(ctx context.Context, stream mappb.Map_MapFnServer, datumStreamCh chan<- Datum) error { + defer close(datumStreamCh) - // loop to keep reading messages from the stream and sending it to the datumStreamCh for { - d, err := stream.Recv() - // if we see EOF on the stream we do not have any more messages coming up + select { + case <-ctx.Done(): + return nil + default: + } + req, err := stream.Recv() if err == io.EOF { - // close the input data channel to indicate that no more messages expected - close(datumStreamCh) - break + log.Printf("end of batch map stream") + return err } if err != nil { - // close the input data channel to indicate that no more messages expected - close(datumStreamCh) - log.Println("BatchMapFn: Got an error while recv() on stream", err) + log.Printf("error receiving from batch map stream: %v", err) return err } - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) - // send the datum to the input channel - datumStreamCh <- hd - // Increase the counter for number of requests received - totalRequests.Inc() + + if req.Status != nil && req.Status.Eot { + break + } + + datum := &handlerDatum{ + id: req.GetId(), + value: req.GetRequest().GetValue(), + keys: req.GetRequest().GetKeys(), + eventTime: req.GetRequest().GetEventTime().AsTime(), + watermark: req.GetRequest().GetWatermark().AsTime(), + headers: req.GetRequest().GetHeaders(), + } + + datumStreamCh <- datum } + return nil +} - // wait for all the responses to be processed - err := g.Wait() - // if there was any error during processing return the error - if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) - return statusErr +// processData invokes the batch mapper to process the data and sends the response back to the client. +func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer, datumStreamCh chan Datum) (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack())) + err = fmt.Errorf("panic inside batch map handler: %v", r) + } + }() + + responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh) + + for _, batchResp := range responses.Items() { + var elements []*mappb.MapResponse_Result + for _, resp := range batchResp.Items() { + elements = append(elements, &mappb.MapResponse_Result{ + Keys: resp.Keys(), + Value: resp.Value(), + Tags: resp.Tags(), + }) + } + singleRequestResp := &mappb.MapResponse{ + Results: elements, + Id: batchResp.Id(), + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := stream.Send(singleRequestResp); err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + return err + } } - // Once all responses are sent we can return, this would indicate the end of the rpc and - // send an EOF to the client on the stream + select { + case <-ctx.Done(): + return ctx.Err() + default: + // send the end of transmission message + eot := &mappb.MapResponse{ + Status: &mappb.Status{ + Eot: true, + }, + } + if err := stream.Send(eot); err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + } + } return nil } diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go index af387948..a0278e11 100644 --- a/pkg/batchmapper/service_test.go +++ b/pkg/batchmapper/service_test.go @@ -13,20 +13,20 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" - batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" ) type BatchMapStreamFnServerTest struct { ctx context.Context - outputCh chan *batchmappb.BatchMapResponse - inputCh chan *batchmappb.BatchMapRequest + outputCh chan *mappb.MapResponse + inputCh chan *mappb.MapRequest grpc.ServerStream } func NewBatchBatchMapStreamFnServerTest( ctx context.Context, - inputCh chan *batchmappb.BatchMapRequest, - outputCh chan *batchmappb.BatchMapResponse, + inputCh chan *mappb.MapRequest, + outputCh chan *mappb.MapResponse, ) *BatchMapStreamFnServerTest { return &BatchMapStreamFnServerTest{ ctx: ctx, @@ -35,7 +35,7 @@ func NewBatchBatchMapStreamFnServerTest( } } -func (u *BatchMapStreamFnServerTest) Recv() (*batchmappb.BatchMapRequest, error) { +func (u *BatchMapStreamFnServerTest) Recv() (*mappb.MapRequest, error) { val, ok := <-u.inputCh if !ok { return val, io.EOF @@ -43,7 +43,7 @@ func (u *BatchMapStreamFnServerTest) Recv() (*batchmappb.BatchMapRequest, error) return val, nil } -func (u *BatchMapStreamFnServerTest) Send(d *batchmappb.BatchMapResponse) error { +func (u *BatchMapStreamFnServerTest) Send(d *mappb.MapResponse) error { u.outputCh <- d return nil } @@ -54,15 +54,15 @@ func (u *BatchMapStreamFnServerTest) Context() context.Context { type BatchMapFnServerErrTest struct { ctx context.Context - inputCh chan *batchmappb.BatchMapRequest - outputCh chan *batchmappb.BatchMapResponse + inputCh chan *mappb.MapRequest + outputCh chan *mappb.MapResponse grpc.ServerStream } func NewBatchMapFnServerErrTest( ctx context.Context, - inputCh chan *batchmappb.BatchMapRequest, - outputCh chan *batchmappb.BatchMapResponse, + inputCh chan *mappb.MapRequest, + outputCh chan *mappb.MapResponse, ) *BatchMapFnServerErrTest { return &BatchMapFnServerErrTest{ @@ -72,7 +72,7 @@ func NewBatchMapFnServerErrTest( } } -func (u *BatchMapFnServerErrTest) Recv() (*batchmappb.BatchMapRequest, error) { +func (u *BatchMapFnServerErrTest) Recv() (*mappb.MapRequest, error) { val, ok := <-u.inputCh if !ok { return val, io.EOF @@ -80,7 +80,7 @@ func (u *BatchMapFnServerErrTest) Recv() (*batchmappb.BatchMapRequest, error) { return val, nil } -func (u *BatchMapFnServerErrTest) Send(_ *batchmappb.BatchMapResponse) error { +func (u *BatchMapFnServerErrTest) Send(_ *mappb.MapResponse) error { return fmt.Errorf("send error") } @@ -92,8 +92,8 @@ func TestService_BatchMapFn(t *testing.T) { tests := []struct { name string handler BatchMapper - input []*batchmappb.BatchMapRequest - expected []*batchmappb.BatchMapResponse + input []*mappb.MapRequest + expected []*mappb.MapResponse expectedErr bool }{ { @@ -107,22 +107,45 @@ func TestService_BatchMapFn(t *testing.T) { } return batchResponses }), - input: []*batchmappb.BatchMapRequest{{ - Keys: []string{"client"}, - Value: []byte(`test1`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test1", - }, { - Keys: []string{"client"}, - Value: []byte(`test2`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test2", - }}, - expected: []*batchmappb.BatchMapResponse{ + input: []*mappb.MapRequest{ { - Results: []*batchmappb.BatchMapResponse_Result{ + Handshake: &mappb.Handshake{ + Sot: true, + }, + }, + { + Request: &mappb.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Id: "test1", + }, + { + Request: &mappb.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Id: "test2", + }, + { + Request: &mappb.MapRequest_Request{}, + Status: &mappb.Status{ + Eot: true, + }, + }, + }, + expected: []*mappb.MapResponse{ + { + Handshake: &mappb.Handshake{ + Sot: true, + }, + }, + { + Results: []*mappb.MapResponse_Result{ { Keys: []string{"client_test"}, Value: []byte(`test1`), @@ -131,7 +154,7 @@ func TestService_BatchMapFn(t *testing.T) { Id: "test1", }, { - Results: []*batchmappb.BatchMapResponse_Result{ + Results: []*mappb.MapResponse_Result{ { Keys: []string{"client_test"}, Value: []byte(`test2`), @@ -139,6 +162,11 @@ func TestService_BatchMapFn(t *testing.T) { }, Id: "test2", }, + { + Status: &mappb.Status{ + Eot: true, + }, + }, }, expectedErr: false, }, @@ -153,22 +181,39 @@ func TestService_BatchMapFn(t *testing.T) { } return batchResponses }), - input: []*batchmappb.BatchMapRequest{{ - Keys: []string{"client"}, - Value: []byte(`test1`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test1", - }, { - Keys: []string{"client"}, - Value: []byte(`test2`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test2", - }}, - expected: []*batchmappb.BatchMapResponse{ + input: []*mappb.MapRequest{ + { + Handshake: &mappb.Handshake{ + Sot: true, + }, + }, + { + Request: &mappb.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Id: "test1", + }, + { + Request: &mappb.MapRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Id: "test2", + }, + }, + expected: []*mappb.MapResponse{ + { + Handshake: &mappb.Handshake{ + Sot: true, + }, + }, { - Results: []*batchmappb.BatchMapResponse_Result{ + Results: []*mappb.MapResponse_Result{ { Keys: []string{"client_test"}, Value: []byte(`test1`), @@ -177,7 +222,7 @@ func TestService_BatchMapFn(t *testing.T) { Id: "test1", }, { - Results: []*batchmappb.BatchMapResponse_Result{ + Results: []*mappb.MapResponse_Result{ { Keys: []string{"client_test"}, Value: []byte(`test2`), @@ -185,6 +230,11 @@ func TestService_BatchMapFn(t *testing.T) { }, Id: "test2", }, + { + Status: &mappb.Status{ + Eot: true, + }, + }, }, expectedErr: true, }, @@ -199,11 +249,11 @@ func TestService_BatchMapFn(t *testing.T) { // instead of the regular outgoing context in the real gRPC connection. ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - inputCh := make(chan *batchmappb.BatchMapRequest) - outputCh := make(chan *batchmappb.BatchMapResponse) - result := make([]*batchmappb.BatchMapResponse, 0) + inputCh := make(chan *mappb.MapRequest, 3) + outputCh := make(chan *mappb.MapResponse) + result := make([]*mappb.MapResponse, 0) - var udfBatchMapFnStream batchmappb.BatchMap_BatchMapFnServer + var udfBatchMapFnStream mappb.Map_MapFnServer if tt.expectedErr { udfBatchMapFnStream = NewBatchMapFnServerErrTest(ctx, inputCh, outputCh) } else { @@ -216,7 +266,7 @@ func TestService_BatchMapFn(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = fs.BatchMapFn(udfBatchMapFnStream) + err = fs.MapFn(udfBatchMapFnStream) close(outputCh) }() @@ -243,7 +293,6 @@ func TestService_BatchMapFn(t *testing.T) { if !reflect.DeepEqual(result, tt.expected) { t.Errorf("BatchMapFn() got = %v, want %v", result, tt.expected) } - }) } } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 82cd59b8..a012afea 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -100,6 +100,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { return nil } log.Printf("Stopping the SinkFn with err, %s", err) + fs.shutdownCh <- struct{}{} return err } } @@ -135,12 +136,6 @@ func (fs *Service) performHandshake(stream sinkpb.Sink_SinkFnServer) error { // receiveRequests receives the requests from the client writes them to the datumStreamCh channel. func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error { defer close(datumStreamCh) - defer func() { - if r := recover(); r != nil { - log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} - } - }() for { req, err := stream.Recv() @@ -172,7 +167,13 @@ func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamC } // processData invokes the sinker to process the data and sends the response back to the client. -func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnServer, datumStreamCh chan Datum) error { +func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnServer, datumStreamCh chan Datum) (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) + err = fmt.Errorf("panic inside sink handler: %v", r) + } + }() responses := fs.Sinker.Sink(ctx, datumStreamCh) for _, response := range responses { var status sinkpb.Status