Skip to content

Commit

Permalink
gomaxprocs*1 default for kafka consumer max_concurrent_fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Aug 19, 2024
1 parent 2f880de commit 64c939d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
2 changes: 1 addition & 1 deletion plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
kgo.ConsumeTopics(c.Topics...),
kgo.FetchMaxWait(c.ConsumerMaxWaitTime_),
kgo.AutoCommitMarks(),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches_),
kgo.FetchMaxBytes(c.FetchMaxBytes_),
kgo.FetchMinBytes(c.FetchMinBytes_),
kgo.AutoCommitInterval(c.AutoCommitInterval_),
Expand Down
3 changes: 2 additions & 1 deletion plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type Config struct {
// > MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// > flight or buffered at once, overriding the unbounded (i.e. number of
// > brokers) default.
MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // *
MaxConcurrentFetches cfg.Expression `json:"max_concurrent_fetches" default:"gomaxprocs*1" parse:"expression"` // *
MaxConcurrentFetches_ int

// > @3@4@5@6
// >
Expand Down

0 comments on commit 64c939d

Please sign in to comment.