Skip to content

Commit

Permalink
fix: allow returning negative value from source pending api (#81)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Sep 12, 2023
1 parent 2728d6f commit 24b54b9
Show file tree
Hide file tree
Showing 21 changed files with 56 additions and 26 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Go

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
name: Unit Tests
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.17

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
run: go mod download

- name: Test
run: make all
2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/mapstream/v1/mapstream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/sink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/sink_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/apis/proto/source/v1/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/apis/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ message ReadyResponse {
message PendingResponse {
message Result {
// Required field holding the number of pending records at the user defined source.
uint64 count = 1;
// A negative count indicates that the pending information is not available.
int64 count = 1;
}
// Required field holding the result.
Result result = 1;
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sourcer/examples/simple_source/impl/simple_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func NewSimpleSource() *SimpleSource {
}

// Pending returns the number of pending records.
func (s *SimpleSource) Pending(_ context.Context) uint64 {
// The simple source always returns 0 to indicate no pending records.
func (s *SimpleSource) Pending(_ context.Context) int64 {
// The simple source always returns zero to indicate no pending records.
return 0
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/sourcer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"time"
)

// Sourcer is the interface for implementation of source.
// Sourcer is the interface for implementation of the source.
type Sourcer interface {
// Read reads the data from the source and sends the data to the message channel.
// Read should never attempt to close the message channel as the caller owns the channel.
Read(ctx context.Context, readRequest ReadRequest, messageCh chan<- Message)
// Ack acknowledges the data from the source.
Ack(ctx context.Context, request AckRequest)
// Pending returns the number of pending messages.
Pending(ctx context.Context) uint64
// When the return value is negative, it indicates the pending information is not available.
Pending(ctx context.Context) int64
}

// ReadRequest is the interface of read request.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func WithMaxMessageSize(size int) Option {
}
}

// WithSockAddr start the server with the given sock addr. This is mainly used for testing purpose.
// WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes.
func WithSockAddr(addr string) Option {
return func(opts *options) {
opts.sockAddr = addr
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (ts TestNoopSource) Ack(ctx context.Context, request AckRequest) {
return
}

func (ts TestNoopSource) Pending(ctx context.Context) uint64 {
func (ts TestNoopSource) Pending(ctx context.Context) int64 {
return 0
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var TestEventTime = time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
var TestKey = "test-key"
var TestPendingNumber uint64 = 123
var TestPendingNumber int64 = 123

type TestSource struct{}

Expand All @@ -32,7 +32,7 @@ func (ts TestSource) Ack(_ context.Context, _ AckRequest) {
return
}

func (ts TestSource) Pending(_ context.Context) uint64 {
func (ts TestSource) Pending(_ context.Context) int64 {
return TestPendingNumber
}

Expand Down

0 comments on commit 24b54b9

Please sign in to comment.