Skip to content

Commit

Permalink
chore: update the description for Pending() interface (#84)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Oct 2, 2023
1 parent caaec42 commit f0a6686
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/sourcer/examples/simple_source/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build:

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/source-simple-source:v0.5.0" --target simple-source .
docker build -t "quay.io/numaio/numaflow-go/source-simple-source:v0.5.1" --target simple-source .

clean:
-rm -rf ./dist
4 changes: 2 additions & 2 deletions pkg/sourcer/examples/simple_source/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module even_odd
module github.com/numaproj/numaflow-go/pkg/sourcer/examples/simple_source

go 1.20

require (
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe
github.com/numaproj/numaflow-go v0.5.1
github.com/stretchr/testify v1.8.1
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/examples/simple_source/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g=
github.com/numaproj/numaflow-go v0.5.1 h1:mvala+EmlrRtI20cr1y928zR7dO/HKUJsLai7vISHEA=
github.com/numaproj/numaflow-go v0.5.1/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
7 changes: 1 addition & 6 deletions pkg/sourcer/examples/simple_source/impl/simple_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@ func NewSimpleSource() *SimpleSource {
}
}

// Pending returns the number of pending records.
func (s *SimpleSource) Pending(_ context.Context) int64 {
// The simple source always returns zero to indicate no pending records.
// The simple source always returns zero to indicate there is no pending record.
return 0
}

// Read reads messages from the source and sends the messages to the message channel.
// If the read request is timed out, the function returns without reading new data.
// Right after reading a message, the function marks the offset as to be acked.
func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest, messageCh chan<- sourcesdk.Message) {
// Handle the timeout specification in the read request.
ctx, cancel := context.WithTimeout(context.Background(), readRequest.TimeOut())
Expand Down Expand Up @@ -70,7 +66,6 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest
}
}

// Ack acknowledges the data from the source.
func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) {
for _, offset := range request.Offsets() {
delete(s.toAckSet, deserializeOffset(offset.Value()))
Expand Down
1 change: 1 addition & 0 deletions pkg/sourcer/examples/simple_source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"

"github.com/numaproj/numaflow-go/pkg/sourcer"

"github.com/numaproj/numaflow-go/pkg/sourcer/examples/simple_source/impl"
)

Expand Down
3 changes: 3 additions & 0 deletions pkg/sourcer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
// Sourcer is the interface for implementation of the source.
type Sourcer interface {
// Read reads the data from the source and sends the data to the message channel.
// If the read request is timed out, the function returns without reading new data.
// Right after reading a message, the function marks the offset as to be acked.
// Read should never attempt to close the message channel as the caller owns the channel.
Read(ctx context.Context, readRequest ReadRequest, messageCh chan<- Message)
// Ack acknowledges the data from the source.
Ack(ctx context.Context, request AckRequest)
// Pending returns the number of pending messages.
// When the return value is negative, it indicates the pending information is not available.
// With pending information being not available, the Numaflow platform doesn't auto-scale the source.
Pending(ctx context.Context) int64
}

Expand Down

0 comments on commit f0a6686

Please sign in to comment.