diff --git a/README.md b/README.md index e5dcf113..bcb68ea1 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ This SDK provides the interfaces to implement [Numaflow](https://github.com/numa Source Transformer, Functions, Sinks or SideInputs in Golang. - Implement [User Defined Sources](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sourcer) - Implement [User Defined Source Transformers](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sourcetransformer) -- Implement [User Defined Functions](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/function) -- Implement [User Defined Sinks](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sink) +- Implement User Defined Functions + - [Map](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/mapper) + - [Reduce](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/reducer) +- Implement [User Defined Sinks](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sinker) - Implement [User Defined SideInputs](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sideinput) \ No newline at end of file diff --git a/pkg/mapper/options.go b/pkg/mapper/options.go index d1f3b483..45683be5 100644 --- a/pkg/mapper/options.go +++ b/pkg/mapper/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 5a584df7..60399b24 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -20,7 +20,7 @@ type server struct { // NewServer creates a new map server. func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/mapstreamer/options.go b/pkg/mapstreamer/options.go index 5eb1e609..2b5c9290 100644 --- a/pkg/mapstreamer/options.go +++ b/pkg/mapstreamer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index bb8b946c..f080feaf 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -19,7 +19,7 @@ type server struct { // NewServer creates a new map streaming server. func NewServer(ms MapStreamer, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/reducer/options.go b/pkg/reducer/options.go index e0919a9d..e015cdb8 100644 --- a/pkg/reducer/options.go +++ b/pkg/reducer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index 494282e7..53989ebf 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -19,7 +19,7 @@ type server struct { // NewServer creates a new reduce server. func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/reducestreamer/options.go b/pkg/reducestreamer/options.go index 7862fb03..3cabe0df 100644 --- a/pkg/reducestreamer/options.go +++ b/pkg/reducestreamer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/reducestreamer/server.go b/pkg/reducestreamer/server.go index 1e180637..b24eda57 100644 --- a/pkg/reducestreamer/server.go +++ b/pkg/reducestreamer/server.go @@ -19,7 +19,7 @@ type server struct { // NewServer creates a new reduceStream server. func NewServer(r ReduceStreamerCreator, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/sessionreducer/options.go b/pkg/sessionreducer/options.go index 6e5da5b7..ba7af1e3 100644 --- a/pkg/sessionreducer/options.go +++ b/pkg/sessionreducer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/sessionreducer/server.go b/pkg/sessionreducer/server.go index 5b630bf3..7bfaf664 100644 --- a/pkg/sessionreducer/server.go +++ b/pkg/sessionreducer/server.go @@ -19,7 +19,7 @@ type server struct { // NewServer creates a new session reduce server. func NewServer(r SessionReducerCreator, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/sideinput/options.go b/pkg/sideinput/options.go index 6703a90f..761ef737 100644 --- a/pkg/sideinput/options.go +++ b/pkg/sideinput/options.go @@ -9,8 +9,8 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -// DefaultOptions returns the default options. -func DefaultOptions() *options { +// defaultOptions returns the default options. +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index a19a59c9..37cf7d02 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -20,7 +20,7 @@ type server struct { // NewSideInputServer creates a new server object. func NewSideInputServer(r SideInputRetriever, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/sinker/options.go b/pkg/sinker/options.go index 2001e930..b9974034 100644 --- a/pkg/sinker/options.go +++ b/pkg/sinker/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index adcc7ce3..4833961a 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -20,7 +20,7 @@ type sinkServer struct { // NewServer creates a new sinkServer object. func NewServer(h Sinker, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } diff --git a/pkg/sourcer/options.go b/pkg/sourcer/options.go index 43c66af7..a665ee25 100644 --- a/pkg/sourcer/options.go +++ b/pkg/sourcer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index b52f5310..58bd01b8 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -21,7 +21,7 @@ type server struct { func NewServer( source Sourcer, inputOptions ...Option) numaflow.Server { - var opts = DefaultOptions() + var opts = defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) diff --git a/pkg/sourcetransformer/options.go b/pkg/sourcetransformer/options.go index f98147a5..8cbc2fb4 100644 --- a/pkg/sourcetransformer/options.go +++ b/pkg/sourcetransformer/options.go @@ -13,7 +13,7 @@ type options struct { // Option is the interface to apply options. type Option func(*options) -func DefaultOptions() *options { +func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index 5d6dbeb2..f4bf8775 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -19,7 +19,7 @@ type server struct { // NewServer creates a new SourceTransformer server. func NewServer(m SourceTransformer, inputOptions ...Option) numaflow.Server { - opts := DefaultOptions() + opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) }