From 08210e956ad537379a4337ea1bb36bda017b552d Mon Sep 17 00:00:00 2001 From: Abdullah Hadi Date: Mon, 29 Jan 2024 11:43:28 -0500 Subject: [PATCH] chore: write server info for sideinput sdk (#97) Signed-off-by: a3hadi --- pkg/shared/util.go | 2 -- pkg/sideinput/options.go | 19 +++++++++++++++---- pkg/sideinput/server.go | 3 +-- pkg/sideinput/server_test.go | 7 ++++++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/shared/util.go b/pkg/shared/util.go index 1d8ce6a9..f7747971 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -18,8 +18,6 @@ const ( func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) { // If infoFilePath is not empty, write the server info to the file. - // For Side input we don't write data to the info server, hence will pass path as empty here. - // Could be used later on for similar cases if infoFilePath != "" { serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()} if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil { diff --git a/pkg/sideinput/options.go b/pkg/sideinput/options.go index 761ef737..5728d801 100644 --- a/pkg/sideinput/options.go +++ b/pkg/sideinput/options.go @@ -1,9 +1,12 @@ package sideinput +import "github.com/numaproj/numaflow-go/pkg/info" + // options is the struct to hold the server options. type options struct { - sockAddr string - maxMessageSize int + sockAddr string + maxMessageSize int + serverInfoFilePath string } // Option is the interface to apply options. @@ -12,8 +15,9 @@ type Option func(*options) // defaultOptions returns the default options. func defaultOptions() *options { return &options{ - sockAddr: address, - maxMessageSize: defaultMaxMessageSize, + sockAddr: address, + maxMessageSize: defaultMaxMessageSize, + serverInfoFilePath: info.ServerInfoFilePath, } } @@ -30,3 +34,10 @@ func WithSockAddr(addr string) Option { opts.sockAddr = addr } } + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.serverInfoFilePath = f + } +} diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index 37cf7d02..f1ad76c1 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -37,8 +37,7 @@ func (s *server) Start(ctx context.Context) error { defer stop() // start listening on unix domain socket - // For Side input we don't write data to the info server, hence will pass path as empty here. - lis, err := shared.PrepareServer(s.opts.sockAddr, "") + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sideinput/server_test.go b/pkg/sideinput/server_test.go index f220e804..aecad514 100644 --- a/pkg/sideinput/server_test.go +++ b/pkg/sideinput/server_test.go @@ -16,12 +16,17 @@ func TestServer_Start(t *testing.T) { _ = os.RemoveAll(socketFile.Name()) }() + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + var retrieveHandler = RetrieveFunc(func(ctx context.Context) Message { return BroadcastMessage([]byte("test")) }) // note: using actual uds connection ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) defer cancel() - err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name())).Start(ctx) + err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) }