diff --git a/README.md b/README.md index 7e77dcc..e643804 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,18 @@ The `rebalance` subcommand, on the other hand, performs a rebalance for **all** See the [rebalancing](#rebalancing) section below for more information on rebalancing. +#### delete + +``` +topicctl delete [flags] [operation] +``` + +The `delete` subcommand deletes a particular resource type in the cluster. +Currently, the following operations are supported: +| Subcommand | Description | +| --------- | ----------- | +| `delete topic [topic]` | Deletes a single topic in the cluster | + #### repl ``` @@ -199,6 +211,8 @@ topicctl repl [flags] The `repl` subcommand starts up a shell that allows running the `get` and `tail` subcommands interactively. +By default, `repl` is in read-only mode. Disable this behaviour with: `--read-only-enabled=false` + #### reset-offsets ``` diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go new file mode 100644 index 0000000..f479c40 --- /dev/null +++ b/cmd/topicctl/subcmd/delete.go @@ -0,0 +1,62 @@ +package subcmd + +import ( + "context" + "strings" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/segmentio/topicctl/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var deleteCmd = &cobra.Command{ + Use: "delete [resource type]", + Short: "delete instances of a particular type", + Long: strings.Join( + []string{ + "Deletes instances of a particular type.", + }, + "\n", + ), + PersistentPreRunE: deletePreRun, +} + +type deleteCmdConfig struct { + shared sharedOptions +} + +var deleteConfig deleteCmdConfig + +func init() { + addSharedFlags(deleteCmd, &deleteConfig.shared) + deleteCmd.AddCommand( + deleteTopicCmd(), + ) + RootCmd.AddCommand(deleteCmd) +} + +func deletePreRun(cmd *cobra.Command, args []string) error { + return deleteConfig.shared.validate() +} + +func deleteTopicCmd() *cobra.Command { + return &cobra.Command{ + Use: "topic [topic name]", + Short: "Delete a topic", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.DeleteTopic(ctx, args[0]) + }, + } +} diff --git a/cmd/topicctl/subcmd/repl.go b/cmd/topicctl/subcmd/repl.go index c3f8aaf..c097808 100644 --- a/cmd/topicctl/subcmd/repl.go +++ b/cmd/topicctl/subcmd/repl.go @@ -15,13 +15,24 @@ var replCmd = &cobra.Command{ RunE: replRun, } +type replCmdOptions struct { + readOnly bool +} + type replCmdConfig struct { - shared sharedOptions + options replCmdOptions + shared sharedOptions } var replConfig replCmdConfig func init() { + replCmd.Flags().BoolVar( + &replConfig.options.readOnly, + "read-only-enabled", + true, + "Use read only mode") + addSharedFlags(replCmd, &replConfig.shared) RootCmd.AddCommand(replCmd) } @@ -34,7 +45,7 @@ func replRun(cmd *cobra.Command, args []string) error { ctx := context.Background() sess := session.Must(session.NewSession()) - adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true) + adminClient, err := replConfig.shared.getAdminClient(ctx, sess, replConfig.options.readOnly) if err != nil { return err } diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 48c4bd3..4ad1f16 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -599,6 +599,31 @@ func (c *BrokerAdminClient) AssignPartitions( return err } +// DeleteTopic deletes a topic in the cluster. +func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.config.ReadOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := &kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.client.DeleteTopics(ctx, req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AddPartitions extends a topic by adding one or more new partitions to it. func (c *BrokerAdminClient) AddPartitions( ctx context.Context, diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 542d225..9d62cc1 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -388,6 +388,70 @@ func TestBrokerClientAddPartitions(t *testing.T) { assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas) } +func TestBrokerDeleteTopic(t *testing.T) { + if !util.CanTestBrokerAdmin() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + topicName := util.RandomString("topic-delete-", 6) + err = client.CreateTopic( + ctx, + kafka.TopicConfig{ + Topic: topicName, + NumPartitions: -1, + ReplicationFactor: -1, + ReplicaAssignments: []kafka.ReplicaAssignment{ + { + Partition: 0, + Replicas: []int{1, 2}, + }, + { + Partition: 1, + Replicas: []int{2, 3}, + }, + { + Partition: 2, + Replicas: []int{3, 4}, + }, + }, + ConfigEntries: []kafka.ConfigEntry{ + { + ConfigName: "flush.ms", + ConfigValue: "2000", + }, + { + ConfigName: "retention.ms", + ConfigValue: "10000000", + }, + }, + }, + ) + require.NoError(t, err) + util.RetryUntil(t, 5*time.Second, func() error { + _, err := client.GetTopic(ctx, topicName, true) + return err + }) + + err = client.DeleteTopic(ctx, topicName) + require.NoError(t, err) + + time.Sleep(time.Second * 10) + + _, err = client.GetTopic(ctx, topicName, false) + require.Error(t, err) +} + func TestBrokerClientAlterAssignments(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/client.go b/pkg/admin/client.go index f872ad9..bf47d9b 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -38,6 +38,9 @@ type Client interface { detailed bool, ) (TopicInfo, error) + // DeleteTopic deletes a single topic in the cluster. + DeleteTopic(ctx context.Context, topic string) error + // GetACLs gets full information about each ACL in the cluster. GetACLs( ctx context.Context, diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 8f7f4d9..d608acb 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -613,6 +613,29 @@ func (c *ZKAdminClient) CreateTopic( return err } +func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.readOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AssignPartitions notifies the cluster to begin a partition reassignment. // This should only be used for existing partitions; to create new partitions, // use the AddPartitions method. diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 312e286..53dbda4 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -520,6 +520,40 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { return nil } +// DeleteTopic deletes a single topic. +func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error { + c.printer("Checking if topic %s exists...", topic) + c.startSpinner() + // First check that topic exists + _, err := c.adminClient.GetTopic(ctx, topic, false) + if err != nil { + c.stopSpinner() + return fmt.Errorf("Error fetching topic info: %+v", err) + } + c.stopSpinner() + c.printer("Topic %s exists in the cluster!", topic) + + confirm, err := apply.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false) + if err != nil { + return err + } + + if !confirm { + return nil + } + + c.startSpinner() + err = c.adminClient.DeleteTopic(ctx, topic) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Topic %s successfully deleted", topic) + + return nil +} + // GerUsers fetches the details of each user in the cluster and prints out a table of them. func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error { c.startSpinner() diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index d1f2682..780ee25 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -24,6 +24,10 @@ var ( Text: "get", Description: "Get information about one or more resources in the cluster", }, + { + Text: "delete", + Description: "Delete a resource in the cluster", + }, { Text: "tail", Description: "Tail all messages in a topic", @@ -38,6 +42,13 @@ var ( }, } + deleteSuggestions = []prompt.Suggest{ + { + Text: "topic", + Description: "Delete a single topic", + }, + } + getSuggestions = []prompt.Suggest{ { Text: "acls", @@ -227,6 +238,25 @@ func (r *Repl) executor(in string) { case "exit": fmt.Println("Bye!") os.Exit(0) + case "delete": + if len(command.args) == 1 { + log.Error("Unrecognized input. Run 'help' for details on available commands.") + return + } + + switch command.args[1] { + case "topic": + if err := command.checkArgs(3, 3, nil); err != nil { + log.Errorf("Error: %+v", err) + return + } + + topicName := command.args[2] + if err := r.cliRunner.DeleteTopic(ctx, topicName); err != nil { + log.Errorf("Error: %+v", err) + return + } + } case "get": if len(command.args) == 1 { log.Error("Unrecognized input. Run 'help' for details on available commands.") @@ -431,6 +461,10 @@ func (r *Repl) completer(doc prompt.Document) []prompt.Suggest { suggestions = commandSuggestions } else if len(words) == 2 && words[0] == "get" { suggestions = getSuggestions + } else if len(words) == 2 && words[0] == "delete" { + suggestions = deleteSuggestions + } else if len(words) == 3 && words[0] == "delete" && (words[1] == "topic") { + suggestions = r.topicSuggestions } else if len(words) == 3 && words[0] == "get" && (words[1] == "balance" || words[1] == "lags" || @@ -518,6 +552,10 @@ func helpTable() string { " get topics", "Get all topics", }, + { + " delete topic", + "Deletes a single topic", + }, { " get users", "Get all users",