Skip to content

Commit

Permalink
chore: create new server info file paths for all services (#98)
Browse files Browse the repository at this point in the history
Signed-off-by: a3hadi <[email protected]>
  • Loading branch information
ayildirim21 authored Feb 12, 2024
1 parent 08210e9 commit 715955f
Show file tree
Hide file tree
Showing 20 changed files with 19 additions and 50 deletions.
4 changes: 1 addition & 3 deletions pkg/info/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ type options struct {
}

func defaultOptions() *options {
return &options{
svrInfoFilePath: ServerInfoFilePath,
}
return &options{}
}

type Option func(*options)
Expand Down
4 changes: 0 additions & 4 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package info

const (
ServerInfoFilePath = "/var/run/numaflow/server-info"
)

type Protocol string

const (
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapper/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapper

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
address = "/var/run/numaflow/map.sock"
defaultMaxMessageSize = 1024 * 1024 * 64
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

// Service implements the proto gen server interface and contains the map operation
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapstreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapstreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/mapstream.sock"
serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info"
)

// Service implements the proto gen server interface and contains the map
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducer-server-info"
)

// Service implements the proto gen server interface and contains the reduce operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducestreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducestreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducestreamer-server-info"
)

// Service implements the proto gen server interface and contains the reduceStream operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sessionreducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sessionreducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sessionreduce.sock"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/sessionreducer-server-info"
)

// Service implements the proto gen server interface and contains the sesionreduce operation handler.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package sideinput

import "github.com/numaproj/numaflow-go/pkg/info"

// options is the struct to hold the server options.
type options struct {
sockAddr string
Expand All @@ -17,7 +15,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
address = "/var/run/numaflow/sideinput.sock"
DirPath = "/var/numaflow/side-inputs"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

// Service implements the proto gen server interface and contains the retrieve operation handler
Expand Down
6 changes: 1 addition & 5 deletions pkg/sinker/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sinker

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
)

// handlerDatum implements the Datum interface and is used in the sink functions.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/source.sock"
serverInfoFilePath = "/var/run/numaflow/sourcer-server-info"
)

// Service implements the proto gen server interface
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcetransformer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcetransformer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sourcetransform.sock"
serverInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info"
)

// Service implements the proto gen server interface and contains the transformer operation
Expand Down

0 comments on commit 715955f

Please sign in to comment.