Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delete topic #163

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,18 @@ The `rebalance` subcommand, on the other hand, performs a rebalance for **all**

See the [rebalancing](#rebalancing) section below for more information on rebalancing.

#### delete

```
topicctl delete [flags] [operation]
```

The `delete` subcommand deletes a particular resource type in the cluster.
Currently, the following operations are supported:
| Subcommand | Description |
| --------- | ----------- |
| `delete topic [topic]` | Deletes a single topic in the cluster |

#### repl

```
Expand All @@ -199,6 +211,8 @@ topicctl repl [flags]
The `repl` subcommand starts up a shell that allows running the `get` and `tail`
subcommands interactively.

By default, `repl` is in read-only mode. Disable this behaviour with: `--read-only-enabled=false`

#### reset-offsets

```
Expand Down
62 changes: 62 additions & 0 deletions cmd/topicctl/subcmd/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package subcmd

import (
"context"
"strings"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var deleteCmd = &cobra.Command{
Use: "delete [resource type]",
Short: "delete instances of a particular type",
Long: strings.Join(
[]string{
"Deletes instances of a particular type.",
},
"\n",
),
PersistentPreRunE: deletePreRun,
}

type deleteCmdConfig struct {
shared sharedOptions
}

var deleteConfig deleteCmdConfig

func init() {
addSharedFlags(deleteCmd, &deleteConfig.shared)
deleteCmd.AddCommand(
deleteTopicCmd(),
)
RootCmd.AddCommand(deleteCmd)
}

func deletePreRun(cmd *cobra.Command, args []string) error {
return deleteConfig.shared.validate()
}

func deleteTopicCmd() *cobra.Command {
return &cobra.Command{
Use: "topic [topic name]",
Short: "Delete a topic",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.DeleteTopic(ctx, args[0])
},
}
}
15 changes: 13 additions & 2 deletions cmd/topicctl/subcmd/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@ var replCmd = &cobra.Command{
RunE: replRun,
}

type replCmdOptions struct {
readOnly bool
}

type replCmdConfig struct {
shared sharedOptions
options replCmdOptions
shared sharedOptions
}

var replConfig replCmdConfig

func init() {
replCmd.Flags().BoolVar(
&replConfig.options.readOnly,
"read-only-enabled",
true,
"Use read only mode")

addSharedFlags(replCmd, &replConfig.shared)
RootCmd.AddCommand(replCmd)
}
Expand All @@ -34,7 +45,7 @@ func replRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true)
adminClient, err := replConfig.shared.getAdminClient(ctx, sess, replConfig.options.readOnly)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,31 @@ func (c *BrokerAdminClient) AssignPartitions(
return err
}

// DeleteTopic deletes a topic in the cluster.
func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.config.ReadOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := &kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.client.DeleteTopics(ctx, req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)

if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AddPartitions extends a topic by adding one or more new partitions to it.
func (c *BrokerAdminClient) AddPartitions(
ctx context.Context,
Expand Down
64 changes: 64 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,70 @@ func TestBrokerClientAddPartitions(t *testing.T) {
assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas)
}

func TestBrokerDeleteTopic(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

topicName := util.RandomString("topic-delete-", 6)
err = client.CreateTopic(
ctx,
kafka.TopicConfig{
Topic: topicName,
NumPartitions: -1,
ReplicationFactor: -1,
ReplicaAssignments: []kafka.ReplicaAssignment{
{
Partition: 0,
Replicas: []int{1, 2},
},
{
Partition: 1,
Replicas: []int{2, 3},
},
{
Partition: 2,
Replicas: []int{3, 4},
},
},
ConfigEntries: []kafka.ConfigEntry{
{
ConfigName: "flush.ms",
ConfigValue: "2000",
},
{
ConfigName: "retention.ms",
ConfigValue: "10000000",
},
},
},
)
require.NoError(t, err)
util.RetryUntil(t, 5*time.Second, func() error {
_, err := client.GetTopic(ctx, topicName, true)
return err
})

err = client.DeleteTopic(ctx, topicName)
require.NoError(t, err)

time.Sleep(time.Second * 10)

_, err = client.GetTopic(ctx, topicName, false)
require.Error(t, err)
}

func TestBrokerClientAlterAssignments(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Client interface {
detailed bool,
) (TopicInfo, error)

// DeleteTopic deletes a single topic in the cluster.
DeleteTopic(ctx context.Context, topic string) error

// GetACLs gets full information about each ACL in the cluster.
GetACLs(
ctx context.Context,
Expand Down
23 changes: 23 additions & 0 deletions pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,29 @@ func (c *ZKAdminClient) CreateTopic(
return err
}

func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.readOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)
if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AssignPartitions notifies the cluster to begin a partition reassignment.
// This should only be used for existing partitions; to create new partitions,
// use the AddPartitions method.
Expand Down
34 changes: 34 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,40 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error {
return nil
}

// DeleteTopic deletes a single topic.
func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error {
c.printer("Checking if topic %s exists...", topic)
c.startSpinner()
// First check that topic exists
_, err := c.adminClient.GetTopic(ctx, topic, false)
if err != nil {
c.stopSpinner()
return fmt.Errorf("Error fetching topic info: %+v", err)
}
c.stopSpinner()
c.printer("Topic %s exists in the cluster!", topic)

confirm, err := apply.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false)
if err != nil {
return err
}

if !confirm {
return nil
}

c.startSpinner()
err = c.adminClient.DeleteTopic(ctx, topic)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Topic %s successfully deleted", topic)

return nil
}

// GerUsers fetches the details of each user in the cluster and prints out a table of them.
func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
c.startSpinner()
Expand Down
38 changes: 38 additions & 0 deletions pkg/cli/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ var (
Text: "get",
Description: "Get information about one or more resources in the cluster",
},
{
Text: "delete",
Description: "Delete a resource in the cluster",
},
{
Text: "tail",
Description: "Tail all messages in a topic",
Expand All @@ -38,6 +42,13 @@ var (
},
}

deleteSuggestions = []prompt.Suggest{
{
Text: "topic",
Description: "Delete a single topic",
},
}

getSuggestions = []prompt.Suggest{
{
Text: "acls",
Expand Down Expand Up @@ -227,6 +238,25 @@ func (r *Repl) executor(in string) {
case "exit":
fmt.Println("Bye!")
os.Exit(0)
case "delete":
if len(command.args) == 1 {
log.Error("Unrecognized input. Run 'help' for details on available commands.")
return
}

switch command.args[1] {
case "topic":
if err := command.checkArgs(3, 3, nil); err != nil {
log.Errorf("Error: %+v", err)
return
}

topicName := command.args[2]
if err := r.cliRunner.DeleteTopic(ctx, topicName); err != nil {
log.Errorf("Error: %+v", err)
return
}
}
case "get":
if len(command.args) == 1 {
log.Error("Unrecognized input. Run 'help' for details on available commands.")
Expand Down Expand Up @@ -431,6 +461,10 @@ func (r *Repl) completer(doc prompt.Document) []prompt.Suggest {
suggestions = commandSuggestions
} else if len(words) == 2 && words[0] == "get" {
suggestions = getSuggestions
} else if len(words) == 2 && words[0] == "delete" {
suggestions = deleteSuggestions
} else if len(words) == 3 && words[0] == "delete" && (words[1] == "topic") {
suggestions = r.topicSuggestions
} else if len(words) == 3 && words[0] == "get" &&
(words[1] == "balance" ||
words[1] == "lags" ||
Expand Down Expand Up @@ -518,6 +552,10 @@ func helpTable() string {
" get topics",
"Get all topics",
},
{
" delete topic",
"Deletes a single topic",
},
{
" get users",
"Get all users",
Expand Down
Loading