Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Reset by time #171

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
kafkactl reset offset my-group --all-topics --newest
# reset offset of for all partitions on multiple topics to oldest offset
kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352
```

### Delete consumer group offsets
Expand Down
3 changes: 1 addition & 2 deletions cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ func newAlterPartitionCmd() *cobra.Command {
return topic.CompleteTopicNames(cmd, args, toComplete)
} else if len(args) == 1 {
return partition.CompletePartitionIds(cmd, args, toComplete)
} else {
return nil, cobra.ShellCompDirectiveNoFileComp
}
return nil, cobra.ShellCompDirectiveNoFileComp
},
}

Expand Down
1 change: 1 addition & 0 deletions cmd/reset/reset-consumer-group-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newResetOffsetCmd() *cobra.Command {
cmdResetOffset.Flags().StringArrayVarP(&offsetFlags.Topic, "topic", "t", offsetFlags.Topic, "one ore more topics to change offset for")
cmdResetOffset.Flags().BoolVarP(&offsetFlags.Execute, "execute", "e", false, "execute the reset (as default only the results are displayed for validation)")
cmdResetOffset.Flags().StringVarP(&offsetFlags.OutputFormat, "output", "o", offsetFlags.OutputFormat, "output format. One of: json|yaml")
cmdResetOffset.Flags().StringVarP(&offsetFlags.ToDatetime, "to-datetime", "", "", "set the offset to offset of given timestamp")

return cmdResetOffset
}
43 changes: 43 additions & 0 deletions cmd/reset/reset-consumer-group-offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reset_test
import (
"strings"
"testing"
"time"

"github.com/deviceinsight/kafkactl/testutil"
)
Expand Down Expand Up @@ -130,6 +131,48 @@ func TestResetCGOForAllTopicsInTheGroupIntegration(t *testing.T) {
testutil.VerifyTopicNotInConsumerGroup(t, group, topicOther)
}

func TestResetCGOToDatetimeIntegration(t *testing.T) {
d-rk marked this conversation as resolved.
Show resolved Hide resolved

testutil.StartIntegrationTest(t)

topicName := testutil.CreateTopic(t, "reset-cgo-datetime")

group := testutil.CreateConsumerGroup(t, "reset-cgo-datetime", topicName)

testutil.ProduceMessage(t, topicName, "test-key", "a", 0, 0)
testutil.ProduceMessage(t, topicName, "test-key", "b", 0, 1)

time.Sleep(1 * time.Millisecond) // need to have messaged produced at different milliseconds to have reproducible test

t1 := time.Now()
t1Formatted := t1.Format("2006-01-02T15:04:05.000Z")

testutil.ProduceMessage(t, topicName, "test-key", "c", 0, 2)
testutil.ProduceMessage(t, topicName, "test-key", "d", 0, 3)
testutil.ProduceMessage(t, topicName, "test-key", "e", 0, 4)
testutil.ProduceMessage(t, topicName, "test-key", "f", 0, 5)

testutil.VerifyConsumerGroupOffset(t, group, topicName, 0)

//test with --to-datetime
kafkaCtl := testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("reset", "offset", group, "--topic", topicName, "--to-datetime", t1Formatted, "--execute"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.VerifyConsumerGroupOffset(t, group, topicName, 2)

kafkaCtl = testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("consume", topicName, "--group", group, "--max-messages", "4"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"c", "d", "e", "f"}, messages)
}

func TestResetCGOAutoCompletionIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand Down
48 changes: 24 additions & 24 deletions internal/broker/broker-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,40 +163,40 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags)
return output.PrintObject(brokerInfo, flags.OutputFormat)
} else if flags.OutputFormat != "" && flags.OutputFormat != "wide" {
return errors.Errorf("unknown outputFormat: %s", flags.OutputFormat)
} else {

tableWriter := output.CreateTableWriter()
}

// write broker info table
if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil {
return err
}
tableWriter := output.CreateTableWriter()

if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil {
return err
}
// write broker info table
if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil {
return err
}

if err := tableWriter.Flush(); err != nil {
return err
}
if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil {
return err
}

output.PrintStrings("")
if err := tableWriter.Flush(); err != nil {
return err
}

// first write config table
if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil {
return err
}
output.PrintStrings("")

for _, c := range brokerInfo.Configs {
if err := tableWriter.Write(c.Name, c.Value); err != nil {
return err
}
}
// first write config table
if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil {
return err
}

if err := tableWriter.Flush(); err != nil {
for _, c := range brokerInfo.Configs {
if err := tableWriter.Write(c.Name, c.Value); err != nil {
return err
}
}

if err := tableWriter.Flush(); err != nil {
return err
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ func GetClientID(context *ClientContext, defaultPrefix string) string {
} else if usr, err = user.Current(); err != nil {
output.Warnf("Failed to read current user: %v", err)
return strings.TrimSuffix(defaultPrefix, "-")
} else {
return defaultPrefix + sanitizeUsername(usr.Username)
}
return defaultPrefix + sanitizeUsername(usr.Username)
}

func sanitizeUsername(u string) string {
Expand Down
3 changes: 1 addition & 2 deletions internal/consume/AvroMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func (deserializer AvroMessageDeserializer) CanDeserialize(topic string) (bool,
return true, nil
} else if util.ContainsString(subjects, topic+"-value") {
return true, nil
} else {
return false, nil
}
return false, nil
}

func (deserializer AvroMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error {
Expand Down
12 changes: 5 additions & 7 deletions internal/consume/PartitionConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func getOffsetBounds(client *sarama.Client, topic string, flags Flags, currentPa

// Converts string to epoch unix timestamp
// The string might be null in that case, the flag is considered absent and the value -1 is returned
func convertToEpocUnixMillis(timestamp string) (int64, error) {
func ConvertToEpocUnixMillis(timestamp string) (int64, error) {
if timestamp == "" {
return -1, nil
}
Expand All @@ -178,7 +178,7 @@ func convertToEpocUnixMillis(timestamp string) (int64, error) {
}

func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
var fromUnixMillis, err = convertToEpocUnixMillis(flags.FromTimestamp)
var fromUnixMillis, err = ConvertToEpocUnixMillis(flags.FromTimestamp)
if err != nil {
return ErrOffset, err
}
Expand All @@ -191,13 +191,12 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar
return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
} else if len(flags.Offsets) > 0 {
return extractOffsetForPartition(flags, currentPartition)
} else {
return sarama.OffsetNewest, nil
}
return sarama.OffsetNewest, nil
}

func getEndOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
var toUnixMillis, err = convertToEpocUnixMillis(flags.ToTimestamp)
var toUnixMillis, err = ConvertToEpocUnixMillis(flags.ToTimestamp)
if err != nil {
return ErrOffset, err
}
Expand All @@ -210,9 +209,8 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti
return ErrOffset, err
}
return newestOffset, nil
} else {
return sarama.OffsetNewest, nil
}
return sarama.OffsetNewest, nil
}

func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) {
Expand Down
26 changes: 18 additions & 8 deletions internal/consumergroupoffsets/OffsetResettingConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"

"github.com/IBM/sarama"
"github.com/deviceinsight/kafkactl/internal/consume"
"github.com/deviceinsight/kafkactl/output"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi
offsets := make([]partitionOffsets, 0)

if flags.Partition > -1 {
offset, err := resetOffset(consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session)
offset, err := resetOffset(&consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session)
if err != nil {
return err
}
Expand All @@ -62,7 +63,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi
}

for _, partition := range partitions {
offset, err := resetOffset(consumer.client, consumer.topicName, partition, flags, groupOffsets, session)
offset, err := resetOffset(&consumer.client, consumer.topicName, partition, flags, groupOffsets, session)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,7 +103,7 @@ func (consumer *OffsetResettingConsumer) ConsumeClaim(sarama.ConsumerGroupSessio
return nil
}

func resetOffset(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) {
func resetOffset(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) {
offset, err := getPartitionOffsets(client, topic, partition, flags)
if err != nil {
return offset, err
Expand All @@ -121,16 +122,26 @@ func resetOffset(client sarama.Client, topic string, partition int32, flags Rese
return offset, nil
}

func getPartitionOffsets(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) {
func getPartitionOffsets(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) {

var err error
offsets := partitionOffsets{Partition: partition}

if offsets.OldestOffset, err = client.GetOffset(topic, partition, sarama.OffsetOldest); err != nil {
if flags.ToDatetime != "" {
milliTime, err := consume.ConvertToEpocUnixMillis(flags.ToDatetime)
if err != nil {
return offsets, err
}
if offsets.TargetOffset, err = (*client).GetOffset(topic, partition, milliTime); err == nil {
return offsets, nil
}
}

if offsets.OldestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetOldest); err != nil {
return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
}

if offsets.NewestOffset, err = client.GetOffset(topic, partition, sarama.OffsetNewest); err != nil {
if offsets.NewestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetNewest); err != nil {
return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
}

Expand All @@ -139,9 +150,8 @@ func getPartitionOffsets(client sarama.Client, topic string, partition int32, fl
return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) < oldest offset (%d)", partition, flags.Offset, offsets.OldestOffset)
} else if flags.Offset > offsets.NewestOffset {
return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) > newest offset (%d)", partition, flags.Offset, offsets.NewestOffset)
} else {
offsets.TargetOffset = flags.Offset
}
offsets.TargetOffset = flags.Offset
} else {
if flags.OldestOffset {
offsets.TargetOffset = offsets.OldestOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ResetConsumerGroupOffsetFlags struct {
Execute bool
OutputFormat string
allowedGroupState string
ToDatetime string
}

type ConsumerGroupOffsetOperation struct {
Expand Down
3 changes: 1 addition & 2 deletions internal/producer/AvroMessageSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func (serializer AvroMessageSerializer) CanSerialize(topic string) (bool, error)
return true, nil
} else if util.ContainsString(subjects, topic+"-value") {
return true, nil
} else {
return false, nil
}
return false, nil
}

func (serializer AvroMessageSerializer) Serialize(key, value []byte, flags Flags) (*sarama.ProducerMessage, error) {
Expand Down
3 changes: 1 addition & 2 deletions internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,8 @@ func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount in
} else if isTimestamp(line[1]) {
output.Warnf("assuming column 1 to be message timestamp. Column will be ignored")
return 0, 2, 3, nil
} else {
return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line)
}
return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line)
}

func isTimestamp(value string) bool {
Expand Down
9 changes: 6 additions & 3 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

var dateFormats = []string{
"2006-01-02T15:04:05.123Z",
"2006-01-02T15:04:05.123",
"2006-01-02T15:04:05.00000Z",
"2006-01-02T15:04:05.000Z",
"2006-01-02T15:04:05.000",
"2006-01-02T15:04:05Z",
"2006-01-02T15:04:05",
"2006-01-02T15:04",
Expand All @@ -21,8 +22,10 @@ func ParseTimestamp(timestamp string) (time.Time, error) {
return time.UnixMilli(timeMs), nil
}

loc, _ := time.LoadLocation("Local")

for _, format := range dateFormats {
if val, e := time.Parse(format, timestamp); e == nil {
if val, e := time.ParseInLocation(format, timestamp, loc); e == nil {
return val, nil
}
}
Expand Down
Loading