From 24b54b99623d3bb0d4a67cd711e35448bcd1b71f Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Tue, 12 Sep 2023 15:19:45 -0400 Subject: [PATCH] fix: allow returning negative value from source pending api (#81) Signed-off-by: Keran Yang --- .github/workflows/ci.yaml | 27 +++++++++++++++++++ pkg/apis/proto/map/v1/map.pb.go | 2 +- pkg/apis/proto/map/v1/map_grpc.pb.go | 2 +- pkg/apis/proto/mapstream/v1/mapstream.pb.go | 2 +- .../proto/mapstream/v1/mapstream_grpc.pb.go | 2 +- pkg/apis/proto/reduce/v1/reduce.pb.go | 2 +- pkg/apis/proto/reduce/v1/reduce_grpc.pb.go | 2 +- pkg/apis/proto/sideinput/v1/sideinput.pb.go | 2 +- .../proto/sideinput/v1/sideinput_grpc.pb.go | 2 +- pkg/apis/proto/sink/v1/sink.pb.go | 2 +- pkg/apis/proto/sink/v1/sink_grpc.pb.go | 2 +- pkg/apis/proto/source/v1/source.pb.go | 9 ++++--- pkg/apis/proto/source/v1/source.proto | 3 ++- pkg/apis/proto/source/v1/source_grpc.pb.go | 2 +- .../proto/sourcetransform/v1/transform.pb.go | 2 +- .../sourcetransform/v1/transform_grpc.pb.go | 2 +- .../simple_source/impl/simple_source.go | 4 +-- pkg/sourcer/interface.go | 5 ++-- pkg/sourcer/options.go | 2 +- pkg/sourcer/server_test.go | 2 +- pkg/sourcer/service_test.go | 4 +-- 21 files changed, 56 insertions(+), 26 deletions(-) create mode 100644 .github/workflows/ci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..aad64817 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,27 @@ +name: Go + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + name: Unit Tests + runs-on: ubuntu-latest + steps: + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ^1.17 + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: go mod download + + - name: Test + run: make all diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 03bc0967..6dcf613c 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index 39b713bb..9026727d 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/mapstream/v1/mapstream.pb.go b/pkg/apis/proto/mapstream/v1/mapstream.pb.go index 889b7648..5681354a 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 diff --git a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go index 1ad13424..ad88a113 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce.pb.go b/pkg/apis/proto/reduce/v1/reduce.pb.go index e1963f5d..e952a775 100644 --- a/pkg/apis/proto/reduce/v1/reduce.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go index e93cca9c..11919bac 100644 --- a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput.pb.go b/pkg/apis/proto/sideinput/v1/sideinput.pb.go index cb561dc0..7f8b7280 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc v4.23.4 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go index 1359068c..9d38558e 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.12 +// - protoc v4.23.4 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index e746cedd..50658427 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink_grpc.pb.go b/pkg/apis/proto/sink/v1/sink_grpc.pb.go index eddd9ef8..9af7b4e4 100644 --- a/pkg/apis/proto/sink/v1/sink_grpc.pb.go +++ b/pkg/apis/proto/sink/v1/sink_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 0c3394a3..1c8dcba4 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/source/v1/source.proto package v1 @@ -629,7 +629,8 @@ type PendingResponse_Result struct { unknownFields protoimpl.UnknownFields // Required field holding the number of pending records at the user defined source. - Count uint64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` + // A negative count indicates that the pending information is not available. + Count int64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` } func (x *PendingResponse_Result) Reset() { @@ -664,7 +665,7 @@ func (*PendingResponse_Result) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{5, 0} } -func (x *PendingResponse_Result) GetCount() uint64 { +func (x *PendingResponse_Result) GetCount() int64 { if x != nil { return x.Count } @@ -730,7 +731,7 @@ var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x1e, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, + 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index 617e8e65..ab21a5cb 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -113,7 +113,8 @@ message ReadyResponse { message PendingResponse { message Result { // Required field holding the number of pending records at the user defined source. - uint64 count = 1; + // A negative count indicates that the pending information is not available. + int64 count = 1; } // Required field holding the result. Result result = 1; diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index 1dce880a..79c769d6 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/source/v1/source.proto package v1 diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index e38b6247..f20a8d88 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc v4.23.4 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index 2bd5561b..5aac6e78 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.23.4 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/sourcer/examples/simple_source/impl/simple_source.go b/pkg/sourcer/examples/simple_source/impl/simple_source.go index f1f0c155..fc16cb27 100644 --- a/pkg/sourcer/examples/simple_source/impl/simple_source.go +++ b/pkg/sourcer/examples/simple_source/impl/simple_source.go @@ -25,8 +25,8 @@ func NewSimpleSource() *SimpleSource { } // Pending returns the number of pending records. -func (s *SimpleSource) Pending(_ context.Context) uint64 { - // The simple source always returns 0 to indicate no pending records. +func (s *SimpleSource) Pending(_ context.Context) int64 { + // The simple source always returns zero to indicate no pending records. return 0 } diff --git a/pkg/sourcer/interface.go b/pkg/sourcer/interface.go index 159a86a0..fe58f8bf 100644 --- a/pkg/sourcer/interface.go +++ b/pkg/sourcer/interface.go @@ -5,7 +5,7 @@ import ( "time" ) -// Sourcer is the interface for implementation of source. +// Sourcer is the interface for implementation of the source. type Sourcer interface { // Read reads the data from the source and sends the data to the message channel. // Read should never attempt to close the message channel as the caller owns the channel. @@ -13,7 +13,8 @@ type Sourcer interface { // Ack acknowledges the data from the source. Ack(ctx context.Context, request AckRequest) // Pending returns the number of pending messages. - Pending(ctx context.Context) uint64 + // When the return value is negative, it indicates the pending information is not available. + Pending(ctx context.Context) int64 } // ReadRequest is the interface of read request. diff --git a/pkg/sourcer/options.go b/pkg/sourcer/options.go index 1385f705..43c66af7 100644 --- a/pkg/sourcer/options.go +++ b/pkg/sourcer/options.go @@ -28,7 +28,7 @@ func WithMaxMessageSize(size int) Option { } } -// WithSockAddr start the server with the given sock addr. This is mainly used for testing purpose. +// WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes. func WithSockAddr(addr string) Option { return func(opts *options) { opts.sockAddr = addr diff --git a/pkg/sourcer/server_test.go b/pkg/sourcer/server_test.go index cd3e2ace..42067ea6 100644 --- a/pkg/sourcer/server_test.go +++ b/pkg/sourcer/server_test.go @@ -19,7 +19,7 @@ func (ts TestNoopSource) Ack(ctx context.Context, request AckRequest) { return } -func (ts TestNoopSource) Pending(ctx context.Context) uint64 { +func (ts TestNoopSource) Pending(ctx context.Context) int64 { return 0 } diff --git a/pkg/sourcer/service_test.go b/pkg/sourcer/service_test.go index cda52674..29f6bb2c 100644 --- a/pkg/sourcer/service_test.go +++ b/pkg/sourcer/service_test.go @@ -18,7 +18,7 @@ import ( var TestEventTime = time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) var TestKey = "test-key" -var TestPendingNumber uint64 = 123 +var TestPendingNumber int64 = 123 type TestSource struct{} @@ -32,7 +32,7 @@ func (ts TestSource) Ack(_ context.Context, _ AckRequest) { return } -func (ts TestSource) Pending(_ context.Context) uint64 { +func (ts TestSource) Pending(_ context.Context) int64 { return TestPendingNumber }