Skip to content

Commit

Permalink
context.WithTimeout 1s on ping
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Jul 17, 2024
1 parent c47f1e1 commit 1820cc9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
7 changes: 6 additions & 1 deletion plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -47,7 +48,11 @@ func NewClient(c *Config, l *zap.Logger) *kgo.Client {
if err != nil {
l.Fatal("can't create kafka client", zap.Error(err))
}
err = client.Ping(context.TODO())

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err = client.Ping(ctx)
if err != nil {
l.Fatal("can't connect to kafka", zap.Error(err))
}
Expand Down
5 changes: 4 additions & 1 deletion plugin/output/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func NewClient(c *Config, l *zap.Logger) *kgo.Client {
l.Fatal("can't create kafka client", zap.Error(err))
}

err = client.Ping(context.TODO())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err = client.Ping(ctx)
if err != nil {
l.Fatal("can't connect to kafka", zap.Error(err))
}
Expand Down

0 comments on commit 1820cc9

Please sign in to comment.