From 19eb81973007adff32bc1edaba215c58286cd359 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Mon, 4 Mar 2024 10:40:07 +0100 Subject: [PATCH 1/3] add config fallback for project configs --- .gitignore | 1 + cmd/alter/alter-partition.go | 4 +- cmd/alter/alter-partition_test.go | 2 +- cmd/alter/alter-topic.go | 2 +- cmd/alter/alter-topic_test.go | 6 +- cmd/attach/attach.go | 2 +- cmd/clone/clone-topic_test.go | 2 +- cmd/config/currentContext.go | 6 +- cmd/config/getContexts.go | 5 +- cmd/config/useContext.go | 10 +- cmd/config/view.go | 2 +- cmd/create/create-acl.go | 4 +- cmd/create/create-topic_test.go | 2 +- cmd/deletion/delete-acl.go | 4 +- .../delete-consumer-group-offset_test.go | 2 +- cmd/deletion/delete-consumer-group_test.go | 2 +- cmd/deletion/delete-topic_test.go | 2 +- cmd/describe/describe-broker.go | 2 +- cmd/describe/describe-consumer-group.go | 2 +- cmd/docs.go | 2 +- cmd/get/get-acl.go | 4 +- cmd/root.go | 152 +---------- cmd/root_test.go | 48 ++-- cmd/version.go | 2 +- internal/broker/broker-operation.go | 14 +- internal/common-operation.go | 4 +- internal/global/config.go | 242 ++++++++++++++++++ .../variables.go => global/env-variables.go} | 4 +- internal/k8s/k8s-operation.go | 44 ++-- internal/k8s/k8s-operation_test.go | 47 ++-- internal/partition/partition-operation.go | 2 +- internal/topic/topic-operation.go | 18 +- output/output.go | 5 + testutil/helpers.go | 8 +- testutil/test_util.go | 4 +- 35 files changed, 390 insertions(+), 272 deletions(-) create mode 100644 internal/global/config.go rename internal/{env/variables.go => global/env-variables.go} (96%) diff --git a/.gitignore b/.gitignore index 2ec7b5e..105c855 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ dist kafkactl ### Configs +.kafkactl.yml kafkactl.yml ### Snap diff --git a/cmd/alter/alter-partition.go b/cmd/alter/alter-partition.go index 9fe60fb..6fa0dc9 100644 --- a/cmd/alter/alter-partition.go +++ b/cmd/alter/alter-partition.go @@ -36,14 +36,14 @@ func newAlterPartitionCmd() *cobra.Command { } } }, - PreRunE: func(cmd *cobra.Command, args []string) error { + PreRunE: func(cmd *cobra.Command, _ []string) error { return validation.ValidateAtLeastOneRequiredFlag(cmd) }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { if len(args) == 0 { return topic.CompleteTopicNames(cmd, args, toComplete) } else if len(args) == 1 { - return partition.CompletePartitionIds(cmd, args, toComplete) + return partition.CompletePartitionIDs(cmd, args, toComplete) } return nil, cobra.ShellCompDirectiveNoFileComp }, diff --git a/cmd/alter/alter-partition_test.go b/cmd/alter/alter-partition_test.go index a230426..5a94c19 100644 --- a/cmd/alter/alter-partition_test.go +++ b/cmd/alter/alter-partition_test.go @@ -61,7 +61,7 @@ func TestAlterPartitionReplicationFactorIntegration(t *testing.T) { return } - checkReplicas := func(attempt uint) error { + checkReplicas := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml") if err != nil { diff --git a/cmd/alter/alter-topic.go b/cmd/alter/alter-topic.go index 9c361d9..71d4682 100644 --- a/cmd/alter/alter-topic.go +++ b/cmd/alter/alter-topic.go @@ -23,7 +23,7 @@ func newAlterTopicCmd() *cobra.Command { } } }, - PreRunE: func(cmd *cobra.Command, args []string) error { + PreRunE: func(cmd *cobra.Command, _ []string) error { return validation.ValidateAtLeastOneRequiredFlag(cmd) }, ValidArgsFunction: topic.CompleteTopicNames, diff --git a/cmd/alter/alter-topic_test.go b/cmd/alter/alter-topic_test.go index ca1467a..6a4e912 100644 --- a/cmd/alter/alter-topic_test.go +++ b/cmd/alter/alter-topic_test.go @@ -51,7 +51,7 @@ func TestAlterTopicPartitionsIntegration(t *testing.T) { t.Fatalf("failed to execute command: %v", err) } - getPartitions := func(attempt uint) error { + getPartitions := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml") if err != nil { @@ -93,7 +93,7 @@ func TestAlterTopicIncreaseReplicationFactorIntegration(t *testing.T) { return } - checkReplicas := func(attempt uint) error { + checkReplicas := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml") if err != nil { @@ -137,7 +137,7 @@ func TestAlterTopicDecreaseReplicationFactorIntegration(t *testing.T) { return } - checkReplicas := func(attempt uint) error { + checkReplicas := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml") if err != nil { diff --git a/cmd/attach/attach.go b/cmd/attach/attach.go index a703115..c75619e 100644 --- a/cmd/attach/attach.go +++ b/cmd/attach/attach.go @@ -12,7 +12,7 @@ func NewAttachCmd() *cobra.Command { Use: "attach", Short: "run kafkactl pod in kubernetes and attach to it", Args: cobra.NoArgs, - Run: func(cobraCmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, _ []string) { if err := k8s.NewOperation().Attach(); err != nil { output.Fail(err) } diff --git a/cmd/clone/clone-topic_test.go b/cmd/clone/clone-topic_test.go index afc8554..da30471 100644 --- a/cmd/clone/clone-topic_test.go +++ b/cmd/clone/clone-topic_test.go @@ -29,7 +29,7 @@ func TestCloneTopicIntegration(t *testing.T) { testutil.AssertEquals(t, fmt.Sprintf("topic %s cloned to %s", srcTopic, targetTopic), kafkaCtl.GetStdOut()) - getTopic := func(attempt uint) error { + getTopic := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", targetTopic, "-o", "yaml") if err != nil { diff --git a/cmd/config/currentContext.go b/cmd/config/currentContext.go index 6812a35..b1e522e 100644 --- a/cmd/config/currentContext.go +++ b/cmd/config/currentContext.go @@ -1,9 +1,9 @@ package config import ( + "github.com/deviceinsight/kafkactl/internal/global" "github.com/deviceinsight/kafkactl/output" "github.com/spf13/cobra" - "github.com/spf13/viper" ) func newCurrentContextCmd() *cobra.Command { @@ -12,8 +12,8 @@ func newCurrentContextCmd() *cobra.Command { Aliases: []string{"currentContext"}, Short: "show current context", Long: `Displays the name of context that is currently active`, - Run: func(cmd *cobra.Command, args []string) { - context := viper.GetString("current-context") + Run: func(_ *cobra.Command, _ []string) { + context := global.GetCurrentContext() output.Infof("%s", context) }, } diff --git a/cmd/config/getContexts.go b/cmd/config/getContexts.go index 7cc4411..4c68d16 100644 --- a/cmd/config/getContexts.go +++ b/cmd/config/getContexts.go @@ -1,6 +1,7 @@ package config import ( + "github.com/deviceinsight/kafkactl/internal/global" "github.com/deviceinsight/kafkactl/output" "github.com/spf13/cobra" @@ -16,9 +17,9 @@ func newGetContextsCmd() *cobra.Command { Aliases: []string{"getContexts"}, Short: "list configured contexts", Long: `Output names of all configured contexts`, - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, _ []string) { contexts := viper.GetStringMap("contexts") - currentContext := viper.GetString("current-context") + currentContext := global.GetCurrentContext() if outputFormat == "compact" { for name := range contexts { diff --git a/cmd/config/useContext.go b/cmd/config/useContext.go index aa9a64d..f55bfa2 100644 --- a/cmd/config/useContext.go +++ b/cmd/config/useContext.go @@ -3,6 +3,8 @@ package config import ( "sort" + "github.com/deviceinsight/kafkactl/internal/global" + "github.com/deviceinsight/kafkactl/output" "github.com/pkg/errors" @@ -20,7 +22,7 @@ func newUseContextCmd() *cobra.Command { Short: "switch active context", Long: `command to switch active context`, Args: cobra.MinimumNArgs(1), - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, args []string) { context := strings.Join(args, " ") @@ -31,13 +33,11 @@ func newUseContextCmd() *cobra.Command { output.Fail(errors.Errorf("not a valid context: %s", context)) } - viper.Set("current-context", context) - - if err := viper.WriteConfig(); err != nil { + if err := global.SetCurrentContext(context); err != nil { output.Fail(errors.Wrap(err, "unable to write config")) } }, - ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + ValidArgsFunction: func(_ *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { if len(args) != 0 { return nil, cobra.ShellCompDirectiveNoFileComp } diff --git a/cmd/config/view.go b/cmd/config/view.go index 2567a68..1a9b574 100644 --- a/cmd/config/view.go +++ b/cmd/config/view.go @@ -15,7 +15,7 @@ func newViewCmd() *cobra.Command { Use: "view", Short: "show contents of config file", Long: `Shows the contents of the config file that is currently used`, - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, _ []string) { yamlFile, err := os.ReadFile(viper.ConfigFileUsed()) if err != nil { diff --git a/cmd/create/create-acl.go b/cmd/create/create-acl.go index 9f9d70f..ab104a1 100644 --- a/cmd/create/create-acl.go +++ b/cmd/create/create-acl.go @@ -47,11 +47,11 @@ func newCreateACLCmd() *cobra.Command { _ = cmdCreateACL.MarkFlagRequired("principal") _ = cmdCreateACL.MarkFlagRequired("operation") - _ = cmdCreateACL.RegisterFlagCompletionFunc("pattern", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdCreateACL.RegisterFlagCompletionFunc("pattern", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"match", "prefixed", "literal"}, cobra.ShellCompDirectiveDefault }) - _ = cmdCreateACL.RegisterFlagCompletionFunc("operation", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdCreateACL.RegisterFlagCompletionFunc("operation", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"any", "all", "read", "write", "create", "delete", "alter", "describe", "clusteraction", "describeconfigs", "alterconfigs", "idempotentwrite"}, cobra.ShellCompDirectiveDefault }) diff --git a/cmd/create/create-topic_test.go b/cmd/create/create-topic_test.go index 3368b56..f60f478 100644 --- a/cmd/create/create-topic_test.go +++ b/cmd/create/create-topic_test.go @@ -110,7 +110,7 @@ partitions: } func describeTopic(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, topicName string) { - describeTopic := func(attempt uint) error { + describeTopic := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml") return err } diff --git a/cmd/deletion/delete-acl.go b/cmd/deletion/delete-acl.go index 1cc252a..e507842 100644 --- a/cmd/deletion/delete-acl.go +++ b/cmd/deletion/delete-acl.go @@ -42,11 +42,11 @@ func newDeleteACLCmd() *cobra.Command { _ = cmdDeleteACL.MarkFlagRequired("operation") _ = cmdDeleteACL.MarkFlagRequired("pattern") - _ = cmdDeleteACL.RegisterFlagCompletionFunc("operation", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdDeleteACL.RegisterFlagCompletionFunc("operation", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"any", "all", "read", "write", "create", "delete", "alter", "describe", "clusteraction", "describeconfigs", "alterconfigs", "idempotentwrite"}, cobra.ShellCompDirectiveDefault }) - _ = cmdDeleteACL.RegisterFlagCompletionFunc("pattern", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdDeleteACL.RegisterFlagCompletionFunc("pattern", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"any", "match", "prefixed", "literal"}, cobra.ShellCompDirectiveDefault }) diff --git a/cmd/deletion/delete-consumer-group-offset_test.go b/cmd/deletion/delete-consumer-group-offset_test.go index 72f597b..af51493 100644 --- a/cmd/deletion/delete-consumer-group-offset_test.go +++ b/cmd/deletion/delete-consumer-group-offset_test.go @@ -188,7 +188,7 @@ func failedToDeleteMessage(groupName string, topic string, partition int32) stri } func checkOffsetDeleted(kafkaCtl testutil.KafkaCtlTestCommand, groupName string, topic string, partition int32) error { - checkOffsetDeleted := func(attempt uint) error { + checkOffsetDeleted := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "consumer-group", groupName, "-o", "yaml") if err != nil { diff --git a/cmd/deletion/delete-consumer-group_test.go b/cmd/deletion/delete-consumer-group_test.go index fb4db79..7b270a8 100644 --- a/cmd/deletion/delete-consumer-group_test.go +++ b/cmd/deletion/delete-consumer-group_test.go @@ -83,7 +83,7 @@ func TestDeleteConsumerGroupAutoCompletionIntegration(t *testing.T) { func verifyConsumerGroupDeleted(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, groupName string) { - checkConsumerGrouDeleted := func(attempt uint) error { + checkConsumerGrouDeleted := func(_ uint) error { _, err := kafkaCtl.Execute("get", "consumer-groups", "-o", "compact") if err != nil { diff --git a/cmd/deletion/delete-topic_test.go b/cmd/deletion/delete-topic_test.go index 1ee929d..5ccf6b6 100644 --- a/cmd/deletion/delete-topic_test.go +++ b/cmd/deletion/delete-topic_test.go @@ -83,7 +83,7 @@ func TestDeleteTopicAutoCompletionIntegration(t *testing.T) { func verifyTopicDeleted(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, topicName string) { - checkTopicDeleted := func(attempt uint) error { + checkTopicDeleted := func(_ uint) error { _, err := kafkaCtl.Execute("get", "topics", "-o", "compact") if err != nil { diff --git a/cmd/describe/describe-broker.go b/cmd/describe/describe-broker.go index 9dd58b4..ec868c1 100644 --- a/cmd/describe/describe-broker.go +++ b/cmd/describe/describe-broker.go @@ -30,7 +30,7 @@ func newDescribeBrokerCmd() *cobra.Command { } } }, - ValidArgsFunction: broker.CompleteBrokerIds, + ValidArgsFunction: broker.CompleteBrokerIDs, } cmdDescribeBroker.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml|wide") diff --git a/cmd/describe/describe-consumer-group.go b/cmd/describe/describe-consumer-group.go index d995570..d437633 100644 --- a/cmd/describe/describe-consumer-group.go +++ b/cmd/describe/describe-consumer-group.go @@ -24,7 +24,7 @@ func newDescribeConsumerGroupCmd() *cobra.Command { } } }, - ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + ValidArgsFunction: func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return consumergroups.CompleteConsumerGroupsFiltered(flags) }, } diff --git a/cmd/docs.go b/cmd/docs.go index 2301d35..f2e7bb5 100644 --- a/cmd/docs.go +++ b/cmd/docs.go @@ -22,7 +22,7 @@ func newDocsCmd() *cobra.Command { Short: "Generate documentation as markdown or man pages", Long: docsDesc, Hidden: true, - Run: func(cmd *cobra.Command, args []string) { + Run: func(cmd *cobra.Command, _ []string) { if err := (&internal.DocsOperation{}).GenerateDocs(cmd.Root(), flags); err != nil { output.Fail(err) } diff --git a/cmd/get/get-acl.go b/cmd/get/get-acl.go index 3563016..686627c 100644 --- a/cmd/get/get-acl.go +++ b/cmd/get/get-acl.go @@ -39,11 +39,11 @@ func newGetACLCmd() *cobra.Command { cmdGetAcls.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml") - _ = cmdGetAcls.RegisterFlagCompletionFunc("operation", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdGetAcls.RegisterFlagCompletionFunc("operation", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"any", "all", "read", "write", "create", "delete", "alter", "describe", "clusteraction", "describeconfigs", "alterconfigs", "idempotentwrite"}, cobra.ShellCompDirectiveDefault }) - _ = cmdGetAcls.RegisterFlagCompletionFunc("pattern", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + _ = cmdGetAcls.RegisterFlagCompletionFunc("pattern", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"any", "match", "prefixed", "literal"}, cobra.ShellCompDirectiveDefault }) diff --git a/cmd/root.go b/cmd/root.go index 483d6dc..43e4620 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,11 +2,8 @@ package cmd import ( "fmt" - "os" - "path/filepath" - "strings" - "github.com/deviceinsight/kafkactl/internal/env" + "github.com/deviceinsight/kafkactl/internal/global" "github.com/deviceinsight/kafkactl/cmd/alter" "github.com/deviceinsight/kafkactl/cmd/attach" @@ -21,26 +18,9 @@ import ( "github.com/deviceinsight/kafkactl/cmd/reset" "github.com/deviceinsight/kafkactl/internal/k8s" "github.com/deviceinsight/kafkactl/output" - "github.com/pkg/errors" "github.com/spf13/cobra" - "github.com/spf13/viper" ) -var cfgFile string -var Verbose bool - -const defaultContextPrefix = "CONTEXTS_DEFAULT_" - -const localConfigName = "kafkactl.yml" - -var configPaths = []string{ - "$HOME/.config/kafkactl", - "$HOME/.kafkactl", - "$SNAP_REAL_HOME/.config/kafkactl", - "$SNAP_DATA/kafkactl", - "/etc/kafkactl", -} - func NewKafkactlCommand(streams output.IOStreams) *cobra.Command { var rootCmd = &cobra.Command{ @@ -49,7 +29,9 @@ func NewKafkactlCommand(streams output.IOStreams) *cobra.Command { Long: `A command-line interface the simplifies interaction with Kafka.`, } - cobra.OnInitialize(initConfig) + globalConfig := global.NewConfig() + + cobra.OnInitialize(globalConfig.Init) rootCmd.AddCommand(config.NewConfigCmd()) rootCmd.AddCommand(consume.NewConsumeCmd()) @@ -66,10 +48,12 @@ func NewKafkactlCommand(streams output.IOStreams) *cobra.Command { rootCmd.AddCommand(newVersionCmd()) rootCmd.AddCommand(newDocsCmd()) + globalFlags := globalConfig.Flags() + // use upper-case letters for shorthand params to avoid conflicts with local flags - rootCmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "C", "", - fmt.Sprintf("config file. one of: %v", configPaths)) - rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "V", false, "verbose output") + rootCmd.PersistentFlags().StringVarP(&globalFlags.ConfigFile, "config-file", "C", "", + fmt.Sprintf("config file. default locations: %v", globalConfig.DefaultPaths())) + rootCmd.PersistentFlags().BoolVarP(&globalFlags.Verbose, "verbose", "V", false, "verbose output") k8s.KafkaCtlVersion = Version @@ -78,121 +62,3 @@ func NewKafkactlCommand(streams output.IOStreams) *cobra.Command { rootCmd.SetErr(streams.ErrOut) return rootCmd } - -// initConfig reads in config file and ENV variables if set. -func initConfig() { - - viper.Reset() - - localConfigFile := getConfigFileFromWorkingDir() - - switch { - case cfgFile != "": - viper.SetConfigFile(cfgFile) - case os.Getenv("KAFKA_CTL_CONFIG") != "": - viper.SetConfigFile(os.Getenv("KAFKA_CTL_CONFIG")) - case localConfigFile != "": - viper.SetConfigFile(localConfigFile) - default: - for _, path := range configPaths { - viper.AddConfigPath(os.ExpandEnv(path)) - } - viper.SetConfigName("config") - } - - if Verbose { - output.IoStreams.EnableDebug() - } - - if Verbose && os.Getenv("SNAP_NAME") != "" { - output.Debugf("Running snap version %s on %s", os.Getenv("SNAP_VERSION"), os.Getenv("SNAP_ARCH")) - } - - mapEnvVariables() - - replacer := strings.NewReplacer("-", "_", ".", "_") - viper.SetEnvKeyReplacer(replacer) - - viper.SetDefault("contexts.default.brokers", []string{"localhost:9092"}) - viper.SetDefault("current-context", "default") - - viper.SetConfigType("yml") - viper.AutomaticEnv() // read in environment variables that match - - if err := readConfig(); err != nil { - output.Fail(err) - } -} - -func getConfigFileFromWorkingDir() string { - if _, err := os.Stat(localConfigName); err != nil { - return "" - } - - return localConfigName -} - -func mapEnvVariables() { - for _, short := range env.Variables { - long := defaultContextPrefix + short - if os.Getenv(short) != "" && os.Getenv(long) == "" { - _ = os.Setenv(long, os.Getenv(short)) - } - } -} - -func readConfig() error { - var err error - if err = viper.ReadInConfig(); err == nil { - output.Debugf("Using config file: %s", viper.ConfigFileUsed()) - return nil - } - - _, isConfigFileNotFoundError := err.(viper.ConfigFileNotFoundError) - _, isOsPathError := err.(*os.PathError) - - if !isConfigFileNotFoundError && !isOsPathError { - return errors.Errorf("Error reading config file: %s (%v)", viper.ConfigFileUsed(), err) - } - err = generateDefaultConfig() - if err != nil { - return errors.Wrap(err, "Error generating default config: ") - } - - // We read generated config now - if err = viper.ReadInConfig(); err == nil { - output.Debugf("Using config file: %s", viper.ConfigFileUsed()) - return nil - } - return errors.Errorf("Error reading config file: %s (%v)", viper.ConfigFileUsed(), err) -} - -// generateDefaultConfig generates default config in case there is no config -func generateDefaultConfig() error { - - cfgFile := filepath.Join(os.ExpandEnv(configPaths[0]), "config.yml") - - if os.Getenv("KAFKA_CTL_CONFIG") != "" { - // use config file provided via env - cfgFile = os.Getenv("KAFKA_CTL_CONFIG") - } else if os.Getenv("SNAP_REAL_HOME") != "" { - // use different configFile when running in snap - for _, configPath := range configPaths { - if strings.Contains(configPath, "$SNAP_REAL_HOME") { - cfgFile = filepath.Join(os.ExpandEnv(configPath), "config.yml") - break - } - } - } - - if err := os.MkdirAll(filepath.Dir(cfgFile), os.FileMode(0700)); err != nil { - return err - } - - if err := viper.WriteConfigAs(cfgFile); err != nil { - return err - } - - output.Debugf("generated default config at %s", cfgFile) - return nil -} diff --git a/cmd/root_test.go b/cmd/root_test.go index 52c6af5..fb29e1c 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -4,7 +4,7 @@ import ( "os" "testing" - "github.com/deviceinsight/kafkactl/internal/env" + "github.com/deviceinsight/kafkactl/internal/global" "github.com/deviceinsight/kafkactl/testutil" "github.com/spf13/viper" @@ -42,29 +42,29 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) { testutil.StartUnitTest(t) - _ = os.Setenv(env.RequestTimeout, "30") - _ = os.Setenv(env.Brokers, "broker1:9092 broker2:9092") - _ = os.Setenv(env.TLSEnabled, "true") - _ = os.Setenv(env.TLSCa, "my-ca") - _ = os.Setenv(env.TLSCert, "my-cert") - _ = os.Setenv(env.TLSCertKey, "my-cert-key") - _ = os.Setenv(env.TLSInsecure, "true") - _ = os.Setenv(env.SaslEnabled, "true") - _ = os.Setenv(env.SaslUsername, "user") - _ = os.Setenv(env.SaslPassword, "pass") - _ = os.Setenv(env.SaslMechanism, "scram-sha512") - _ = os.Setenv(env.ClientID, "my-client") - _ = os.Setenv(env.KafkaVersion, "2.0.1") - _ = os.Setenv(env.AvroSchemaRegistry, "registry:8888") - _ = os.Setenv(env.AvroJSONCodec, "avro") - _ = os.Setenv(env.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset") - _ = os.Setenv(env.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf") - _ = os.Setenv(env.ProtobufProtoFiles, "message.proto other.proto") - _ = os.Setenv(env.ProducerPartitioner, "hash") - _ = os.Setenv(env.ProducerRequiredAcks, "WaitForAll") - _ = os.Setenv(env.ProducerMaxMessageBytes, "1234") - - for _, key := range env.Variables { + _ = os.Setenv(global.RequestTimeout, "30") + _ = os.Setenv(global.Brokers, "broker1:9092 broker2:9092") + _ = os.Setenv(global.TLSEnabled, "true") + _ = os.Setenv(global.TLSCa, "my-ca") + _ = os.Setenv(global.TLSCert, "my-cert") + _ = os.Setenv(global.TLSCertKey, "my-cert-key") + _ = os.Setenv(global.TLSInsecure, "true") + _ = os.Setenv(global.SaslEnabled, "true") + _ = os.Setenv(global.SaslUsername, "user") + _ = os.Setenv(global.SaslPassword, "pass") + _ = os.Setenv(global.SaslMechanism, "scram-sha512") + _ = os.Setenv(global.ClientID, "my-client") + _ = os.Setenv(global.KafkaVersion, "2.0.1") + _ = os.Setenv(global.AvroSchemaRegistry, "registry:8888") + _ = os.Setenv(global.AvroJSONCodec, "avro") + _ = os.Setenv(global.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset") + _ = os.Setenv(global.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf") + _ = os.Setenv(global.ProtobufProtoFiles, "message.proto other.proto") + _ = os.Setenv(global.ProducerPartitioner, "hash") + _ = os.Setenv(global.ProducerRequiredAcks, "WaitForAll") + _ = os.Setenv(global.ProducerMaxMessageBytes, "1234") + + for _, key := range global.EnvVariables { if os.Getenv(key) == "" { t.Fatalf("missing test case for env variable: %s", key) } diff --git a/cmd/version.go b/cmd/version.go index 7e3a4e6..1b229ef 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -25,7 +25,7 @@ func newVersionCmd() *cobra.Command { var cmdVersion = &cobra.Command{ Use: "version", Short: "print the version of kafkactl", - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, _ []string) { output.Infof("%#v", info{ version: Version, buildTime: BuildTime, diff --git a/internal/broker/broker-operation.go b/internal/broker/broker-operation.go index 711a1c8..beefdcf 100644 --- a/internal/broker/broker-operation.go +++ b/internal/broker/broker-operation.go @@ -200,7 +200,7 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags) return nil } -func (operation *Operation) listBrokerIds() ([]string, error) { +func (operation *Operation) listBrokerIDs() ([]string, error) { var ( err error @@ -216,28 +216,28 @@ func (operation *Operation) listBrokerIds() ([]string, error) { return nil, errors.Wrap(err, "failed to create client") } - var brokerIds = make([]string, 0) + var brokerIDs = make([]string, 0) for _, broker := range client.Brokers() { - brokerIds = append(brokerIds, fmt.Sprint(broker.ID())) + brokerIDs = append(brokerIDs, fmt.Sprint(broker.ID())) } - return brokerIds, nil + return brokerIDs, nil } -func CompleteBrokerIds(_ *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { +func CompleteBrokerIDs(_ *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { if len(args) != 0 { return nil, cobra.ShellCompDirectiveNoFileComp } - brokerIds, err := (&Operation{}).listBrokerIds() + brokerIDs, err := (&Operation{}).listBrokerIDs() if err != nil { return nil, cobra.ShellCompDirectiveError } - return brokerIds, cobra.ShellCompDirectiveNoFileComp + return brokerIDs, cobra.ShellCompDirectiveNoFileComp } func FromYaml(yamlString string) (Broker, error) { diff --git a/internal/common-operation.go b/internal/common-operation.go index 8d8c929..f8e1f3d 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "github.com/deviceinsight/kafkactl/internal/global" + "github.com/deviceinsight/kafkactl/internal/helpers/avro" "github.com/IBM/sarama" @@ -84,7 +86,7 @@ type Config struct { func CreateClientContext() (ClientContext, error) { var context ClientContext - context.Name = viper.GetString("current-context") + context.Name = global.GetCurrentContext() if viper.Get("contexts."+context.Name) == nil { return context, errors.Errorf("no context with name %s found", context.Name) diff --git a/internal/global/config.go b/internal/global/config.go new file mode 100644 index 0000000..ba48cd2 --- /dev/null +++ b/internal/global/config.go @@ -0,0 +1,242 @@ +package global + +import ( + "errors" + "os" + "path/filepath" + "strings" + + "github.com/deviceinsight/kafkactl/output" + "github.com/spf13/viper" +) + +type Flags struct { + ConfigFile string + Verbose bool +} + +const defaultContextPrefix = "CONTEXTS_DEFAULT_" +const GoContextKey = "global-config" + +var projectConfigNames = []string{"kafkactl.yml", ".kafkactl.yml"} + +var configPaths = []string{ + "$HOME/.config/kafkactl", + "$HOME/.kafkactl", + "$SNAP_REAL_HOME/.config/kafkactl", + "$SNAP_DATA/kafkactl", + "/etc/kafkactl", +} + +var configInstance *config + +type Config interface { + Flags() *Flags + DefaultPaths() []string + Init() + currentContext() string + setCurrentContext(contextName string) error +} + +func NewConfig() Config { + configInstance = &config{ + flags: Flags{}, + } + return configInstance +} + +func GetCurrentContext() string { + return configInstance.currentContext() +} + +func SetCurrentContext(contextName string) error { + return configInstance.setCurrentContext(contextName) +} + +type config struct { + flags Flags + writableConfig *viper.Viper +} + +func (c *config) Flags() *Flags { + return &c.flags +} + +func (c *config) DefaultPaths() []string { + return configPaths +} + +func (c *config) currentContext() string { + return c.writableConfig.GetString("current-context") +} +func (c *config) setCurrentContext(contextName string) error { + c.writableConfig.Set("current-context", contextName) + return c.writableConfig.WriteConfig() +} + +// Init reads in config file and ENV variables if set. +func (c *config) Init() { + + viper.Reset() + + configFile := resolveProjectConfigFileFromWorkingDir() + + switch { + case c.flags.ConfigFile != "": + configFile = &c.flags.ConfigFile + case os.Getenv("KAFKA_CTL_CONFIG") != "": + envConfig := os.Getenv("KAFKA_CTL_CONFIG") + configFile = &envConfig + } + + if c.flags.Verbose { + output.IoStreams.EnableDebug() + } + + if c.flags.Verbose && os.Getenv("SNAP_NAME") != "" { + output.Debugf("Running snap version %s on %s", os.Getenv("SNAP_VERSION"), os.Getenv("SNAP_ARCH")) + } + + mapEnvVariables() + + if err := c.loadConfig(viper.GetViper(), configFile); err != nil { + if isUnknownError(err) { + output.Failf("Error reading config file: %s (%v)", viper.ConfigFileUsed(), err.Error()) + } + err = generateDefaultConfig() + if err != nil { + output.Failf("Error generating default config file: %v", err.Error()) + } + + // We read generated config now + if err = c.loadConfig(viper.GetViper(), configFile); err != nil { + output.Failf("Error reading config file: %s (%v)", viper.ConfigFileUsed(), err.Error()) + } + } + + if configFile != nil && viper.GetString("current-context") == "" { + // assuming the provided configFile is read-only + c.writableConfig = viper.New() + if err := c.loadConfig(c.writableConfig, nil); err != nil { + output.Fail(err) + } + } else { + c.writableConfig = viper.GetViper() + } +} + +func isUnknownError(err error) bool { + + var configFileNotFoundError viper.ConfigFileNotFoundError + var pathError *os.PathError + isConfigFileNotFoundError := errors.As(err, &configFileNotFoundError) + isOsPathError := errors.As(err, &pathError) + + return !isConfigFileNotFoundError && !isOsPathError +} + +func (c *config) loadConfig(viperInstance *viper.Viper, configFile *string) error { + + if configFile != nil { + viperInstance.SetConfigFile(*configFile) + } else { + for _, path := range configPaths { + viperInstance.AddConfigPath(os.ExpandEnv(path)) + } + viperInstance.SetConfigName("config") + } + + replacer := strings.NewReplacer("-", "_", ".", "_") + viperInstance.SetEnvKeyReplacer(replacer) + + viperInstance.SetConfigType("yml") + viperInstance.AutomaticEnv() // read in environment variables that match + + var err error + if err = viperInstance.ReadInConfig(); err == nil { + output.Debugf("Using config file: %s", viperInstance.ConfigFileUsed()) + } + + return err +} + +func resolveProjectConfigFileFromWorkingDir() *string { + + path, err := os.Getwd() + if err != nil { + output.Debugf("cannot find project config file. unable to get working dir") + return nil + } + + for _, projectConfigName := range projectConfigNames { + _, err = os.Stat(filepath.Join(path, projectConfigName)) + found := true + + for os.IsNotExist(err) { + + // stop when leaving a git repo + if gitDir, statErr := os.Stat(filepath.Join(path, ".git")); statErr == nil && gitDir.IsDir() { + found = false + break + } + + oldPath := path + + if path = filepath.Dir(oldPath); path == oldPath { + output.Debugf("cannot find project config file: %s", projectConfigName) + found = false + break + } + _, err = os.Stat(filepath.Join(path, projectConfigName)) + } + + if found { + configFile := filepath.Join(path, projectConfigName) + return &configFile + } + } + + return nil +} + +func mapEnvVariables() { + for _, short := range EnvVariables { + long := defaultContextPrefix + short + if os.Getenv(short) != "" && os.Getenv(long) == "" { + _ = os.Setenv(long, os.Getenv(short)) + } + } +} + +// generateDefaultConfig generates default config in case there is no config +func generateDefaultConfig() error { + + cfgFile := filepath.Join(os.ExpandEnv(configPaths[0]), "config.yml") + + if os.Getenv("KAFKA_CTL_CONFIG") != "" { + // use config file provided via env + cfgFile = os.Getenv("KAFKA_CTL_CONFIG") + } else if os.Getenv("SNAP_REAL_HOME") != "" { + // use different configFile when running in snap + for _, configPath := range configPaths { + if strings.Contains(configPath, "$SNAP_REAL_HOME") { + cfgFile = filepath.Join(os.ExpandEnv(configPath), "config.yml") + break + } + } + } + + if err := os.MkdirAll(filepath.Dir(cfgFile), os.FileMode(0700)); err != nil { + return err + } + + viper.SetDefault("contexts.default.brokers", []string{"localhost:9092"}) + viper.SetDefault("current-context", "default") + + if err := viper.WriteConfigAs(cfgFile); err != nil { + return err + } + + output.Debugf("generated default config at %s", cfgFile) + return nil +} diff --git a/internal/env/variables.go b/internal/global/env-variables.go similarity index 96% rename from internal/env/variables.go rename to internal/global/env-variables.go index 7946b44..235c817 100644 --- a/internal/env/variables.go +++ b/internal/global/env-variables.go @@ -1,4 +1,4 @@ -package env +package global const ( RequestTimeout = "REQUESTTIMEOUT" @@ -24,7 +24,7 @@ const ( ProducerMaxMessageBytes = "PRODUCER_MAXMESSAGEBYTES" ) -var Variables = []string{ +var EnvVariables = []string{ RequestTimeout, Brokers, TLSEnabled, diff --git a/internal/k8s/k8s-operation.go b/internal/k8s/k8s-operation.go index 7dd6c7b..e4aefae 100644 --- a/internal/k8s/k8s-operation.go +++ b/internal/k8s/k8s-operation.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - "github.com/deviceinsight/kafkactl/internal/env" + "github.com/deviceinsight/kafkactl/internal/global" "github.com/deviceinsight/kafkactl/internal" "github.com/deviceinsight/kafkactl/output" @@ -174,27 +174,27 @@ func parsePodEnvironment(context internal.ClientContext) []string { var envVariables []string - envVariables = appendStrings(envVariables, env.Brokers, context.Brokers) - envVariables = appendBool(envVariables, env.TLSEnabled, context.TLS.Enabled) - envVariables = appendStringIfDefined(envVariables, env.TLSCa, context.TLS.CA) - envVariables = appendStringIfDefined(envVariables, env.TLSCert, context.TLS.Cert) - envVariables = appendStringIfDefined(envVariables, env.TLSCertKey, context.TLS.CertKey) - envVariables = appendBool(envVariables, env.TLSInsecure, context.TLS.Insecure) - envVariables = appendBool(envVariables, env.SaslEnabled, context.Sasl.Enabled) - envVariables = appendStringIfDefined(envVariables, env.SaslUsername, context.Sasl.Username) - envVariables = appendStringIfDefined(envVariables, env.SaslPassword, context.Sasl.Password) - envVariables = appendStringIfDefined(envVariables, env.SaslMechanism, context.Sasl.Mechanism) - envVariables = appendStringIfDefined(envVariables, env.RequestTimeout, context.RequestTimeout.String()) - envVariables = appendStringIfDefined(envVariables, env.ClientID, context.ClientID) - envVariables = appendStringIfDefined(envVariables, env.KafkaVersion, context.KafkaVersion.String()) - envVariables = appendStringIfDefined(envVariables, env.AvroSchemaRegistry, context.AvroSchemaRegistry) - envVariables = appendStringIfDefined(envVariables, env.AvroJSONCodec, context.AvroJSONCodec.String()) - envVariables = appendStrings(envVariables, env.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles) - envVariables = appendStrings(envVariables, env.ProtobufImportPaths, context.Protobuf.ProtoImportPaths) - envVariables = appendStrings(envVariables, env.ProtobufProtoFiles, context.Protobuf.ProtoFiles) - envVariables = appendStringIfDefined(envVariables, env.ProducerPartitioner, context.Producer.Partitioner) - envVariables = appendStringIfDefined(envVariables, env.ProducerRequiredAcks, context.Producer.RequiredAcks) - envVariables = appendIntIfGreaterZero(envVariables, env.ProducerMaxMessageBytes, context.Producer.MaxMessageBytes) + envVariables = appendStrings(envVariables, global.Brokers, context.Brokers) + envVariables = appendBool(envVariables, global.TLSEnabled, context.TLS.Enabled) + envVariables = appendStringIfDefined(envVariables, global.TLSCa, context.TLS.CA) + envVariables = appendStringIfDefined(envVariables, global.TLSCert, context.TLS.Cert) + envVariables = appendStringIfDefined(envVariables, global.TLSCertKey, context.TLS.CertKey) + envVariables = appendBool(envVariables, global.TLSInsecure, context.TLS.Insecure) + envVariables = appendBool(envVariables, global.SaslEnabled, context.Sasl.Enabled) + envVariables = appendStringIfDefined(envVariables, global.SaslUsername, context.Sasl.Username) + envVariables = appendStringIfDefined(envVariables, global.SaslPassword, context.Sasl.Password) + envVariables = appendStringIfDefined(envVariables, global.SaslMechanism, context.Sasl.Mechanism) + envVariables = appendStringIfDefined(envVariables, global.RequestTimeout, context.RequestTimeout.String()) + envVariables = appendStringIfDefined(envVariables, global.ClientID, context.ClientID) + envVariables = appendStringIfDefined(envVariables, global.KafkaVersion, context.KafkaVersion.String()) + envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.AvroSchemaRegistry) + envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.AvroJSONCodec.String()) + envVariables = appendStrings(envVariables, global.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles) + envVariables = appendStrings(envVariables, global.ProtobufImportPaths, context.Protobuf.ProtoImportPaths) + envVariables = appendStrings(envVariables, global.ProtobufProtoFiles, context.Protobuf.ProtoFiles) + envVariables = appendStringIfDefined(envVariables, global.ProducerPartitioner, context.Producer.Partitioner) + envVariables = appendStringIfDefined(envVariables, global.ProducerRequiredAcks, context.Producer.RequiredAcks) + envVariables = appendIntIfGreaterZero(envVariables, global.ProducerMaxMessageBytes, context.Producer.MaxMessageBytes) return envVariables } diff --git a/internal/k8s/k8s-operation_test.go b/internal/k8s/k8s-operation_test.go index b013d2c..0bb5089 100644 --- a/internal/k8s/k8s-operation_test.go +++ b/internal/k8s/k8s-operation_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" + "github.com/deviceinsight/kafkactl/internal/global" + "github.com/deviceinsight/kafkactl/internal/helpers/avro" "github.com/IBM/sarama" "github.com/deviceinsight/kafkactl/internal" - "github.com/deviceinsight/kafkactl/internal/env" "github.com/deviceinsight/kafkactl/internal/k8s" "github.com/deviceinsight/kafkactl/testutil" ) @@ -48,31 +49,31 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) { envMap[strings.TrimSuffix(parts[0], "=")] = parts[1] } - for _, key := range env.Variables { + for _, key := range global.EnvVariables { if _, found := envMap[key]; !found { t.Fatalf("env variable not found in parsed environment: %s", key) } } - testutil.AssertEquals(t, "broker1:9092 broker2:9092", envMap[env.Brokers]) - testutil.AssertEquals(t, "30s", envMap[env.RequestTimeout]) - testutil.AssertEquals(t, "true", envMap[env.TLSEnabled]) - testutil.AssertEquals(t, "my-ca", envMap[env.TLSCa]) - testutil.AssertEquals(t, "my-cert", envMap[env.TLSCert]) - testutil.AssertEquals(t, "my-cert-key", envMap[env.TLSCertKey]) - testutil.AssertEquals(t, "true", envMap[env.TLSInsecure]) - testutil.AssertEquals(t, "true", envMap[env.SaslEnabled]) - testutil.AssertEquals(t, "user", envMap[env.SaslUsername]) - testutil.AssertEquals(t, "pass", envMap[env.SaslPassword]) - testutil.AssertEquals(t, "scram-sha512", envMap[env.SaslMechanism]) - testutil.AssertEquals(t, "my-client", envMap[env.ClientID]) - testutil.AssertEquals(t, "2.0.1", envMap[env.KafkaVersion]) - testutil.AssertEquals(t, "registry:8888", envMap[env.AvroSchemaRegistry]) - testutil.AssertEquals(t, "avro", envMap[env.AvroJSONCodec]) - testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[env.ProtobufProtoSetFiles]) - testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[env.ProtobufImportPaths]) - testutil.AssertEquals(t, "message.proto other.proto", envMap[env.ProtobufProtoFiles]) - testutil.AssertEquals(t, "hash", envMap[env.ProducerPartitioner]) - testutil.AssertEquals(t, "WaitForAll", envMap[env.ProducerRequiredAcks]) - testutil.AssertEquals(t, "1234", envMap[env.ProducerMaxMessageBytes]) + testutil.AssertEquals(t, "broker1:9092 broker2:9092", envMap[global.Brokers]) + testutil.AssertEquals(t, "30s", envMap[global.RequestTimeout]) + testutil.AssertEquals(t, "true", envMap[global.TLSEnabled]) + testutil.AssertEquals(t, "my-ca", envMap[global.TLSCa]) + testutil.AssertEquals(t, "my-cert", envMap[global.TLSCert]) + testutil.AssertEquals(t, "my-cert-key", envMap[global.TLSCertKey]) + testutil.AssertEquals(t, "true", envMap[global.TLSInsecure]) + testutil.AssertEquals(t, "true", envMap[global.SaslEnabled]) + testutil.AssertEquals(t, "user", envMap[global.SaslUsername]) + testutil.AssertEquals(t, "pass", envMap[global.SaslPassword]) + testutil.AssertEquals(t, "scram-sha512", envMap[global.SaslMechanism]) + testutil.AssertEquals(t, "my-client", envMap[global.ClientID]) + testutil.AssertEquals(t, "2.0.1", envMap[global.KafkaVersion]) + testutil.AssertEquals(t, "registry:8888", envMap[global.AvroSchemaRegistry]) + testutil.AssertEquals(t, "avro", envMap[global.AvroJSONCodec]) + testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[global.ProtobufProtoSetFiles]) + testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[global.ProtobufImportPaths]) + testutil.AssertEquals(t, "message.proto other.proto", envMap[global.ProtobufProtoFiles]) + testutil.AssertEquals(t, "hash", envMap[global.ProducerPartitioner]) + testutil.AssertEquals(t, "WaitForAll", envMap[global.ProducerRequiredAcks]) + testutil.AssertEquals(t, "1234", envMap[global.ProducerMaxMessageBytes]) } diff --git a/internal/partition/partition-operation.go b/internal/partition/partition-operation.go index aaa206f..da159b8 100644 --- a/internal/partition/partition-operation.go +++ b/internal/partition/partition-operation.go @@ -236,7 +236,7 @@ func readCurrentReplicas(client *sarama.Client, topic string) ([][]int32, error) return replicaAssignment, nil } -func CompletePartitionIds(_ *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { +func CompletePartitionIDs(_ *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { if len(args) != 1 { return nil, cobra.ShellCompDirectiveNoFileComp diff --git a/internal/topic/topic-operation.go b/internal/topic/topic-operation.go index ca0c37f..c38008b 100644 --- a/internal/topic/topic-operation.go +++ b/internal/topic/topic-operation.go @@ -537,30 +537,30 @@ func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int brokerReplicaCount[lastReplica]-- } - var unusedBrokerIds []int32 + var unusedBrokerIDs []int32 if len(replicas) < int(targetReplicationFactor) { for brokerID := range brokerReplicaCount { if !util.ContainsInt32(replicas, brokerID) { - unusedBrokerIds = append(unusedBrokerIds, brokerID) + unusedBrokerIDs = append(unusedBrokerIDs, brokerID) } } - if len(unusedBrokerIds) < (int(targetReplicationFactor) - len(replicas)) { + if len(unusedBrokerIDs) < (int(targetReplicationFactor) - len(replicas)) { return nil, errors.New("not enough brokers") } } for len(replicas) < int(targetReplicationFactor) { - sort.Slice(unusedBrokerIds, func(i, j int) bool { - brokerI := unusedBrokerIds[i] - brokerJ := unusedBrokerIds[j] + sort.Slice(unusedBrokerIDs, func(i, j int) bool { + brokerI := unusedBrokerIDs[i] + brokerJ := unusedBrokerIDs[j] return brokerReplicaCount[brokerI] < brokerReplicaCount[brokerJ] || (brokerReplicaCount[brokerI] == brokerReplicaCount[brokerJ] && brokerI > brokerJ) }) - replicas = append(replicas, unusedBrokerIds[0]) - brokerReplicaCount[unusedBrokerIds[0]]++ - unusedBrokerIds = unusedBrokerIds[1:] + replicas = append(replicas, unusedBrokerIDs[0]) + brokerReplicaCount[unusedBrokerIDs[0]]++ + unusedBrokerIDs = unusedBrokerIDs[1:] } return replicas, nil diff --git a/output/output.go b/output/output.go index 4f4fbd8..2f59d91 100644 --- a/output/output.go +++ b/output/output.go @@ -26,6 +26,11 @@ var Fail = func(err error) { os.Exit(1) } +var Failf = func(msg string, args ...interface{}) { + _, _ = fmt.Fprintf(IoStreams.ErrOut, msg+"\n", args...) + os.Exit(1) +} + func Warnf(msg string, args ...interface{}) { _, _ = fmt.Fprintf(IoStreams.ErrOut, msg+"\n", args...) } diff --git a/testutil/helpers.go b/testutil/helpers.go index b42a1fa..c4646c6 100644 --- a/testutil/helpers.go +++ b/testutil/helpers.go @@ -63,7 +63,7 @@ func VerifyTopicExists(t *testing.T, topic string) { kafkaCtl := CreateKafkaCtlCommand() - findTopic := func(attempt uint) error { + findTopic := func(_ uint) error { _, err := kafkaCtl.Execute("get", "topics", "-o", "compact") if err != nil { @@ -139,7 +139,7 @@ func VerifyGroupExists(t *testing.T, group string) { kafkaCtl := CreateKafkaCtlCommand() - findConsumerGroup := func(attempt uint) error { + findConsumerGroup := func(_ uint) error { _, err := kafkaCtl.Execute("get", "cg", "-o", "compact") if err != nil { @@ -173,7 +173,7 @@ func VerifyConsumerGroupOffset(t *testing.T, group, topic string, expectedConsum consumerOffsetRegex, _ := regexp.Compile(`consumerOffset: (\d)`) - verifyConsumerOffset := func(attempt uint) error { + verifyConsumerOffset := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "cg", group, "--topic", topic, "-o", "yaml") if err != nil { @@ -209,7 +209,7 @@ func VerifyTopicNotInConsumerGroup(t *testing.T, group, topic string) { emptyTopicsRegex, _ := regexp.Compile(`topics: \[]`) - verifyTopicNotInGroup := func(attempt uint) error { + verifyTopicNotInGroup := func(_ uint) error { _, err := kafkaCtl.Execute("describe", "cg", group, "--topic", topic, "-o", "yaml") if err != nil { diff --git a/testutil/test_util.go b/testutil/test_util.go index ba05d37..1cfa160 100644 --- a/testutil/test_util.go +++ b/testutil/test_util.go @@ -14,7 +14,7 @@ import ( "testing" "time" - "github.com/deviceinsight/kafkactl/internal/env" + "github.com/deviceinsight/kafkactl/internal/global" "github.com/IBM/sarama" "github.com/deviceinsight/kafkactl/cmd" @@ -69,7 +69,7 @@ func init() { panic(err) } - for _, variable := range env.Variables { + for _, variable := range global.EnvVariables { if err := os.Setenv(variable, ""); err != nil { panic(err) } From 426c91b03a54c4ee62ad9cd6aa086a44e6e33559 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Tue, 5 Mar 2024 08:06:01 +0100 Subject: [PATCH 2/3] document project config files --- README.adoc | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index c46f065..9a12f10 100644 --- a/README.adoc +++ b/README.adoc @@ -163,14 +163,16 @@ contexts: # optional: isolationLevel (defaults to ReadCommitted) isolationLevel: ReadUncommitted +# optional for project config files current-context: default ---- +[#_config_file_read_order] The config file location is resolved by . checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG` . evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG` -. checking for a config file in the working directory i.e. `$PWD/kafkactl.yml` +. checking for a project config file in the working directory (see <<_project_config_files>>) . as default the config file is looked up from one of the following locations: ** `$HOME/.config/kafkactl/config.yml` ** `$HOME/.kafkactl/config.yml` @@ -178,6 +180,31 @@ The config file location is resolved by ** `$SNAP_DATA/kafkactl/config.yml` ** `/etc/kafkactl/config.yml` +[#_project_config_files] +==== Project config files + +In addition to the config file locations above, _kafkactl_ allows to create a config file on project level. +A project config file is meant to be placed at the root level of a git repo and declares the kafka configuration +for this repository/project. + +In order to identify the config file as belonging to _kafkactl_ the following names can be used: + +* `kafkactl.yml` +* `.kafkactl.yml` + +During initialization _kafkactl_ starts from the current working directory and recursively looks for a project level +config file. The recursive lookup ends at the boundary of a git repository (i.e. if a `.git` folder is found). +This way, _kafkactl_ can be used conveniently anywhere in the git repository. + +Additionally, project config files have a special feature to use them read-only. Topically, if you configure more than +one context in a config file, and you switch the context with `kafkactl config use-context xy` this will lead to a write +operation on the config file to save the _current context_. + +In order to avoid this for project config files, one can just omit the `current-context` parameter from the config file. +In this case _kafkactl_ will delegate read and write operations for the _current context_ to the next configuration file +according to <<_config_file_read_order, the config file read order>>. + + === Auto completion ==== bash From a86ac6fd291fead9681a36413f889d99c3dba26b Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Tue, 5 Mar 2024 08:09:17 +0100 Subject: [PATCH 3/3] add changelog entry for project level configs --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70bc80e..21da1f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- [#190](https://github.com/deviceinsight/kafkactl/pull/190) Improve handling of project config files + ## 4.0.0 - 2024-01-18 ### Added