From e9e6a731d78bc968ed0585615e7396d8dd15f327 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Wed, 17 Jan 2024 14:52:26 +0100 Subject: [PATCH] add delete records command (fixes #183) --- CHANGELOG.md | 1 + README.md | 9 ++++ cmd/deletion/delete-records.go | 36 +++++++++++++ cmd/deletion/delete-records_test.go | 77 +++++++++++++++++++++++++++ cmd/deletion/delete.go | 3 +- internal/consume/PartitionConsumer.go | 30 +---------- internal/topic/topic-operation.go | 28 ++++++++++ util/parse_offsets.go | 65 ++++++++++++++++++++++ util/parse_offsets_test.go | 66 +++++++++++++++++++++++ 9 files changed, 285 insertions(+), 30 deletions(-) create mode 100644 cmd/deletion/delete-records.go create mode 100644 cmd/deletion/delete-records_test.go create mode 100644 util/parse_offsets.go create mode 100644 util/parse_offsets_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 540d6dd..87cb440 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics +- [#183](https://github.com/deviceinsight/kafkactl/issues/183) Add command `delete records` to delete records from topic ## 3.5.1 - 2023-11-10 diff --git a/README.md b/README.md index bc94437..46c85ec 100644 --- a/README.md +++ b/README.md @@ -686,6 +686,15 @@ kafkactl describe consumer-group my-group --topic my-topic kafkactl describe cg my-group ``` +### Delete Records from a topics + +Command to be used to delete records from partition, which have an offset smaller than the provided offset. + +```bash +# delete records with offset < 123 from partition 0 and offset < 456 from partition 1 +kafkactl delete records my-topic --offset 0=123 --offset 1=456 +``` + ### Create consumer groups A consumer-group can be created as follows: diff --git a/cmd/deletion/delete-records.go b/cmd/deletion/delete-records.go new file mode 100644 index 0000000..9bbdbf7 --- /dev/null +++ b/cmd/deletion/delete-records.go @@ -0,0 +1,36 @@ +package deletion + +import ( + "github.com/deviceinsight/kafkactl/cmd/validation" + "github.com/deviceinsight/kafkactl/internal/k8s" + "github.com/deviceinsight/kafkactl/internal/topic" + "github.com/deviceinsight/kafkactl/output" + "github.com/spf13/cobra" +) + +func newDeleteRecordsCmd() *cobra.Command { + + var flags topic.DeleteRecordsFlags + + var cmdDeleteRecords = &cobra.Command{ + Use: "records TOPIC", + Short: "delete a records from a topic", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + if !k8s.NewOperation().TryRun(cmd, args) { + if err := (&topic.Operation{}).DeleteRecords(args[0], flags); err != nil { + output.Fail(err) + } + } + }, + ValidArgsFunction: topic.CompleteTopicNames, + } + + cmdDeleteRecords.Flags().StringArrayVarP(&flags.Offsets, "offset", "", flags.Offsets, "offsets in format `partition=offset`. records with smaller offset will be deleted.") + + if err := validation.MarkFlagAtLeastOneRequired(cmdDeleteRecords.Flags(), "offset"); err != nil { + panic(err) + } + + return cmdDeleteRecords +} diff --git a/cmd/deletion/delete-records_test.go b/cmd/deletion/delete-records_test.go new file mode 100644 index 0000000..32b26a4 --- /dev/null +++ b/cmd/deletion/delete-records_test.go @@ -0,0 +1,77 @@ +package deletion_test + +import ( + "strings" + "testing" + + "github.com/deviceinsight/kafkactl/testutil" +) + +func TestDeleteRecordsIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + topicName := testutil.CreateTopic(t, "delete-records-", "--partitions", "2") + + testutil.ProduceMessageOnPartition(t, topicName, "key-1", "a", 0, 0) + testutil.ProduceMessageOnPartition(t, topicName, "key-1", "b", 0, 1) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "c", 1, 0) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "d", 1, 1) + testutil.ProduceMessageOnPartition(t, topicName, "key-2", "e", 1, 2) + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + // check initial messages + if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + if len(messages) != 5 { + t.Fatalf("expected 5 messages, got %d", len(messages)) + } + + // delete records + if _, err := kafkaCtl.Execute("delete", "records", topicName, "--offset", "0=1", "--offset", "1=2"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + // check messages + if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + if len(messages) != 2 { + t.Fatalf("expected 2 messages, got %d", len(messages)) + } + + testutil.AssertEquals(t, "key-1#b", messages[0]) + testutil.AssertEquals(t, "key-2#e", messages[1]) +} + +func TestDeleteRecordsAutoCompletionIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + prefix := "delete-complete-" + + topicName1 := testutil.CreateTopic(t, prefix+"a") + topicName2 := testutil.CreateTopic(t, prefix+"b") + topicName3 := testutil.CreateTopic(t, prefix+"c") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + kafkaCtl.Verbose = false + + if _, err := kafkaCtl.Execute("__complete", "delete", "records", ""); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + + testutil.AssertContains(t, topicName1, outputLines) + testutil.AssertContains(t, topicName2, outputLines) + testutil.AssertContains(t, topicName3, outputLines) +} diff --git a/cmd/deletion/delete.go b/cmd/deletion/delete.go index ead6d31..c825d58 100644 --- a/cmd/deletion/delete.go +++ b/cmd/deletion/delete.go @@ -8,12 +8,13 @@ func NewDeleteCmd() *cobra.Command { var cmdDelete = &cobra.Command{ Use: "delete", - Short: "delete topics, consumerGroups, consumer-group-offset, acls", + Short: "delete topics, consumerGroups, consumer-group-offset, acls, records", } cmdDelete.AddCommand(newDeleteTopicCmd()) cmdDelete.AddCommand(newDeleteConsumerGroupCmd()) cmdDelete.AddCommand(newDeleteConsumerGroupOffsetCmd()) cmdDelete.AddCommand(newDeleteACLCmd()) + cmdDelete.AddCommand(newDeleteRecordsCmd()) return cmdDelete } diff --git a/internal/consume/PartitionConsumer.go b/internal/consume/PartitionConsumer.go index ae17ae0..f111b70 100644 --- a/internal/consume/PartitionConsumer.go +++ b/internal/consume/PartitionConsumer.go @@ -3,8 +3,6 @@ package consume import ( "context" "math" - "strconv" - "strings" "time" "github.com/IBM/sarama" @@ -190,7 +188,7 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar } else if flags.FromBeginning { return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest) } else if len(flags.Offsets) > 0 { - return extractOffsetForPartition(flags, currentPartition) + return util.ExtractOffsetForPartition(flags.Offsets, currentPartition) } return sarama.OffsetNewest, nil } @@ -213,32 +211,6 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti return sarama.OffsetNewest, nil } -func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) { - for _, offsetFlag := range flags.Offsets { - offsetParts := strings.Split(offsetFlag, "=") - - if len(offsetParts) == 2 { - - partition, err := strconv.Atoi(offsetParts[0]) - if err != nil { - return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) - } - - if int32(partition) != currentPartition { - continue - } - - offset, err := strconv.ParseInt(offsetParts[1], 10, 64) - if err != nil { - return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) - } - - return offset, nil - } - } - return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %s", currentPartition, flags.Offsets) -} - func hasExclusiveConditions(flags ...bool) bool { value := 0 for _, flag := range flags { diff --git a/internal/topic/topic-operation.go b/internal/topic/topic-operation.go index 464174e..ca0c37f 100644 --- a/internal/topic/topic-operation.go +++ b/internal/topic/topic-operation.go @@ -63,6 +63,10 @@ type AlterTopicFlags struct { Configs []string } +type DeleteRecordsFlags struct { + Offsets []string +} + type PrintConfigsParam string const ( @@ -670,6 +674,30 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error { return nil } +func (operation *Operation) DeleteRecords(topic string, flags DeleteRecordsFlags) error { + + var ( + err error + context internal.ClientContext + admin sarama.ClusterAdmin + ) + + if context, err = internal.CreateClientContext(); err != nil { + return err + } + + if admin, err = internal.CreateClusterAdmin(&context); err != nil { + return errors.Wrap(err, "failed to create cluster admin") + } + + offsets, parseErr := util.ParseOffsets(flags.Offsets) + if parseErr != nil { + return parseErr + } + + return admin.DeleteRecords(topic, offsets) +} + func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, requestedFields requestedTopicFields) (Topic, error) { var ( err error diff --git a/util/parse_offsets.go b/util/parse_offsets.go new file mode 100644 index 0000000..bdebdc3 --- /dev/null +++ b/util/parse_offsets.go @@ -0,0 +1,65 @@ +package util + +import ( + "math" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +const ErrOffset = math.MinInt64 +const offsetSeparator = "=" + +func ParseOffsets(rawOffsets []string) (map[int32]int64, error) { + + offsets := make(map[int32]int64) + + for _, offsetFlag := range rawOffsets { + offsetParts := strings.Split(offsetFlag, offsetSeparator) + + if len(offsetParts) != 2 { + return nil, errors.Errorf("offset parameter has wrong format: %s %v", offsetFlag, rawOffsets) + } + + partition, err := strconv.Atoi(offsetParts[0]) + if err != nil { + return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + offset, err := strconv.ParseInt(offsetParts[1], 10, 64) + if err != nil { + return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + offsets[int32(partition)] = offset + } + + return offsets, nil +} + +func ExtractOffsetForPartition(rawOffsets []string, currentPartition int32) (int64, error) { + for _, offsetFlag := range rawOffsets { + offsetParts := strings.Split(offsetFlag, offsetSeparator) + + if len(offsetParts) == 2 { + + partition, err := strconv.Atoi(offsetParts[0]) + if err != nil { + return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + if int32(partition) != currentPartition { + continue + } + + offset, err := strconv.ParseInt(offsetParts[1], 10, 64) + if err != nil { + return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err) + } + + return offset, nil + } + } + return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %v", currentPartition, rawOffsets) +} diff --git a/util/parse_offsets_test.go b/util/parse_offsets_test.go new file mode 100644 index 0000000..9eb6b9b --- /dev/null +++ b/util/parse_offsets_test.go @@ -0,0 +1,66 @@ +package util_test + +import ( + "reflect" + "strings" + "testing" + + "github.com/deviceinsight/kafkactl/util" +) + +func TestParseOffsets(t *testing.T) { + + type testCases struct { + description string + input []string + wantOffsets map[int32]int64 + wantErr string + } + + for _, test := range []testCases{ + { + description: "successful_parsing", + input: []string{"1=222", "2=333", "5=444"}, + wantOffsets: map[int32]int64{1: 222, 2: 333, 5: 444}, + }, + { + description: "wrong_separator_fails", + input: []string{"1:222"}, + wantErr: "offset parameter has wrong format: 1:222 [1:222]", + }, + { + description: "partition_not_an_int_fails", + input: []string{"abc=222"}, + wantErr: "parsing \"abc\": invalid syntax", + }, + { + description: "offset_not_an_int_fails", + input: []string{"1=nope"}, + wantErr: "parsing \"nope\": invalid syntax", + }, + } { + t.Run(test.description, func(t *testing.T) { + + offsets, err := util.ParseOffsets(test.input) + + if test.wantErr != "" { + if err == nil { + t.Errorf("want error %q but got nil", test.wantErr) + } + + if !strings.Contains(err.Error(), test.wantErr) { + t.Errorf("want error %q got %q", test.wantErr, err) + } + + return + } + if err != nil { + t.Errorf("doesn't want error but got %s", err) + } + + if eq := reflect.DeepEqual(test.wantOffsets, offsets); !eq { + t.Errorf("want %q got %q", test.wantOffsets, offsets) + } + }) + } +}