Skip to content

Commit

Permalink
chore: write server info for sideinput sdk (#97)
Browse files Browse the repository at this point in the history
Signed-off-by: a3hadi <[email protected]>
  • Loading branch information
ayildirim21 authored Jan 29, 2024
1 parent 5424b35 commit 08210e9
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 9 deletions.
2 changes: 0 additions & 2 deletions pkg/shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const (

func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) {
// If infoFilePath is not empty, write the server info to the file.
// For Side input we don't write data to the info server, hence will pass path as empty here.
// Could be used later on for similar cases
if infoFilePath != "" {
serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package sideinput

import "github.com/numaproj/numaflow-go/pkg/info"

// options is the struct to hold the server options.
type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
serverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -12,8 +15,9 @@ type Option func(*options)
// defaultOptions returns the default options.
func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
}
}

Expand All @@ -30,3 +34,10 @@ func WithSockAddr(addr string) Option {
opts.sockAddr = addr
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(opts *options) {
opts.serverInfoFilePath = f
}
}
3 changes: 1 addition & 2 deletions pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func (s *server) Start(ctx context.Context) error {
defer stop()

// start listening on unix domain socket
// For Side input we don't write data to the info server, hence will pass path as empty here.
lis, err := shared.PrepareServer(s.opts.sockAddr, "")
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sideinput/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ func TestServer_Start(t *testing.T) {
_ = os.RemoveAll(socketFile.Name())
}()

serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info")
defer func() {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var retrieveHandler = RetrieveFunc(func(ctx context.Context) Message {
return BroadcastMessage([]byte("test"))
})
// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name())).Start(ctx)
err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}

0 comments on commit 08210e9

Please sign in to comment.