Skip to content

Commit

Permalink
added hot reload to receptor config (#1135)
Browse files Browse the repository at this point in the history
  • Loading branch information
resoluteCoder authored Sep 9, 2024
1 parent 4413a87 commit b9bab8f
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 58 deletions.
91 changes: 74 additions & 17 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,16 @@ type Runer interface {
Run() error
}

type Reloader interface {
Reload() error
}

type ReceptorConfig struct {
// Used pointer structs to apply defaults to config
Node *types.NodeCfg
Trace logger.TraceCfg
LocalOnly backends.NullBackendCfg `mapstructure:"local-only"`
LogLevel *logger.LoglevelCfg `mapstructure:"log-level"`
ControlServices []*controlsvc.CmdlineConfigUnix `mapstructure:"control-services"`
TCPPeers []*backends.TCPDialerCfg `mapstructure:"tcp-peers"`
UDPPeers []*backends.UDPDialerCfg `mapstructure:"udp-peers"`
WSPeers []*backends.WebsocketDialerCfg `mapstructure:"ws-peers"`
TCPListeners []*backends.TCPListenerCfg `mapstructure:"tcp-listeners"`
UDPListeners []*backends.UDPListenerCfg `mapstructure:"udp-listeners"`
WSListeners []*backends.WebsocketListenerCfg `mapstructure:"ws-listeners"`
TLSClients []netceptor.TLSClientConfig `mapstructure:"tls-clients"`
TLSServer []netceptor.TLSServerConfig `mapstructure:"tls-servers"`
WorkCommands []workceptor.CommandWorkerCfg `mapstructure:"work-commands"`
Expand All @@ -63,28 +60,48 @@ type CertificatesConfig struct {
SignReq []certificates.SignReqConfig `mapstructure:"cert-signreqs"`
}

type BackendConfig struct {
TCPListeners []*backends.TCPListenerCfg `mapstructure:"tcp-listeners"`
UDPListeners []*backends.UDPListenerCfg `mapstructure:"udp-listeners"`
WSListeners []*backends.WebsocketListenerCfg `mapstructure:"ws-listeners"`
TCPPeers []*backends.TCPDialerCfg `mapstructure:"tcp-peers"`
UDPPeers []*backends.UDPDialerCfg `mapstructure:"udp-peers"`
WSPeers []*backends.WebsocketDialerCfg `mapstructure:"ws-peers"`
LocalOnly backends.NullBackendCfg `mapstructure:"local-only"`
}

func PrintPhaseErrorMessage(configName string, phase string, err error) {
fmt.Printf("ERROR: %s for %s on %s phase\n", err, configName, phase)
}

func ParseConfigs(configFile string) (*ReceptorConfig, *CertificatesConfig, error) {
if configFile == "" && viper.ConfigFileUsed() == "" {
fmt.Fprintln(os.Stderr, "Could not locate config file (default is $HOME/receptor.yaml)")
os.Exit(1)
}
func ParseReceptorConfig(configFile string) (*ReceptorConfig, error) {
var receptorConfig ReceptorConfig
var certifcatesConfig CertificatesConfig
err := viper.Unmarshal(&receptorConfig)
if err != nil {
return nil, nil, err
return nil, err
}

err = viper.Unmarshal(&certifcatesConfig)
return &receptorConfig, nil
}

func ParseCertificatesConfig(configFile string) (*CertificatesConfig, error) {
var certifcatesConfig CertificatesConfig
err := viper.Unmarshal(&certifcatesConfig)
if err != nil {
return nil, err
}

return &certifcatesConfig, nil
}

func ParseBackendConfig(configFile string) (*BackendConfig, error) {
var backendConfig BackendConfig
err := viper.Unmarshal(&backendConfig)
if err != nil {
return nil, nil, err
return nil, err
}

return &receptorConfig, &certifcatesConfig, nil
return &backendConfig, nil
}

func isConfigEmpty(v reflect.Value) bool {
Expand Down Expand Up @@ -157,6 +174,46 @@ func RunPhases(phase string, v reflect.Value) {
}
}

// ReloadServices iterates through key/values calling reload on applicable services.
func ReloadServices(v reflect.Value) {
for i := 0; i < v.NumField(); i++ {
// if the services is not initialised, skip
if reflect.Value.IsZero(v.Field(i)) {
continue
}

var err error
switch v.Field(i).Kind() {
case reflect.Slice:
// iterate over all the type fields
for j := 0; j < v.Field(i).Len(); j++ {
serviceItem := v.Field(i).Index(j).Interface()
switch c := serviceItem.(type) {
// check to see if the selected type field satisfies reload
// call reload on cfg object
case Reloader:
err = c.Reload()
if err != nil {
PrintPhaseErrorMessage(v.Type().Name(), "reload", err)
}
// if cfg object does not satisfy, do nothing
default:
}
}
// runs for non slice fields
default:
switch c := v.Field(i).Interface().(type) {
case Reloader:
err = c.Reload()
if err != nil {
PrintPhaseErrorMessage(v.Type().Name(), "reload", err)
}
default:
}
}
}
}

func RunConfigV1() {
cl := cmdline.NewCmdline()
cl.AddConfigType("node", "Specifies the node configuration of this instance", types.NodeCfg{}, cmdline.Required, cmdline.Singleton)
Expand Down
25 changes: 14 additions & 11 deletions cmd/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cmd

import "github.com/ansible/receptor/pkg/types"

func SetTCPListenerDefaults(config *ReceptorConfig) {
func SetTCPListenerDefaults(config *BackendConfig) {
for _, listener := range config.TCPListeners {
if listener.Cost == 0 {
listener.Cost = 1.0
Expand All @@ -13,7 +13,7 @@ func SetTCPListenerDefaults(config *ReceptorConfig) {
}
}

func SetUDPListenerDefaults(config *ReceptorConfig) {
func SetUDPListenerDefaults(config *BackendConfig) {
for _, listener := range config.UDPListeners {
if listener.Cost == 0 {
listener.Cost = 1.0
Expand All @@ -24,7 +24,7 @@ func SetUDPListenerDefaults(config *ReceptorConfig) {
}
}

func SetWSListenerDefaults(config *ReceptorConfig) {
func SetWSListenerDefaults(config *BackendConfig) {
for _, listener := range config.WSListeners {
if listener.Cost == 0 {
listener.Cost = 1.0
Expand All @@ -38,7 +38,7 @@ func SetWSListenerDefaults(config *ReceptorConfig) {
}
}

func SetUDPPeerDefaults(config *ReceptorConfig) {
func SetUDPPeerDefaults(config *BackendConfig) {
for _, peer := range config.UDPPeers {
if peer.Cost == 0 {
peer.Cost = 1.0
Expand All @@ -50,7 +50,7 @@ func SetUDPPeerDefaults(config *ReceptorConfig) {
}
}

func SetTCPPeerDefaults(config *ReceptorConfig) {
func SetTCPPeerDefaults(config *BackendConfig) {
for _, peer := range config.TCPPeers {
if peer.Cost == 0 {
peer.Cost = 1.0
Expand All @@ -62,7 +62,7 @@ func SetTCPPeerDefaults(config *ReceptorConfig) {
}
}

func SetWSPeerDefaults(config *ReceptorConfig) {
func SetWSPeerDefaults(config *BackendConfig) {
for _, peer := range config.WSPeers {
if peer.Cost == 0 {
peer.Cost = 1.0
Expand Down Expand Up @@ -116,15 +116,18 @@ func SetKubeWorkerDefaults(config *ReceptorConfig) {
}
}

func SetConfigDefaults(config *ReceptorConfig) {
func SetReceptorConfigDefaults(config *ReceptorConfig) {
SetCmdlineUnixDefaults(config)
SetLogLevelDefaults(config)
SetNodeDefaults(config)
SetKubeWorkerDefaults(config)
}

func SetBackendConfigDefaults(config *BackendConfig) {
SetTCPListenerDefaults(config)
SetUDPListenerDefaults(config)
SetWSListenerDefaults(config)
SetTCPPeerDefaults(config)
SetUDPPeerDefaults(config)
SetWSPeerDefaults(config)
SetCmdlineUnixDefaults(config)
SetLogLevelDefaults(config)
SetNodeDefaults(config)
SetKubeWorkerDefaults(config)
}
4 changes: 0 additions & 4 deletions cmd/receptor-cl/receptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import (
"os"

"github.com/ansible/receptor/cmd"
_ "github.com/ansible/receptor/internal/version"
_ "github.com/ansible/receptor/pkg/backends"
_ "github.com/ansible/receptor/pkg/certificates"
"github.com/ansible/receptor/pkg/netceptor"
_ "github.com/ansible/receptor/pkg/services"
)

func main() {
Expand Down
116 changes: 90 additions & 26 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
"reflect"

receptorVersion "github.com/ansible/receptor/internal/version"
"github.com/ansible/receptor/pkg/logger"
"github.com/ansible/receptor/pkg/netceptor"
"github.com/fsnotify/fsnotify"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
cfgFile string
version bool
cfgFile string
version bool
backendConfig *BackendConfig
)

// rootCmd represents the base command when called without any subcommands.
Expand All @@ -23,28 +27,7 @@ var rootCmd = &cobra.Command{
Receptor is an overlay network intended to ease the distribution of work across a large and dispersed collection of workers.
Receptor nodes establish peer-to-peer connections with each other via existing networks.
Once connected, the receptor mesh provides datagram (UDP-like) and stream (TCP-like) capabilities to applications, as well as robust unit-of-work handling with resiliency against transient network failures.`,
Run: func(cmd *cobra.Command, args []string) {
if version {
fmt.Println(receptorVersion.Version)
os.Exit(0)
}
receptorConfig, certifcatesConfig, err := ParseConfigs(cfgFile)
if err != nil {
fmt.Printf("unable to decode into struct, %v", err)
os.Exit(1)
}

isEmptyReceptorConfig := isConfigEmpty(reflect.ValueOf(*receptorConfig))

RunConfigV2(reflect.ValueOf(*certifcatesConfig))
if isEmptyReceptorConfig {
fmt.Println("empty receptor config, skipping...")
os.Exit(0)
}

SetConfigDefaults(receptorConfig)
RunConfigV2(reflect.ValueOf(*receptorConfig))
},
Run: handleRootCommand,
}

func Execute() {
Expand All @@ -61,10 +44,9 @@ func init() {
rootCmd.Flags().BoolVar(&version, "version", false, "Show the Receptor version")
}

var FindMe = true

// initConfig reads in config file and ENV variables if set.
func initConfig() {
l := logger.NewReceptorLogger("")
if cfgFile != "" {
viper.SetConfigFile(cfgFile)
} else {
Expand All @@ -78,8 +60,90 @@ func initConfig() {

viper.AutomaticEnv()

viper.OnConfigChange(func(e fsnotify.Event) {
l.Info("Config file changed: %s\n", e.Name)

var newConfig *BackendConfig
viper.Unmarshal(&newConfig)

// used because OnConfigChange runs twice for some reason
// allows to skip empty first config
isEmpty := isConfigEmpty(reflect.ValueOf(*newConfig))
if isEmpty {
return
}

SetBackendConfigDefaults(newConfig)

isEqual := reflect.DeepEqual(*backendConfig, *newConfig)
if !isEqual {
// fmt.Println("reloading backends")

// this will do a reload of all reloadable services
// TODO: Optimize to only reload services that have config change
// NOTE: Make sure to account for two things
// if current config had two services then new config has zero cancel those backends
// if services has two items in a slice and one of them has changed iterate and reload on changed service
netceptor.MainInstance.CancelBackends()
l.Info("Reloading backends")

ReloadServices(reflect.ValueOf(*newConfig))
backendConfig = newConfig

return
}

l.Info("No reloadable backends were found.")
})
// TODO: use env to turn off watch config
viper.WatchConfig()

err := viper.ReadInConfig()
if err == nil {
fmt.Fprintln(os.Stdout, "Using config file:", viper.ConfigFileUsed())
}
}

func handleRootCommand(cmd *cobra.Command, args []string) {
if version {
fmt.Println(receptorVersion.Version)
os.Exit(0)
}

if cfgFile == "" && viper.ConfigFileUsed() == "" {
fmt.Fprintln(os.Stderr, "Could not locate config file (default is $HOME/receptor.yaml)")
os.Exit(1)
}

receptorConfig, err := ParseReceptorConfig(cfgFile)
if err != nil {
fmt.Printf("unable to decode into struct, %v", err)
os.Exit(1)
}

certifcatesConfig, err := ParseCertificatesConfig(cfgFile)
if err != nil {
fmt.Printf("unable to decode into struct, %v", err)
os.Exit(1)
}

backendConfig, err = ParseBackendConfig(cfgFile)
if err != nil {
fmt.Printf("unable to decode into struct, %v", err)
os.Exit(1)
}

isEmptyReceptorConfig := isConfigEmpty(reflect.ValueOf(*receptorConfig))
isEmptyReloadableServicesConfig := isConfigEmpty(reflect.ValueOf(*backendConfig))

RunConfigV2(reflect.ValueOf(*certifcatesConfig))
if isEmptyReceptorConfig && isEmptyReloadableServicesConfig {
fmt.Println("empty receptor config, skipping...")
os.Exit(0)
}

SetReceptorConfigDefaults(receptorConfig)
SetBackendConfigDefaults(backendConfig)
RunConfigV2(reflect.ValueOf(*receptorConfig))
RunConfigV2(reflect.ValueOf(*backendConfig))
}

0 comments on commit b9bab8f

Please sign in to comment.