Skip to content

Commit

Permalink
Deploying to gh-pages from @ 293ab5c 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Aug 14, 2024
1 parent 0970e79 commit 3ca7ccb
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## 5.3.0 - 2024-08-14
### Added
- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations
- [#210](https://github.com/deviceinsight/kafkactl/pull/210) Create topic from file

## 5.2.0 - 2024-08-08

## 5.1.0 - 2024-08-07
Expand Down
58 changes: 57 additions & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,25 @@ contexts:
# optional: nodeSelector to add to the pod
nodeSelector:
key: value
# optional: affinity to add to the pod
affinity:
# note: other types of affinity also supported
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "<key>"
operator: "<operator>"
values: [ "<value>" ]
# optional: tolerations to add to the pod
tolerations:
- key: "<key>"
operator: "<operator>"
value: "<value>"
effect: "<effect>"
# optional: clientID config (defaults to kafkactl-{username})
clientID: my-client-id
Expand Down Expand Up @@ -400,6 +419,7 @@ See the plugin documentation for additional documentation and usage examples.

Available plugins:

* https://github.com/deviceinsight/kafkactl-plugins/blob/main/aws/README.adoc[aws plugin]
* https://github.com/deviceinsight/kafkactl-plugins/blob/main/azure/README.adoc[azure plugin]

== Examples
Expand Down Expand Up @@ -623,7 +643,7 @@ Producing protobuf message converted from JSON:
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
----

A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.
A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.

For example, if you have the following protobuf definition (`complex.proto`):

Expand Down Expand Up @@ -796,6 +816,42 @@ or with protoset
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

=== Create topics

The `create topic` allows you to create one or multiple topics.

Basic usage:
[,bash]
----
kafkactl create topic my-topic
----

The partition count can be specified with:
[,bash]
----
kafkactl create topic my-topic --partitions 32
----

The replication factor can be specified with:
[,bash]
----
kafkactl create topic my-topic --replication-factor 3
----

Configs can also be provided:
[,bash]
----
kafkactl create topic my-topic --config retention.ms=3600000 --config=cleanup.policy=compact
----

The topic configuration can also be taken from an existing topic using the following:
[,bash]
----
kafkactl describe topic my-topic -o json > my-topic-config.json
kafkactl create topic my-topic-clone --file my-topic-config.json
----


=== Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
Expand Down
1 change: 1 addition & 0 deletions cmd/create/create-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newCreateTopicCmd() *cobra.Command {
cmdCreateTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", 1, "number of partitions")
cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", -1, "replication factor")
cmdCreateTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")
cmdCreateTopic.Flags().StringVarP(&flags.File, "file", "f", "", "file with topic description")
cmdCreateTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")

return cmdCreateTopic
Expand Down
96 changes: 96 additions & 0 deletions cmd/create/create-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package create_test

import (
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -109,6 +110,101 @@ partitions:
testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func TestCreateTopicWithConfigFileIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

kafkaCtl := testutil.CreateKafkaCtlCommand()

topicName := testutil.GetPrefixedName("new-topic")
configFile := fmt.Sprintf(`
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 1
oldestOffset: 0
newestOffset: 258
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 2
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"
`, topicName)

tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s-*.yaml", topicName))
if err != nil {
t.Fatalf("could not create temp file with topic config: %s", err)
}
defer tmpFile.Close()
if _, err := tmpFile.WriteString(configFile); err != nil {
t.Fatalf("could not write temp config file: %s", err)
}

if _, err := kafkaCtl.Execute("create", "topic", topicName, "-f", tmpFile.Name()); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, fmt.Sprintf("topic created: %s", topicName), kafkaCtl.GetStdOut())

describeTopic(t, kafkaCtl, topicName)
stdOut := testutil.WithoutBrokerReferences(kafkaCtl.GetStdOut())

expected := `
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 1
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 2
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"`

testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func describeTopic(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, topicName string) {
describeTopic := func(_ uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")
Expand Down
1 change: 1 addition & 0 deletions docs/kafkactl_docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ kafkactl create topic TOPIC [flags]

```
-c, --config key=value configs in format key=value
-f, --file string file with topic description
-h, --help help for topic
-p, --partitions int32 number of partitions (default 1)
-r, --replication-factor int16 replication factor (default -1)
Expand Down
14 changes: 14 additions & 0 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type TLSConfig struct {
Insecure bool
}

type K8sToleration struct {
Key string `json:"key" yaml:"key"`
Operator string `json:"operator" yaml:"operator"`
Value string `json:"value" yaml:"value"`
Effect string `json:"effect" yaml:"effect"`
}

type K8sConfig struct {
Enabled bool
Binary string
Expand All @@ -74,6 +81,8 @@ type K8sConfig struct {
Labels map[string]string
Annotations map[string]string
NodeSelector map[string]string
Affinity map[string]any
Tolerations []K8sToleration
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -174,6 +183,11 @@ func CreateClientContext() (ClientContext, error) {
context.Kubernetes.Labels = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.labels")
context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations")
context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector")
context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity")

if err := viper.UnmarshalKey("contexts."+context.Name+".kubernetes.tolerations", &context.Kubernetes.Tolerations); err != nil {
return context, err
}

return context, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ConsumerGroupOffsetOperation struct {

func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags ResetConsumerGroupOffsetFlags, groupName string) error {

if (flags.Topic == nil || len(flags.Topic) == 0) && (!flags.AllTopics) {
if (len(flags.Topic) == 0) && (!flags.AllTopics) {
return errors.New("no topic specified")
}

Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type executor struct {
labels map[string]string
annotations map[string]string
nodeSelector map[string]string
affinity map[string]any
tolerations []internal.K8sToleration
}

const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789"
Expand Down Expand Up @@ -111,6 +113,8 @@ func newExecutor(context internal.ClientContext, runner Runner) *executor {
labels: context.Kubernetes.Labels,
annotations: context.Kubernetes.Annotations,
nodeSelector: context.Kubernetes.NodeSelector,
affinity: context.Kubernetes.Affinity,
tolerations: context.Kubernetes.Tolerations,
runner: runner,
}
}
Expand Down
20 changes: 16 additions & 4 deletions internal/k8s/pod_overrides.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package k8s

import "github.com/deviceinsight/kafkactl/v5/internal"

type imagePullSecretType struct {
Name string `json:"name"`
}
Expand All @@ -10,9 +12,11 @@ type metadataType struct {
}

type specType struct {
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
Affinity *map[string]any `json:"affinity,omitempty"`
Tolerations *[]internal.K8sToleration `json:"tolerations,omitempty"`
}

type PodOverrideType struct {
Expand All @@ -29,7 +33,7 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
var override PodOverrideType
override.APIVersion = "v1"

if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 {
if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 || len(kubectl.affinity) > 0 || len(kubectl.tolerations) > 0 {
override.Spec = &specType{}

if kubectl.serviceAccount != "" {
Expand All @@ -44,6 +48,14 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
if len(kubectl.nodeSelector) > 0 {
override.Spec.NodeSelector = &kubectl.nodeSelector
}

if len(kubectl.affinity) > 0 {
override.Spec.Affinity = &kubectl.affinity
}

if len(kubectl.tolerations) > 0 {
override.Spec.Tolerations = &kubectl.tolerations
}
}

if len(kubectl.labels) > 0 || len(kubectl.annotations) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
}

if inputMessage, err = inputParser.ParseLine(line); err != nil {
return failWithMessageCount(messageCount, err.Error())
return failWithMessageCount(messageCount, err.Error()) //nolint:govet
}

messageCount++
Expand Down
Loading

0 comments on commit 3ca7ccb

Please sign in to comment.