From b9bab8f13a6bfcd9467952196857c41721cb099c Mon Sep 17 00:00:00 2001 From: Chris Santiago Date: Mon, 9 Sep 2024 10:11:34 -0500 Subject: [PATCH] added hot reload to receptor config (#1135) --- cmd/config.go | 91 ++++++++++++++++++++++------ cmd/defaults.go | 25 ++++---- cmd/receptor-cl/receptor.go | 4 -- cmd/root.go | 116 ++++++++++++++++++++++++++++-------- 4 files changed, 178 insertions(+), 58 deletions(-) diff --git a/cmd/config.go b/cmd/config.go index df371a1dc..5f331d32b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -29,19 +29,16 @@ type Runer interface { Run() error } +type Reloader interface { + Reload() error +} + type ReceptorConfig struct { // Used pointer structs to apply defaults to config Node *types.NodeCfg Trace logger.TraceCfg - LocalOnly backends.NullBackendCfg `mapstructure:"local-only"` LogLevel *logger.LoglevelCfg `mapstructure:"log-level"` ControlServices []*controlsvc.CmdlineConfigUnix `mapstructure:"control-services"` - TCPPeers []*backends.TCPDialerCfg `mapstructure:"tcp-peers"` - UDPPeers []*backends.UDPDialerCfg `mapstructure:"udp-peers"` - WSPeers []*backends.WebsocketDialerCfg `mapstructure:"ws-peers"` - TCPListeners []*backends.TCPListenerCfg `mapstructure:"tcp-listeners"` - UDPListeners []*backends.UDPListenerCfg `mapstructure:"udp-listeners"` - WSListeners []*backends.WebsocketListenerCfg `mapstructure:"ws-listeners"` TLSClients []netceptor.TLSClientConfig `mapstructure:"tls-clients"` TLSServer []netceptor.TLSServerConfig `mapstructure:"tls-servers"` WorkCommands []workceptor.CommandWorkerCfg `mapstructure:"work-commands"` @@ -63,28 +60,48 @@ type CertificatesConfig struct { SignReq []certificates.SignReqConfig `mapstructure:"cert-signreqs"` } +type BackendConfig struct { + TCPListeners []*backends.TCPListenerCfg `mapstructure:"tcp-listeners"` + UDPListeners []*backends.UDPListenerCfg `mapstructure:"udp-listeners"` + WSListeners []*backends.WebsocketListenerCfg `mapstructure:"ws-listeners"` + TCPPeers []*backends.TCPDialerCfg `mapstructure:"tcp-peers"` + UDPPeers []*backends.UDPDialerCfg `mapstructure:"udp-peers"` + WSPeers []*backends.WebsocketDialerCfg `mapstructure:"ws-peers"` + LocalOnly backends.NullBackendCfg `mapstructure:"local-only"` +} + func PrintPhaseErrorMessage(configName string, phase string, err error) { fmt.Printf("ERROR: %s for %s on %s phase\n", err, configName, phase) } -func ParseConfigs(configFile string) (*ReceptorConfig, *CertificatesConfig, error) { - if configFile == "" && viper.ConfigFileUsed() == "" { - fmt.Fprintln(os.Stderr, "Could not locate config file (default is $HOME/receptor.yaml)") - os.Exit(1) - } +func ParseReceptorConfig(configFile string) (*ReceptorConfig, error) { var receptorConfig ReceptorConfig - var certifcatesConfig CertificatesConfig err := viper.Unmarshal(&receptorConfig) if err != nil { - return nil, nil, err + return nil, err } - err = viper.Unmarshal(&certifcatesConfig) + return &receptorConfig, nil +} + +func ParseCertificatesConfig(configFile string) (*CertificatesConfig, error) { + var certifcatesConfig CertificatesConfig + err := viper.Unmarshal(&certifcatesConfig) + if err != nil { + return nil, err + } + + return &certifcatesConfig, nil +} + +func ParseBackendConfig(configFile string) (*BackendConfig, error) { + var backendConfig BackendConfig + err := viper.Unmarshal(&backendConfig) if err != nil { - return nil, nil, err + return nil, err } - return &receptorConfig, &certifcatesConfig, nil + return &backendConfig, nil } func isConfigEmpty(v reflect.Value) bool { @@ -157,6 +174,46 @@ func RunPhases(phase string, v reflect.Value) { } } +// ReloadServices iterates through key/values calling reload on applicable services. +func ReloadServices(v reflect.Value) { + for i := 0; i < v.NumField(); i++ { + // if the services is not initialised, skip + if reflect.Value.IsZero(v.Field(i)) { + continue + } + + var err error + switch v.Field(i).Kind() { + case reflect.Slice: + // iterate over all the type fields + for j := 0; j < v.Field(i).Len(); j++ { + serviceItem := v.Field(i).Index(j).Interface() + switch c := serviceItem.(type) { + // check to see if the selected type field satisfies reload + // call reload on cfg object + case Reloader: + err = c.Reload() + if err != nil { + PrintPhaseErrorMessage(v.Type().Name(), "reload", err) + } + // if cfg object does not satisfy, do nothing + default: + } + } + // runs for non slice fields + default: + switch c := v.Field(i).Interface().(type) { + case Reloader: + err = c.Reload() + if err != nil { + PrintPhaseErrorMessage(v.Type().Name(), "reload", err) + } + default: + } + } + } +} + func RunConfigV1() { cl := cmdline.NewCmdline() cl.AddConfigType("node", "Specifies the node configuration of this instance", types.NodeCfg{}, cmdline.Required, cmdline.Singleton) diff --git a/cmd/defaults.go b/cmd/defaults.go index 0710d86d1..a06d903a8 100644 --- a/cmd/defaults.go +++ b/cmd/defaults.go @@ -2,7 +2,7 @@ package cmd import "github.com/ansible/receptor/pkg/types" -func SetTCPListenerDefaults(config *ReceptorConfig) { +func SetTCPListenerDefaults(config *BackendConfig) { for _, listener := range config.TCPListeners { if listener.Cost == 0 { listener.Cost = 1.0 @@ -13,7 +13,7 @@ func SetTCPListenerDefaults(config *ReceptorConfig) { } } -func SetUDPListenerDefaults(config *ReceptorConfig) { +func SetUDPListenerDefaults(config *BackendConfig) { for _, listener := range config.UDPListeners { if listener.Cost == 0 { listener.Cost = 1.0 @@ -24,7 +24,7 @@ func SetUDPListenerDefaults(config *ReceptorConfig) { } } -func SetWSListenerDefaults(config *ReceptorConfig) { +func SetWSListenerDefaults(config *BackendConfig) { for _, listener := range config.WSListeners { if listener.Cost == 0 { listener.Cost = 1.0 @@ -38,7 +38,7 @@ func SetWSListenerDefaults(config *ReceptorConfig) { } } -func SetUDPPeerDefaults(config *ReceptorConfig) { +func SetUDPPeerDefaults(config *BackendConfig) { for _, peer := range config.UDPPeers { if peer.Cost == 0 { peer.Cost = 1.0 @@ -50,7 +50,7 @@ func SetUDPPeerDefaults(config *ReceptorConfig) { } } -func SetTCPPeerDefaults(config *ReceptorConfig) { +func SetTCPPeerDefaults(config *BackendConfig) { for _, peer := range config.TCPPeers { if peer.Cost == 0 { peer.Cost = 1.0 @@ -62,7 +62,7 @@ func SetTCPPeerDefaults(config *ReceptorConfig) { } } -func SetWSPeerDefaults(config *ReceptorConfig) { +func SetWSPeerDefaults(config *BackendConfig) { for _, peer := range config.WSPeers { if peer.Cost == 0 { peer.Cost = 1.0 @@ -116,15 +116,18 @@ func SetKubeWorkerDefaults(config *ReceptorConfig) { } } -func SetConfigDefaults(config *ReceptorConfig) { +func SetReceptorConfigDefaults(config *ReceptorConfig) { + SetCmdlineUnixDefaults(config) + SetLogLevelDefaults(config) + SetNodeDefaults(config) + SetKubeWorkerDefaults(config) +} + +func SetBackendConfigDefaults(config *BackendConfig) { SetTCPListenerDefaults(config) SetUDPListenerDefaults(config) SetWSListenerDefaults(config) SetTCPPeerDefaults(config) SetUDPPeerDefaults(config) SetWSPeerDefaults(config) - SetCmdlineUnixDefaults(config) - SetLogLevelDefaults(config) - SetNodeDefaults(config) - SetKubeWorkerDefaults(config) } diff --git a/cmd/receptor-cl/receptor.go b/cmd/receptor-cl/receptor.go index 42f797c78..6425bab18 100644 --- a/cmd/receptor-cl/receptor.go +++ b/cmd/receptor-cl/receptor.go @@ -5,11 +5,7 @@ import ( "os" "github.com/ansible/receptor/cmd" - _ "github.com/ansible/receptor/internal/version" - _ "github.com/ansible/receptor/pkg/backends" - _ "github.com/ansible/receptor/pkg/certificates" "github.com/ansible/receptor/pkg/netceptor" - _ "github.com/ansible/receptor/pkg/services" ) func main() { diff --git a/cmd/root.go b/cmd/root.go index 400ffe08e..8d3adc35f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,13 +6,17 @@ import ( "reflect" receptorVersion "github.com/ansible/receptor/internal/version" + "github.com/ansible/receptor/pkg/logger" + "github.com/ansible/receptor/pkg/netceptor" + "github.com/fsnotify/fsnotify" "github.com/spf13/cobra" "github.com/spf13/viper" ) var ( - cfgFile string - version bool + cfgFile string + version bool + backendConfig *BackendConfig ) // rootCmd represents the base command when called without any subcommands. @@ -23,28 +27,7 @@ var rootCmd = &cobra.Command{ Receptor is an overlay network intended to ease the distribution of work across a large and dispersed collection of workers. Receptor nodes establish peer-to-peer connections with each other via existing networks. Once connected, the receptor mesh provides datagram (UDP-like) and stream (TCP-like) capabilities to applications, as well as robust unit-of-work handling with resiliency against transient network failures.`, - Run: func(cmd *cobra.Command, args []string) { - if version { - fmt.Println(receptorVersion.Version) - os.Exit(0) - } - receptorConfig, certifcatesConfig, err := ParseConfigs(cfgFile) - if err != nil { - fmt.Printf("unable to decode into struct, %v", err) - os.Exit(1) - } - - isEmptyReceptorConfig := isConfigEmpty(reflect.ValueOf(*receptorConfig)) - - RunConfigV2(reflect.ValueOf(*certifcatesConfig)) - if isEmptyReceptorConfig { - fmt.Println("empty receptor config, skipping...") - os.Exit(0) - } - - SetConfigDefaults(receptorConfig) - RunConfigV2(reflect.ValueOf(*receptorConfig)) - }, + Run: handleRootCommand, } func Execute() { @@ -61,10 +44,9 @@ func init() { rootCmd.Flags().BoolVar(&version, "version", false, "Show the Receptor version") } -var FindMe = true - // initConfig reads in config file and ENV variables if set. func initConfig() { + l := logger.NewReceptorLogger("") if cfgFile != "" { viper.SetConfigFile(cfgFile) } else { @@ -78,8 +60,90 @@ func initConfig() { viper.AutomaticEnv() + viper.OnConfigChange(func(e fsnotify.Event) { + l.Info("Config file changed: %s\n", e.Name) + + var newConfig *BackendConfig + viper.Unmarshal(&newConfig) + + // used because OnConfigChange runs twice for some reason + // allows to skip empty first config + isEmpty := isConfigEmpty(reflect.ValueOf(*newConfig)) + if isEmpty { + return + } + + SetBackendConfigDefaults(newConfig) + + isEqual := reflect.DeepEqual(*backendConfig, *newConfig) + if !isEqual { + // fmt.Println("reloading backends") + + // this will do a reload of all reloadable services + // TODO: Optimize to only reload services that have config change + // NOTE: Make sure to account for two things + // if current config had two services then new config has zero cancel those backends + // if services has two items in a slice and one of them has changed iterate and reload on changed service + netceptor.MainInstance.CancelBackends() + l.Info("Reloading backends") + + ReloadServices(reflect.ValueOf(*newConfig)) + backendConfig = newConfig + + return + } + + l.Info("No reloadable backends were found.") + }) + // TODO: use env to turn off watch config + viper.WatchConfig() + err := viper.ReadInConfig() if err == nil { fmt.Fprintln(os.Stdout, "Using config file:", viper.ConfigFileUsed()) } } + +func handleRootCommand(cmd *cobra.Command, args []string) { + if version { + fmt.Println(receptorVersion.Version) + os.Exit(0) + } + + if cfgFile == "" && viper.ConfigFileUsed() == "" { + fmt.Fprintln(os.Stderr, "Could not locate config file (default is $HOME/receptor.yaml)") + os.Exit(1) + } + + receptorConfig, err := ParseReceptorConfig(cfgFile) + if err != nil { + fmt.Printf("unable to decode into struct, %v", err) + os.Exit(1) + } + + certifcatesConfig, err := ParseCertificatesConfig(cfgFile) + if err != nil { + fmt.Printf("unable to decode into struct, %v", err) + os.Exit(1) + } + + backendConfig, err = ParseBackendConfig(cfgFile) + if err != nil { + fmt.Printf("unable to decode into struct, %v", err) + os.Exit(1) + } + + isEmptyReceptorConfig := isConfigEmpty(reflect.ValueOf(*receptorConfig)) + isEmptyReloadableServicesConfig := isConfigEmpty(reflect.ValueOf(*backendConfig)) + + RunConfigV2(reflect.ValueOf(*certifcatesConfig)) + if isEmptyReceptorConfig && isEmptyReloadableServicesConfig { + fmt.Println("empty receptor config, skipping...") + os.Exit(0) + } + + SetReceptorConfigDefaults(receptorConfig) + SetBackendConfigDefaults(backendConfig) + RunConfigV2(reflect.ValueOf(*receptorConfig)) + RunConfigV2(reflect.ValueOf(*backendConfig)) +}