Skip to content

Commit

Permalink
Merge pull request #212 from deviceinsight/feature/create-from-file
Browse files Browse the repository at this point in the history
Allow creating topic using topic description from file (rebased)
  • Loading branch information
d-rk authored Aug 14, 2024
2 parents 2fb456e + 593b15f commit e9f6e6b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations
- [#210](https://github.com/deviceinsight/kafkactl/pull/210) Create topic from file

## 5.2.0 - 2024-08-08

Expand Down
36 changes: 36 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,42 @@ or with protoset
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

=== Create topics

The `create topic` allows you to create one or multiple topics.

Basic usage:
[,bash]
----
kafkactl create topic my-topic
----

The partition count can be specified with:
[,bash]
----
kafkactl create topic my-topic --partitions 32
----

The replication factor can be specified with:
[,bash]
----
kafkactl create topic my-topic --replication-factor 3
----

Configs can also be provided:
[,bash]
----
kafkactl create topic my-topic --config retention.ms=3600000 --config=cleanup.policy=compact
----

The topic configuration can also be taken from an existing topic using the following:
[,bash]
----
kafkactl describe topic my-topic -o json > my-topic-config.json
kafkactl create topic my-topic-clone --file my-topic-config.json
----


=== Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
Expand Down
1 change: 1 addition & 0 deletions cmd/create/create-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newCreateTopicCmd() *cobra.Command {
cmdCreateTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", 1, "number of partitions")
cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", -1, "replication factor")
cmdCreateTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")
cmdCreateTopic.Flags().StringVarP(&flags.File, "file", "f", "", "file with topic description")
cmdCreateTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")

return cmdCreateTopic
Expand Down
96 changes: 96 additions & 0 deletions cmd/create/create-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package create_test

import (
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -109,6 +110,101 @@ partitions:
testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func TestCreateTopicWithConfigFileIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

kafkaCtl := testutil.CreateKafkaCtlCommand()

topicName := testutil.GetPrefixedName("new-topic")
configFile := fmt.Sprintf(`
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 1
oldestOffset: 0
newestOffset: 258
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 2
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"
`, topicName)

tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s-*.yaml", topicName))
if err != nil {
t.Fatalf("could not create temp file with topic config: %s", err)
}
defer tmpFile.Close()
if _, err := tmpFile.WriteString(configFile); err != nil {
t.Fatalf("could not write temp config file: %s", err)
}

if _, err := kafkaCtl.Execute("create", "topic", topicName, "-f", tmpFile.Name()); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, fmt.Sprintf("topic created: %s", topicName), kafkaCtl.GetStdOut())

describeTopic(t, kafkaCtl, topicName)
stdOut := testutil.WithoutBrokerReferences(kafkaCtl.GetStdOut())

expected := `
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 1
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 2
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"`

testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func describeTopic(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, topicName string) {
describeTopic := func(_ uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")
Expand Down
47 changes: 47 additions & 0 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package topic

import (
"encoding/json"
"fmt"
"os"
"path"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -53,6 +56,7 @@ type CreateTopicFlags struct {
Partitions int32
ReplicationFactor int16
ValidateOnly bool
File string
Configs []string
}

Expand Down Expand Up @@ -111,6 +115,49 @@ func (operation *Operation) CreateTopics(topics []string, flags CreateTopicFlags
topicDetails.ConfigEntries[configParts[0]] = &configParts[1]
}

if flags.File != "" {
fileContent, err := os.ReadFile(flags.File)
if err != nil {
return errors.Wrap(err, "could not read topic description file")
}

fileTopicConfig := Topic{}
ext := path.Ext(flags.File)
var unmarshalErr error
switch ext {
case ".yml", ".yaml":
unmarshalErr = yaml.Unmarshal(fileContent, &fileTopicConfig)
case ".json":
unmarshalErr = json.Unmarshal(fileContent, &fileTopicConfig)
default:
return errors.Wrapf(err, "unsupported file format '%s'", ext)
}
if unmarshalErr != nil {
return errors.Wrap(err, "could not unmarshal config file")
}

numPartitions := int32(len(fileTopicConfig.Partitions))
if flags.Partitions == 1 {
topicDetails.NumPartitions = numPartitions
}

replicationFactors := map[int16]struct{}{}
for _, partition := range fileTopicConfig.Partitions {
replicationFactors[int16(len(partition.Replicas))] = struct{}{}
}
if flags.ReplicationFactor == -1 && len(replicationFactors) == 1 {
topicDetails.ReplicationFactor = int16(len(fileTopicConfig.Partitions[0].Replicas))
} else if flags.ReplicationFactor == -1 && len(replicationFactors) != 1 {
output.Warnf("replication factor from file ignored. partitions have different replicaCounts.")
}

for _, v := range fileTopicConfig.Configs {
if _, ok := topicDetails.ConfigEntries[v.Name]; !ok {
topicDetails.ConfigEntries[v.Name] = &v.Value
}
}
}

for _, topic := range topics {
if err = admin.CreateTopic(topic, &topicDetails, flags.ValidateOnly); err != nil {
return errors.Wrap(err, "failed to create topic")
Expand Down

0 comments on commit e9f6e6b

Please sign in to comment.