From f0a6686e771403accbb0627e3fa5208a4e26fc3c Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 2 Oct 2023 18:51:48 -0400 Subject: [PATCH] chore: update the description for Pending() interface (#84) Signed-off-by: Keran Yang --- pkg/sourcer/examples/simple_source/Makefile | 2 +- pkg/sourcer/examples/simple_source/go.mod | 4 ++-- pkg/sourcer/examples/simple_source/go.sum | 4 ++-- pkg/sourcer/examples/simple_source/impl/simple_source.go | 7 +------ pkg/sourcer/examples/simple_source/main.go | 1 + pkg/sourcer/interface.go | 3 +++ 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/sourcer/examples/simple_source/Makefile b/pkg/sourcer/examples/simple_source/Makefile index 9bd60751..cbe79fd4 100644 --- a/pkg/sourcer/examples/simple_source/Makefile +++ b/pkg/sourcer/examples/simple_source/Makefile @@ -4,7 +4,7 @@ build: .PHONY: image image: build - docker build -t "quay.io/numaio/numaflow-go/source-simple-source:v0.5.0" --target simple-source . + docker build -t "quay.io/numaio/numaflow-go/source-simple-source:v0.5.1" --target simple-source . clean: -rm -rf ./dist diff --git a/pkg/sourcer/examples/simple_source/go.mod b/pkg/sourcer/examples/simple_source/go.mod index ccac323c..a0550a49 100644 --- a/pkg/sourcer/examples/simple_source/go.mod +++ b/pkg/sourcer/examples/simple_source/go.mod @@ -1,9 +1,9 @@ -module even_odd +module github.com/numaproj/numaflow-go/pkg/sourcer/examples/simple_source go 1.20 require ( - github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe + github.com/numaproj/numaflow-go v0.5.1 github.com/stretchr/testify v1.8.1 ) diff --git a/pkg/sourcer/examples/simple_source/go.sum b/pkg/sourcer/examples/simple_source/go.sum index 170264f6..79cbb35d 100644 --- a/pkg/sourcer/examples/simple_source/go.sum +++ b/pkg/sourcer/examples/simple_source/go.sum @@ -10,8 +10,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g= +github.com/numaproj/numaflow-go v0.5.1 h1:mvala+EmlrRtI20cr1y928zR7dO/HKUJsLai7vISHEA= +github.com/numaproj/numaflow-go v0.5.1/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= diff --git a/pkg/sourcer/examples/simple_source/impl/simple_source.go b/pkg/sourcer/examples/simple_source/impl/simple_source.go index fc16cb27..a7ce0c75 100644 --- a/pkg/sourcer/examples/simple_source/impl/simple_source.go +++ b/pkg/sourcer/examples/simple_source/impl/simple_source.go @@ -24,15 +24,11 @@ func NewSimpleSource() *SimpleSource { } } -// Pending returns the number of pending records. func (s *SimpleSource) Pending(_ context.Context) int64 { - // The simple source always returns zero to indicate no pending records. + // The simple source always returns zero to indicate there is no pending record. return 0 } -// Read reads messages from the source and sends the messages to the message channel. -// If the read request is timed out, the function returns without reading new data. -// Right after reading a message, the function marks the offset as to be acked. func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest, messageCh chan<- sourcesdk.Message) { // Handle the timeout specification in the read request. ctx, cancel := context.WithTimeout(context.Background(), readRequest.TimeOut()) @@ -70,7 +66,6 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest } } -// Ack acknowledges the data from the source. func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { for _, offset := range request.Offsets() { delete(s.toAckSet, deserializeOffset(offset.Value())) diff --git a/pkg/sourcer/examples/simple_source/main.go b/pkg/sourcer/examples/simple_source/main.go index b2a7a3cb..77bcf20e 100644 --- a/pkg/sourcer/examples/simple_source/main.go +++ b/pkg/sourcer/examples/simple_source/main.go @@ -5,6 +5,7 @@ import ( "log" "github.com/numaproj/numaflow-go/pkg/sourcer" + "github.com/numaproj/numaflow-go/pkg/sourcer/examples/simple_source/impl" ) diff --git a/pkg/sourcer/interface.go b/pkg/sourcer/interface.go index fe58f8bf..9e7a8540 100644 --- a/pkg/sourcer/interface.go +++ b/pkg/sourcer/interface.go @@ -8,12 +8,15 @@ import ( // Sourcer is the interface for implementation of the source. type Sourcer interface { // Read reads the data from the source and sends the data to the message channel. + // If the read request is timed out, the function returns without reading new data. + // Right after reading a message, the function marks the offset as to be acked. // Read should never attempt to close the message channel as the caller owns the channel. Read(ctx context.Context, readRequest ReadRequest, messageCh chan<- Message) // Ack acknowledges the data from the source. Ack(ctx context.Context, request AckRequest) // Pending returns the number of pending messages. // When the return value is negative, it indicates the pending information is not available. + // With pending information being not available, the Numaflow platform doesn't auto-scale the source. Pending(ctx context.Context) int64 }