Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conn timeout as flag in topicctl #200

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/hashicorp/go-multierror"
Expand All @@ -29,6 +30,7 @@ type sharedOptions struct {
tlsServerName string
zkAddr string
zkPrefix string
connTimeout time.Duration
}

func (s sharedOptions) validate() error {
Expand Down Expand Up @@ -164,6 +166,7 @@ func (s sharedOptions) getAdminClient(
Username: s.saslUsername,
SecretsManagerArn: s.saslSecretsManagerArn,
},
ConnTimeout: s.connTimeout,
},
ReadOnly: readOnly,
},
Expand All @@ -172,10 +175,11 @@ func (s sharedOptions) getAdminClient(
return admin.NewZKAdminClient(
ctx,
admin.ZKAdminClientConfig{
ZKAddrs: []string{s.zkAddr},
ZKPrefix: s.zkPrefix,
Sess: sess,
ReadOnly: readOnly,
ZKAddrs: []string{s.zkAddr},
ZKPrefix: s.zkPrefix,
Sess: sess,
ReadOnly: readOnly,
KafkaConnTimeout: s.connTimeout,
},
)
}
Expand Down Expand Up @@ -275,6 +279,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
"",
"Prefix for cluster-related nodes in zk",
)
cmd.PersistentFlags().DurationVar(
&options.connTimeout,
"conn-timeout",
10*time.Second,
"Kafka connection timeout",
)
}

func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ const (

// ConnectorConfig contains the configuration used to contruct a connector.
type ConnectorConfig struct {
BrokerAddr string
TLS TLSConfig
SASL SASLConfig
BrokerAddr string
TLS TLSConfig
SASL SASLConfig
ConnTimeout time.Duration
}

// TLSConfig stores the TLS-related configuration for a connection.
Expand Down
4 changes: 3 additions & 1 deletion pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ZKAdminClientConfig struct {
ExpectedClusterID string
Sess *session.Session
ReadOnly bool
KafkaConnTimeout time.Duration
}

// NewZKAdminClient creates and returns a new Client instance.
Expand Down Expand Up @@ -136,7 +137,8 @@ func NewZKAdminClient(
client.bootstrapAddrs = bootstrapAddrs
client.Connector, err = NewConnector(
ConnectorConfig{
BrokerAddr: bootstrapAddrs[0],
BrokerAddr: bootstrapAddrs[0],
ConnTimeout: config.KafkaConnTimeout,
},
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestGetGroups(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -83,6 +84,7 @@ func TestGetGroupsMultiMember(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -164,6 +166,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -260,6 +263,7 @@ func TestGetLags(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -303,6 +307,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -350,6 +355,7 @@ func TestResetOffsets(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
5 changes: 1 addition & 4 deletions pkg/messages/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ const (
// Parameters for backoff when there are connection errors
maxRetries = 4
backoffInitSleepDuration = 200 * time.Millisecond

// Connection timeout
connTimeout = 10 * time.Second
)

// Bounds represents the start and end "bounds" of the messages in
Expand Down Expand Up @@ -284,6 +281,6 @@ func dialLeaderRetries(
return nil, fmt.Errorf("Error dialing partition %d: %+v", partition, err)
}

conn.SetDeadline(time.Now().Add(connTimeout))
conn.SetDeadline(time.Now().Add(connector.Config.ConnTimeout))
return conn, nil
}
1 change: 1 addition & 0 deletions pkg/messages/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestGetAllPartitionBounds(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions pkg/messages/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestTailerGetMessages(t *testing.T) {

connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package version

// Version is the current topicctl version.
const Version = "1.17.0"
const Version = "1.18.0"
Loading