Skip to content

Commit

Permalink
[ISSUE #1097] Naming support grpc server forward request (#3480)
Browse files Browse the repository at this point in the history
* re subscribe service when reconnect

* change grpc instance maintain by heartbeat

* Add lifecycle for remoting workers

* Refactor naming client redo when reconnect

* Fix checkstyle and PMD

* Implement forward instance request to responsible server

* Implement forward heart beat to servers
  • Loading branch information
KomachiSion authored Jul 30, 2020
1 parent b1a587d commit 5f528d8
Show file tree
Hide file tree
Showing 46 changed files with 1,728 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author liuzunfei
* @version $Id: ConfigCommonRequest.java, v 0.1 2020年07月13日 9:05 PM liuzunfei Exp $
*/
public abstract class ConfigCommonRequest extends Request {
public abstract class AbstractConfigRequest extends Request {

@Override
public String getModule() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* @author liuzunfei
* @version $Id: ConfigBatchListenRequest.java, v 0.1 2020年07月27日 7:46 PM liuzunfei Exp $
*/
public class ConfigBatchListenRequest extends ConfigCommonRequest {
public class ConfigBatchListenRequest extends AbstractConfigRequest {

private static final String Y = "Y";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @author liuzunfei
* @version $Id: ConfigPublishRequest.java, v 0.1 2020年07月16日 4:30 PM liuzunfei Exp $
*/
public class ConfigPublishRequest extends ConfigCommonRequest {
public class ConfigPublishRequest extends AbstractConfigRequest {

String dataId;

Expand Down Expand Up @@ -59,7 +59,7 @@ public String getAdditionParam(String key) {
*/
public void putAdditonalParam(String key, String value) {
if (additonMap == null) {
additonMap = new HashMap<String, String>();
additonMap = new HashMap<String, String>(2);
}
additonMap.put(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* @author liuzunfei
* @version $Id: ConfigQueryRequest.java, v 0.1 2020年07月13日 9:06 PM liuzunfei Exp $
*/
public class ConfigQueryRequest extends ConfigCommonRequest {
public class ConfigQueryRequest extends AbstractConfigRequest {

private String dataId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* @author liuzunfei
* @version $Id: ConfigRemoveRequest.java, v 0.1 2020年07月16日 4:31 PM liuzunfei Exp $
*/
public class ConfigRemoveRequest extends ConfigCommonRequest {
public class ConfigRemoveRequest extends AbstractConfigRequest {

String dataId;

Expand Down Expand Up @@ -119,4 +119,4 @@ public String getTenant() {
public void setTenant(String tenant) {
this.tenant = tenant;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public class NamingRemoteConstants {
public static final String NOTIFY_SUBSCRIBER = "notifySubscriber";

public static final String LIST_SERVICE = "listService";

public static final String FORWARD_INSTANCE = "forwardInstance";

public static final String FORWARD_HEART_BEAT = "forwardHeartBeat";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
*
* @author liuzunfei
*/
public abstract class NamingCommonRequest extends Request {
public abstract class AbstractNamingRequest extends Request {

private String namespace;

private String serviceName;

private String groupName;

public NamingCommonRequest() {
public AbstractNamingRequest() {
}

public NamingCommonRequest(String namespace, String serviceName, String groupName) {
public AbstractNamingRequest(String namespace, String serviceName, String groupName) {
this.namespace = namespace;
this.serviceName = serviceName;
this.groupName = groupName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @author xiweng.yy
*/
public class InstanceRequest extends NamingCommonRequest {
public class InstanceRequest extends AbstractNamingRequest {

private String type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @author xiweng.yy
*/
public class ServiceListRequest extends NamingCommonRequest {
public class ServiceListRequest extends AbstractNamingRequest {

private int pageNo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @author xiweng.yy
*/
public class ServiceQueryRequest extends NamingCommonRequest {
public class ServiceQueryRequest extends AbstractNamingRequest {

private String cluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @author xiweng.yy
*/
public class SubscribeServiceRequest extends NamingCommonRequest {
public class SubscribeServiceRequest extends AbstractNamingRequest {

private boolean subscribe;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* @author liuzunfei
* @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Connection {

public static final String HEALTHY = "healthy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
*/
public enum ConnectionType {

/**
* gRPC connection.
*/
GRPC("GRPC", "Grpc Connection");

String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* @author liuzunfei
* @version $Id: InternalRequest.java, v 0.1 2020年07月22日 8:33 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class InternalRequest extends Request {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*
* @author liuzunfei
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Request {

private final Map<String, String> headers = new HashMap<String, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @author liuzunfei
* @version $Id: Response.java, v 0.1 2020年07月13日 6:03 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Response {

int resultCode = ResponseCode.SUCCESS.getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
*/
public enum ResponseCode {

/**
* Request success.
*/
SUCCESS(200, "response ok"),

/**
* Request failed.
*/
FAIL(500, "response fail");

int code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* @author liuzunfei
* @version $Id: ServerPushResponse.java, v 0.1 2020年07月20日 1:21 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class ServerPushResponse extends Response {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,21 @@ public class NamingGrpcClientProxy implements NamingClientProxy {

private final RpcClient rpcClient;

private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener;

public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.rpcClient = RpcClientFactory.getClient("naming");
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}

private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.init(serverListFactory);
rpcClient.start();
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
}

@Override
Expand All @@ -76,6 +80,7 @@ public void registerService(String serviceName, String groupName, Instance insta
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);
}

@Override
Expand All @@ -86,6 +91,7 @@ public void deregisterService(String serviceName, String groupName, Instance ins
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
namingGrpcConnectionEventListener.removeInstanceForRedo(serviceName, groupName, instance);
}

@Override
Expand Down Expand Up @@ -147,14 +153,17 @@ public ServiceInfo subscribe(String serviceName, String groupName, String cluste
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters,
true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener.cacheSubscriberForRedo(serviceNameWithGroup, clusters);
return response.getServiceInfo();
}

@Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId,
NamingUtils.getGroupedName(serviceName, groupName), clusters, false);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters,
false);
requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener.removeSubscriberForRedo(serviceNameWithGroup, clusters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.client.naming.remote.gprc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.remote.ConnectionEventListener;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Naming client gprc connection event listener.
*
* <p>
* When connection reconnect to server, redo the register and subscribe.
* </p>
*
* @author xiweng.yy
*/
public class NamingGrpcConnectionEventListener implements ConnectionEventListener {

private final NamingGrpcClientProxy clientProxy;

private final ConcurrentMap<String, Set<Instance>> registeredInstanceCached = new ConcurrentHashMap<String, Set<Instance>>();

private final Set<String> subscribes = new ConcurrentHashSet<String>();

public NamingGrpcConnectionEventListener(NamingGrpcClientProxy clientProxy) {
this.clientProxy = clientProxy;
}

@Override
public void onConnected() {
}

@Override
public void onReconnected() {
redoSubscribe();
redoRegisterEachService();
}

private void redoSubscribe() {
for (String each : subscribes) {
ServiceInfo serviceInfo = ServiceInfo.fromKey(each);
try {
clientProxy.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters());
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.warn(String.format("re subscribe service %s failed", serviceInfo.getName()), e);
}
}
}

private void redoRegisterEachService() {
for (Map.Entry<String, Set<Instance>> each : registeredInstanceCached.entrySet()) {
String serviceName = NamingUtils.getServiceName(each.getKey());
String groupName = NamingUtils.getGroupName(each.getKey());
redoRegisterEachInstance(serviceName, groupName, each.getValue());
}
}

private void redoRegisterEachInstance(String serviceName, String groupName, Set<Instance> instances) {
for (Instance each : instances) {
try {
clientProxy.registerService(serviceName, groupName, each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER
.warn(String.format("redo register for service %s@@%s failed", groupName, serviceName), e);
}
}
}

@Override
public void onDisConnect() {
}

/**
* Cache registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
registeredInstanceCached.putIfAbsent(key, new ConcurrentHashSet<Instance>());
registeredInstanceCached.get(key).add(instance);
}

/**
* Remove registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void removeInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
Set<Instance> instances = registeredInstanceCached.get(key);
if (null != instances) {
instances.remove(instance);
}
}

public void cacheSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
}

public void removeSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.remove(ServiceInfo.getKey(fullServiceName, cluster));
}
}
Loading

0 comments on commit 5f528d8

Please sign in to comment.