Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #12103] Enhance ClientWorker support grpc request timeout param. #12619

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class PropertyKeyConst {

public static final String CONFIG_RETRY_TIME = "configRetryTime";

public static final String CONFIG_REQUEST_TIMEOUT = "configRequestTimeout";

public static final String CLIENT_WORKER_MAX_THREAD_COUNT = "clientWorkerMaxThreadCount";

public static final String CLIENT_WORKER_THREAD_COUNT = "clientWorkerThreadCount";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class ClientWorker implements Closeable {

private long timeout;

private long requestTimeout;

private final ConfigRpcTransportClient agent;

private int taskPenaltyTime;
Expand Down Expand Up @@ -405,7 +407,7 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant
cache.setTaskId(taskId);
// fix issue # 1317
if (enableRemoteSyncConfig) {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
ConfigResponse response = getServerConfig(dataId, group, tenant, requestTimeout, false);
cache.setEncryptedDataKey(response.getEncryptedDataKey());
cache.setContent(response.getContent());
}
Expand Down Expand Up @@ -510,6 +512,8 @@ private int initWorkerThreadCount(NacosClientProperties properties) {

private void init(NacosClientProperties properties) {

requestTimeout = ConvertUtils.toLong(properties.getProperty(PropertyKeyConst.CONFIG_REQUEST_TIMEOUT, "-1"));

timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);

Expand Down Expand Up @@ -927,7 +931,7 @@ private void refreshContentAndCheck(RpcClient rpcClient, CacheData cacheData, bo
try {

ConfigResponse response = this.queryConfigInner(rpcClient, cacheData.dataId, cacheData.group,
cacheData.tenant, 3000L, notify);
cacheData.tenant, requestTimeout, notify);
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
cacheData.setContent(response.getContent());
if (null != response.getConfigType()) {
Expand Down Expand Up @@ -1198,7 +1202,7 @@ ConfigResponse queryConfigInner(RpcClient rpcClient, String dataId, String group
}

private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException {
return requestProxy(rpcClientInner, request, 3000L);
return requestProxy(rpcClientInner, request, requestTimeout);
}

private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills)
Expand All @@ -1217,6 +1221,9 @@ private Response requestProxy(RpcClient rpcClientInner, Request request, long ti
throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD,
"More than client-side current limit threshold");
}
if (timeoutMills < 0) {
return rpcClientInner.request(request);
}
return rpcClientInner.request(request, timeoutMills);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void testPublishConfigSuccess() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class)))
.thenReturn(new ConfigPublishResponse());
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
Expand Down Expand Up @@ -261,7 +261,7 @@ void testPublishConfigFail() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class)))
.thenReturn(ConfigPublishResponse.buildFailResponse(503, "over limit"));
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
Expand Down Expand Up @@ -290,7 +290,7 @@ void testPublishConfigException() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())).thenThrow(new NacosException());
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class))).thenThrow(new NacosException());
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
assertFalse(b);
Expand All @@ -313,7 +313,7 @@ void testRemoveConfig() throws NacosException {

String tag = "tag";
try {
Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class)))
.thenThrow(new NacosException(503, "overlimit"));

clientWorker.removeConfig(dataId, group, tenant, tag);
Expand Down Expand Up @@ -562,13 +562,13 @@ public void receiveConfigInfo(String configInfo) {
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
// mock listen and remove listen request
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong()))
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class)))
.thenReturn(response, response);
// mock query changed config
ConfigQueryResponse configQueryResponse = new ConfigQueryResponse();
configQueryResponse.setContent("content" + System.currentTimeMillis());
configQueryResponse.setContentType(ConfigType.JSON.getType());
Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);
Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class))).thenReturn(configQueryResponse);
(clientWorker.getAgent()).executeConfigListen();
//assert
//use local cache.
Expand Down
Loading