Skip to content

Commit

Permalink
config: change grpc-keepalive-timeout from uint to float64, for suppo…
Browse files Browse the repository at this point in the history
…rt millisecond timeout

Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 committed Sep 10, 2024
1 parent 28135fd commit 77b4d09
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 8 deletions.
11 changes: 9 additions & 2 deletions config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type TiKVClient struct {
GrpcKeepAliveTime uint `toml:"grpc-keepalive-time" json:"grpc-keepalive-time"`
// After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
// and if no activity is seen even after that the connection is closed.
GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
GrpcKeepAliveTimeout float64 `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
// GrpcCompressionType is the compression type for gRPC channel: none or gzip.
GrpcCompressionType string `toml:"grpc-compression-type" json:"grpc-compression-type"`
// GrpcSharedBufferPool is the flag to control whether to share the buffer pool in the TiKV gRPC clients.
Expand Down Expand Up @@ -154,7 +154,7 @@ func DefaultTiKVClient() TiKVClient {
return TiKVClient{
GrpcConnectionCount: 4,
GrpcKeepAliveTime: 10,
GrpcKeepAliveTimeout: 3,
GrpcKeepAliveTimeout: 0.2,
GrpcCompressionType: "none",
GrpcSharedBufferPool: false,
GrpcInitialWindowSize: DefGrpcInitialWindowSize,
Expand Down Expand Up @@ -204,5 +204,12 @@ func (config *TiKVClient) Valid() error {
if config.GrpcCompressionType != "none" && config.GrpcCompressionType != gzip.Name {
return fmt.Errorf("grpc-compression-type should be none or %s, but got %s", gzip.Name, config.GrpcCompressionType)
}
if config.GetGrpcKeepAliveTimeout() < time.Millisecond*50 {
return fmt.Errorf("grpc-keepalive-timeout should be at least 0.05, but got %f", config.GrpcKeepAliveTimeout)
}
return nil
}

func (config *TiKVClient) GetGrpcKeepAliveTimeout() time.Duration {
return time.Duration(config.GrpcKeepAliveTimeout * float64(time.Second))
}
13 changes: 13 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package config

import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -76,3 +77,15 @@ func TestTxnScopeValue(t *testing.T) {
err = failpoint.Disable("tikvclient/injectTxnScope")
assert.Nil(t, err)
}

func TestDefaultConfig(t *testing.T) {
cfg := DefaultTiKVClient()
assert.Nil(t, cfg.Valid())
assert.Equal(t, time.Millisecond*200, cfg.GetGrpcKeepAliveTimeout())
cfg.GrpcKeepAliveTimeout = 0.05
assert.Nil(t, cfg.Valid())
assert.Equal(t, time.Millisecond*50, cfg.GetGrpcKeepAliveTimeout())
cfg.GrpcKeepAliveTimeout = 0.04
assert.NotNil(t, cfg.Valid())
assert.Equal(t, "grpc-keepalive-timeout should be at least 0.05, but got 0.040000", cfg.Valid().Error())
}
3 changes: 1 addition & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.batchConn.initMetrics(a.target)
}
keepAlive := cfg.TiKVClient.GrpcKeepAliveTime
keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout
for i := range a.v {
ctx, cancel := context.WithTimeout(context.Background(), a.dialTimeout)
var callOptions []grpc.CallOption
Expand All @@ -330,7 +329,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(),
}),
}, opts...)
if cfg.TiKVClient.GrpcSharedBufferPool {
Expand Down
3 changes: 1 addition & 2 deletions internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,6 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
keepAlive := cfg.TiKVClient.GrpcKeepAliveTime
keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout
conn, err := grpc.DialContext(
ctx,
addr,
Expand All @@ -793,7 +792,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(),
}),
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, erro
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
DialKeepAliveTime: time.Second * time.Duration(cfg.TiKVClient.GrpcKeepAliveTime),
DialKeepAliveTimeout: time.Second * time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout),
DialKeepAliveTimeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(),
},
)
if err != nil {
Expand Down Expand Up @@ -308,7 +308,7 @@ func NewPDClient(pdAddrs []string) (pd.Client, error) {
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(),
},
),
),
Expand Down

0 comments on commit 77b4d09

Please sign in to comment.