From 715955f1e069735e7fc185999572f427bb72fa74 Mon Sep 17 00:00:00 2001 From: Abdullah Hadi Date: Mon, 12 Feb 2024 15:25:12 -0500 Subject: [PATCH] chore: create new server info file paths for all services (#98) Signed-off-by: a3hadi --- pkg/info/options.go | 4 +--- pkg/info/types.go | 4 ---- pkg/mapper/options.go | 6 +----- pkg/mapper/service.go | 1 + pkg/mapstreamer/options.go | 6 +----- pkg/mapstreamer/service.go | 1 + pkg/reducer/options.go | 6 +----- pkg/reducer/service.go | 1 + pkg/reducestreamer/options.go | 6 +----- pkg/reducestreamer/service.go | 1 + pkg/sessionreducer/options.go | 6 +----- pkg/sessionreducer/service.go | 1 + pkg/sideinput/options.go | 4 +--- pkg/sideinput/service.go | 1 + pkg/sinker/options.go | 6 +----- pkg/sinker/service.go | 1 + pkg/sourcer/options.go | 6 +----- pkg/sourcer/service.go | 1 + pkg/sourcetransformer/options.go | 6 +----- pkg/sourcetransformer/service.go | 1 + 20 files changed, 19 insertions(+), 50 deletions(-) diff --git a/pkg/info/options.go b/pkg/info/options.go index 58fd56c3..ce708184 100644 --- a/pkg/info/options.go +++ b/pkg/info/options.go @@ -5,9 +5,7 @@ type options struct { } func defaultOptions() *options { - return &options{ - svrInfoFilePath: ServerInfoFilePath, - } + return &options{} } type Option func(*options) diff --git a/pkg/info/types.go b/pkg/info/types.go index 2f7b606c..0da0a4ef 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -1,9 +1,5 @@ package info -const ( - ServerInfoFilePath = "/var/run/numaflow/server-info" -) - type Protocol string const ( diff --git a/pkg/mapper/options.go b/pkg/mapper/options.go index 45683be5..5b1b89b5 100644 --- a/pkg/mapper/options.go +++ b/pkg/mapper/options.go @@ -1,9 +1,5 @@ package mapper -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 4e2ed223..16aab8a0 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -12,6 +12,7 @@ const ( uds = "unix" address = "/var/run/numaflow/map.sock" defaultMaxMessageSize = 1024 * 1024 * 64 + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map operation diff --git a/pkg/mapstreamer/options.go b/pkg/mapstreamer/options.go index 2b5c9290..42a9c4a1 100644 --- a/pkg/mapstreamer/options.go +++ b/pkg/mapstreamer/options.go @@ -1,9 +1,5 @@ package mapstreamer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index b3dc9c2b..2e9022e0 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -12,6 +12,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/mapstream.sock" + serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info" ) // Service implements the proto gen server interface and contains the map diff --git a/pkg/reducer/options.go b/pkg/reducer/options.go index e015cdb8..a5b17605 100644 --- a/pkg/reducer/options.go +++ b/pkg/reducer/options.go @@ -1,9 +1,5 @@ package reducer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index 1a5148a8..e61e9b4d 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -19,6 +19,7 @@ const ( winStartTime = "x-numaflow-win-start-time" winEndTime = "x-numaflow-win-end-time" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/reducer-server-info" ) // Service implements the proto gen server interface and contains the reduce operation handler. diff --git a/pkg/reducestreamer/options.go b/pkg/reducestreamer/options.go index 3cabe0df..c4d51713 100644 --- a/pkg/reducestreamer/options.go +++ b/pkg/reducestreamer/options.go @@ -1,9 +1,5 @@ package reducestreamer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/reducestreamer/service.go b/pkg/reducestreamer/service.go index 641ce86d..38182c23 100644 --- a/pkg/reducestreamer/service.go +++ b/pkg/reducestreamer/service.go @@ -19,6 +19,7 @@ const ( winStartTime = "x-numaflow-win-start-time" winEndTime = "x-numaflow-win-end-time" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/reducestreamer-server-info" ) // Service implements the proto gen server interface and contains the reduceStream operation handler. diff --git a/pkg/sessionreducer/options.go b/pkg/sessionreducer/options.go index ba7af1e3..4572f7d2 100644 --- a/pkg/sessionreducer/options.go +++ b/pkg/sessionreducer/options.go @@ -1,9 +1,5 @@ package sessionreducer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sessionreducer/service.go b/pkg/sessionreducer/service.go index b3ddd08b..cf033ffa 100644 --- a/pkg/sessionreducer/service.go +++ b/pkg/sessionreducer/service.go @@ -17,6 +17,7 @@ const ( defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/sessionreduce.sock" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/sessionreducer-server-info" ) // Service implements the proto gen server interface and contains the sesionreduce operation handler. diff --git a/pkg/sideinput/options.go b/pkg/sideinput/options.go index 5728d801..7740be62 100644 --- a/pkg/sideinput/options.go +++ b/pkg/sideinput/options.go @@ -1,7 +1,5 @@ package sideinput -import "github.com/numaproj/numaflow-go/pkg/info" - // options is the struct to hold the server options. type options struct { sockAddr string @@ -17,7 +15,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 6967e790..7a1e5d53 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -13,6 +13,7 @@ const ( address = "/var/run/numaflow/sideinput.sock" DirPath = "/var/numaflow/side-inputs" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB + serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) // Service implements the proto gen server interface and contains the retrieve operation handler diff --git a/pkg/sinker/options.go b/pkg/sinker/options.go index b9974034..a913ae8b 100644 --- a/pkg/sinker/options.go +++ b/pkg/sinker/options.go @@ -1,9 +1,5 @@ package sinker -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index ef424afc..d008c1a3 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -15,6 +15,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB address = "/var/run/numaflow/sink.sock" + serverInfoFilePath = "/var/run/numaflow/sinker-server-info" ) // handlerDatum implements the Datum interface and is used in the sink functions. diff --git a/pkg/sourcer/options.go b/pkg/sourcer/options.go index a665ee25..b91af7ff 100644 --- a/pkg/sourcer/options.go +++ b/pkg/sourcer/options.go @@ -1,9 +1,5 @@ package sourcer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 962691b1..33d82743 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -14,6 +14,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB address = "/var/run/numaflow/source.sock" + serverInfoFilePath = "/var/run/numaflow/sourcer-server-info" ) // Service implements the proto gen server interface diff --git a/pkg/sourcetransformer/options.go b/pkg/sourcetransformer/options.go index 8cbc2fb4..410b685c 100644 --- a/pkg/sourcetransformer/options.go +++ b/pkg/sourcetransformer/options.go @@ -1,9 +1,5 @@ package sourcetransformer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 77c7bfec..327c7fdd 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -13,6 +13,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/sourcetransform.sock" + serverInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info" ) // Service implements the proto gen server interface and contains the transformer operation