From 77b4d092ccfbf2297ddbb4648074e1b924053180 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 10 Sep 2024 16:45:28 +0800 Subject: [PATCH] config: change grpc-keepalive-timeout from uint to float64, for support millisecond timeout Signed-off-by: crazycs520 --- config/client.go | 11 +++++++++-- config/config_test.go | 13 +++++++++++++ internal/client/client.go | 3 +-- internal/locate/store_cache.go | 3 +-- tikv/kv.go | 4 ++-- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/config/client.go b/config/client.go index 83b4155f5..15d4a51f1 100644 --- a/config/client.go +++ b/config/client.go @@ -72,7 +72,7 @@ type TiKVClient struct { GrpcKeepAliveTime uint `toml:"grpc-keepalive-time" json:"grpc-keepalive-time"` // After having pinged for keepalive check, the client waits for a duration of Timeout in seconds // and if no activity is seen even after that the connection is closed. - GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"` + GrpcKeepAliveTimeout float64 `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"` // GrpcCompressionType is the compression type for gRPC channel: none or gzip. GrpcCompressionType string `toml:"grpc-compression-type" json:"grpc-compression-type"` // GrpcSharedBufferPool is the flag to control whether to share the buffer pool in the TiKV gRPC clients. @@ -154,7 +154,7 @@ func DefaultTiKVClient() TiKVClient { return TiKVClient{ GrpcConnectionCount: 4, GrpcKeepAliveTime: 10, - GrpcKeepAliveTimeout: 3, + GrpcKeepAliveTimeout: 0.2, GrpcCompressionType: "none", GrpcSharedBufferPool: false, GrpcInitialWindowSize: DefGrpcInitialWindowSize, @@ -204,5 +204,12 @@ func (config *TiKVClient) Valid() error { if config.GrpcCompressionType != "none" && config.GrpcCompressionType != gzip.Name { return fmt.Errorf("grpc-compression-type should be none or %s, but got %s", gzip.Name, config.GrpcCompressionType) } + if config.GetGrpcKeepAliveTimeout() < time.Millisecond*50 { + return fmt.Errorf("grpc-keepalive-timeout should be at least 0.05, but got %f", config.GrpcKeepAliveTimeout) + } return nil } + +func (config *TiKVClient) GetGrpcKeepAliveTimeout() time.Duration { + return time.Duration(config.GrpcKeepAliveTimeout * float64(time.Second)) +} diff --git a/config/config_test.go b/config/config_test.go index 3058a29e5..cf44b00ae 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -36,6 +36,7 @@ package config import ( "testing" + "time" "github.com/pingcap/failpoint" "github.com/stretchr/testify/assert" @@ -76,3 +77,15 @@ func TestTxnScopeValue(t *testing.T) { err = failpoint.Disable("tikvclient/injectTxnScope") assert.Nil(t, err) } + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultTiKVClient() + assert.Nil(t, cfg.Valid()) + assert.Equal(t, time.Millisecond*200, cfg.GetGrpcKeepAliveTimeout()) + cfg.GrpcKeepAliveTimeout = 0.05 + assert.Nil(t, cfg.Valid()) + assert.Equal(t, time.Millisecond*50, cfg.GetGrpcKeepAliveTimeout()) + cfg.GrpcKeepAliveTimeout = 0.04 + assert.NotNil(t, cfg.Valid()) + assert.Equal(t, "grpc-keepalive-timeout should be at least 0.05, but got 0.040000", cfg.Valid().Error()) +} diff --git a/internal/client/client.go b/internal/client/client.go index 10bb7ed5b..fc832d368 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -303,7 +303,6 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint a.batchConn.initMetrics(a.target) } keepAlive := cfg.TiKVClient.GrpcKeepAliveTime - keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout for i := range a.v { ctx, cancel := context.WithTimeout(context.Background(), a.dialTimeout) var callOptions []grpc.CallOption @@ -330,7 +329,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint }), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Duration(keepAlive) * time.Second, - Timeout: time.Duration(keepAliveTimeout) * time.Second, + Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(), }), }, opts...) if cfg.TiKVClient.GrpcSharedBufferPool { diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index d0bea45a7..7d4583ff2 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -775,7 +775,6 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } keepAlive := cfg.TiKVClient.GrpcKeepAliveTime - keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout conn, err := grpc.DialContext( ctx, addr, @@ -793,7 +792,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h }), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Duration(keepAlive) * time.Second, - Timeout: time.Duration(keepAliveTimeout) * time.Second, + Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(), }), ) if err != nil { diff --git a/tikv/kv.go b/tikv/kv.go index 1afedc294..7c45137b2 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -91,7 +91,7 @@ func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, erro DialTimeout: 5 * time.Second, TLS: tlsConfig, DialKeepAliveTime: time.Second * time.Duration(cfg.TiKVClient.GrpcKeepAliveTime), - DialKeepAliveTimeout: time.Second * time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout), + DialKeepAliveTimeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(), }, ) if err != nil { @@ -308,7 +308,7 @@ func NewPDClient(pdAddrs []string) (pd.Client, error) { grpc.WithKeepaliveParams( keepalive.ClientParameters{ Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, - Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, + Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(), }, ), ),