diff --git a/pkg/outputs/kafka_output/kafka_output.go b/pkg/outputs/kafka_output/kafka_output.go index c769aa03..9217a76b 100644 --- a/pkg/outputs/kafka_output/kafka_output.go +++ b/pkg/outputs/kafka_output/kafka_output.go @@ -95,6 +95,7 @@ type config struct { SplitEvents bool `mapstructure:"split-events,omitempty"` NumWorkers int `mapstructure:"num-workers,omitempty"` CompressionCodec string `mapstructure:"compression-codec,omitempty"` + KafkaVersion string `mapstructure:"kafka-version,omitempty"` Debug bool `mapstructure:"debug,omitempty"` BufferSize int `mapstructure:"buffer-size,omitempty"` OverrideTimestamps bool `mapstructure:"override-timestamps,omitempty"` @@ -508,6 +509,13 @@ func (k *kafkaOutput) SetTargetsConfig(map[string]*types.TargetConfig) {} func (k *kafkaOutput) createConfig() (*sarama.Config, error) { cfg := sarama.NewConfig() cfg.ClientID = k.cfg.Name + if k.cfg.KafkaVersion != "" { + var err error + cfg.Version, err = sarama.ParseKafkaVersion(k.cfg.KafkaVersion) + if err != nil { + return nil, err + } + } // SASL_PLAINTEXT or SASL_SSL if k.cfg.SASL != nil { cfg.Net.SASL.Enable = true