Skip to content

Commit

Permalink
add delete records command (fixes #183)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Jan 17, 2024
1 parent 0ac3c55 commit e9e6a73
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics
- [#183](https://github.com/deviceinsight/kafkactl/issues/183) Add command `delete records` to delete records from topic

## 3.5.1 - 2023-11-10

Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,15 @@ kafkactl describe consumer-group my-group --topic my-topic
kafkactl describe cg my-group
```

### Delete Records from a topics

Command to be used to delete records from partition, which have an offset smaller than the provided offset.

```bash
# delete records with offset < 123 from partition 0 and offset < 456 from partition 1
kafkactl delete records my-topic --offset 0=123 --offset 1=456
```

### Create consumer groups

A consumer-group can be created as follows:
Expand Down
36 changes: 36 additions & 0 deletions cmd/deletion/delete-records.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package deletion

import (
"github.com/deviceinsight/kafkactl/cmd/validation"
"github.com/deviceinsight/kafkactl/internal/k8s"
"github.com/deviceinsight/kafkactl/internal/topic"
"github.com/deviceinsight/kafkactl/output"
"github.com/spf13/cobra"
)

func newDeleteRecordsCmd() *cobra.Command {

var flags topic.DeleteRecordsFlags

var cmdDeleteRecords = &cobra.Command{
Use: "records TOPIC",
Short: "delete a records from a topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).DeleteRecords(args[0], flags); err != nil {
output.Fail(err)
}
}
},
ValidArgsFunction: topic.CompleteTopicNames,
}

cmdDeleteRecords.Flags().StringArrayVarP(&flags.Offsets, "offset", "", flags.Offsets, "offsets in format `partition=offset`. records with smaller offset will be deleted.")

if err := validation.MarkFlagAtLeastOneRequired(cmdDeleteRecords.Flags(), "offset"); err != nil {
panic(err)
}

return cmdDeleteRecords
}
77 changes: 77 additions & 0 deletions cmd/deletion/delete-records_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package deletion_test

import (
"strings"
"testing"

"github.com/deviceinsight/kafkactl/testutil"
)

func TestDeleteRecordsIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

topicName := testutil.CreateTopic(t, "delete-records-", "--partitions", "2")

testutil.ProduceMessageOnPartition(t, topicName, "key-1", "a", 0, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "b", 0, 1)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "c", 1, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "d", 1, 1)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "e", 1, 2)

kafkaCtl := testutil.CreateKafkaCtlCommand()

// check initial messages
if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

if len(messages) != 5 {
t.Fatalf("expected 5 messages, got %d", len(messages))
}

// delete records
if _, err := kafkaCtl.Execute("delete", "records", topicName, "--offset", "0=1", "--offset", "1=2"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

// check messages
if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--print-keys", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

if len(messages) != 2 {
t.Fatalf("expected 2 messages, got %d", len(messages))
}

testutil.AssertEquals(t, "key-1#b", messages[0])
testutil.AssertEquals(t, "key-2#e", messages[1])
}

func TestDeleteRecordsAutoCompletionIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

prefix := "delete-complete-"

topicName1 := testutil.CreateTopic(t, prefix+"a")
topicName2 := testutil.CreateTopic(t, prefix+"b")
topicName3 := testutil.CreateTopic(t, prefix+"c")

kafkaCtl := testutil.CreateKafkaCtlCommand()
kafkaCtl.Verbose = false

if _, err := kafkaCtl.Execute("__complete", "delete", "records", ""); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

testutil.AssertContains(t, topicName1, outputLines)
testutil.AssertContains(t, topicName2, outputLines)
testutil.AssertContains(t, topicName3, outputLines)
}
3 changes: 2 additions & 1 deletion cmd/deletion/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ func NewDeleteCmd() *cobra.Command {

var cmdDelete = &cobra.Command{
Use: "delete",
Short: "delete topics, consumerGroups, consumer-group-offset, acls",
Short: "delete topics, consumerGroups, consumer-group-offset, acls, records",
}

cmdDelete.AddCommand(newDeleteTopicCmd())
cmdDelete.AddCommand(newDeleteConsumerGroupCmd())
cmdDelete.AddCommand(newDeleteConsumerGroupOffsetCmd())
cmdDelete.AddCommand(newDeleteACLCmd())
cmdDelete.AddCommand(newDeleteRecordsCmd())
return cmdDelete
}
30 changes: 1 addition & 29 deletions internal/consume/PartitionConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package consume
import (
"context"
"math"
"strconv"
"strings"
"time"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -190,7 +188,7 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar
} else if flags.FromBeginning {
return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
} else if len(flags.Offsets) > 0 {
return extractOffsetForPartition(flags, currentPartition)
return util.ExtractOffsetForPartition(flags.Offsets, currentPartition)
}
return sarama.OffsetNewest, nil
}
Expand All @@ -213,32 +211,6 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti
return sarama.OffsetNewest, nil
}

func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) {
for _, offsetFlag := range flags.Offsets {
offsetParts := strings.Split(offsetFlag, "=")

if len(offsetParts) == 2 {

partition, err := strconv.Atoi(offsetParts[0])
if err != nil {
return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

if int32(partition) != currentPartition {
continue
}

offset, err := strconv.ParseInt(offsetParts[1], 10, 64)
if err != nil {
return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

return offset, nil
}
}
return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %s", currentPartition, flags.Offsets)
}

func hasExclusiveConditions(flags ...bool) bool {
value := 0
for _, flag := range flags {
Expand Down
28 changes: 28 additions & 0 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type AlterTopicFlags struct {
Configs []string
}

type DeleteRecordsFlags struct {
Offsets []string
}

type PrintConfigsParam string

const (
Expand Down Expand Up @@ -670,6 +674,30 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error {
return nil
}

func (operation *Operation) DeleteRecords(topic string, flags DeleteRecordsFlags) error {

var (
err error
context internal.ClientContext
admin sarama.ClusterAdmin
)

if context, err = internal.CreateClientContext(); err != nil {
return err
}

if admin, err = internal.CreateClusterAdmin(&context); err != nil {
return errors.Wrap(err, "failed to create cluster admin")
}

offsets, parseErr := util.ParseOffsets(flags.Offsets)
if parseErr != nil {
return parseErr
}

return admin.DeleteRecords(topic, offsets)
}

func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, requestedFields requestedTopicFields) (Topic, error) {
var (
err error
Expand Down
65 changes: 65 additions & 0 deletions util/parse_offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package util

import (
"math"
"strconv"
"strings"

"github.com/pkg/errors"
)

const ErrOffset = math.MinInt64
const offsetSeparator = "="

func ParseOffsets(rawOffsets []string) (map[int32]int64, error) {

offsets := make(map[int32]int64)

for _, offsetFlag := range rawOffsets {
offsetParts := strings.Split(offsetFlag, offsetSeparator)

if len(offsetParts) != 2 {
return nil, errors.Errorf("offset parameter has wrong format: %s %v", offsetFlag, rawOffsets)
}

partition, err := strconv.Atoi(offsetParts[0])
if err != nil {
return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

offset, err := strconv.ParseInt(offsetParts[1], 10, 64)
if err != nil {
return nil, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

offsets[int32(partition)] = offset
}

return offsets, nil
}

func ExtractOffsetForPartition(rawOffsets []string, currentPartition int32) (int64, error) {
for _, offsetFlag := range rawOffsets {
offsetParts := strings.Split(offsetFlag, offsetSeparator)

if len(offsetParts) == 2 {

partition, err := strconv.Atoi(offsetParts[0])
if err != nil {
return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

if int32(partition) != currentPartition {
continue
}

offset, err := strconv.ParseInt(offsetParts[1], 10, 64)
if err != nil {
return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

return offset, nil
}
}
return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %v", currentPartition, rawOffsets)
}
66 changes: 66 additions & 0 deletions util/parse_offsets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package util_test

import (
"reflect"
"strings"
"testing"

"github.com/deviceinsight/kafkactl/util"
)

func TestParseOffsets(t *testing.T) {

type testCases struct {
description string
input []string
wantOffsets map[int32]int64
wantErr string
}

for _, test := range []testCases{
{
description: "successful_parsing",
input: []string{"1=222", "2=333", "5=444"},
wantOffsets: map[int32]int64{1: 222, 2: 333, 5: 444},
},
{
description: "wrong_separator_fails",
input: []string{"1:222"},
wantErr: "offset parameter has wrong format: 1:222 [1:222]",
},
{
description: "partition_not_an_int_fails",
input: []string{"abc=222"},
wantErr: "parsing \"abc\": invalid syntax",
},
{
description: "offset_not_an_int_fails",
input: []string{"1=nope"},
wantErr: "parsing \"nope\": invalid syntax",
},
} {
t.Run(test.description, func(t *testing.T) {

offsets, err := util.ParseOffsets(test.input)

if test.wantErr != "" {
if err == nil {
t.Errorf("want error %q but got nil", test.wantErr)
}

if !strings.Contains(err.Error(), test.wantErr) {
t.Errorf("want error %q got %q", test.wantErr, err)
}

return
}
if err != nil {
t.Errorf("doesn't want error but got %s", err)
}

if eq := reflect.DeepEqual(test.wantOffsets, offsets); !eq {
t.Errorf("want %q got %q", test.wantOffsets, offsets)
}
})
}
}

0 comments on commit e9e6a73

Please sign in to comment.