Skip to content

Commit

Permalink
For alibaba#1097, client support subscribe service.
Browse files Browse the repository at this point in the history
  • Loading branch information
KomachiSion committed Jul 17, 2020
1 parent 0bdc470 commit 7dc45dc
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.api.naming.remote.response;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;

Expand All @@ -26,17 +27,28 @@
*/
public class SubscribeServiceResponse extends Response {

private ServiceInfo serviceInfo;

public SubscribeServiceResponse() {
}

public SubscribeServiceResponse(int resultCode, String message) {
public SubscribeServiceResponse(int resultCode, String message, ServiceInfo serviceInfo) {
super();
setResultCode(resultCode);
setMessage(message);
this.serviceInfo = serviceInfo;
}

@Override
public String getType() {
return NamingRemoteConstants.SUBSCRIBE_SERVICE;
}

public ServiceInfo getServiceInfo() {
return serviceInfo;
}

public void setServiceInfo(ServiceInfo serviceInfo) {
this.serviceInfo = serviceInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
*/
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
return processServiceJson(serviceInfo);
}

/**
* Process service info.
*
* @param serviceInfo new service info
* @return service info
*/
public ServiceInfo processServiceJson(ServiceInfo serviceInfo) {
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
Expand Down Expand Up @@ -204,8 +215,6 @@ public ServiceInfo processServiceJson(String json) {
+ JacksonUtils.toJson(modHosts));
}

serviceInfo.setJsonFromServer(json);

if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
Expand All @@ -217,10 +226,13 @@ public ServiceInfo processServiceJson(String json) {
+ JacksonUtils.toJson(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
eventDispatcher.serviceChanged(serviceInfo);
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}

if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}

MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

if (changed) {
Expand Down Expand Up @@ -271,10 +283,10 @@ public ServiceInfo getServiceInfo(final String serviceName, final String cluster
serviceObj = new ServiceInfo(serviceName, clusters);

serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());

updatingService(serviceName);
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
finishUpdating(serviceName);

} else if (updatingMap.containsKey(serviceName)) {

Expand Down Expand Up @@ -367,6 +379,14 @@ public void shutdown() throws NacosException {
NAMING_LOGGER.info("{} do shutdown stop", className);
}

public void updatingService(String serviceName) {
updatingMap.put(serviceName, new Object());
}

public void finishUpdating(String serviceName) {
updatingMap.remove(serviceName);
}

public class UpdateTask implements Runnable {

long lastRefTime = Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
* limitations under the License.
*/

package com.alibaba.nacos.client.naming.net;
package com.alibaba.nacos.client.naming.net.gprc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest;
import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest;
import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.RpcClientFactory;
import com.alibaba.nacos.client.remote.ServerListFactory;
Expand All @@ -40,11 +43,14 @@ public class NamingGrpcClientProxy {

private final String namespaceId;

private HostReactor hostReactor;

private RpcClient rpcClient;

public NamingGrpcClientProxy(String namespaceId) {
public NamingGrpcClientProxy(String namespaceId, HostReactor hostReactor) {
this.namespaceId = namespaceId;
rpcClient = RpcClientFactory.getClient("naming");
this.hostReactor = hostReactor;
this.rpcClient = RpcClientFactory.getClient("naming");
}

public void start(ServerListFactory serverListFactory) throws NacosException {
Expand Down Expand Up @@ -105,6 +111,28 @@ public ServiceInfo queryInstancesOfService(String serviceName, String clusters,
return response.getServiceInfo();
}

/**
* Subscribe service.
*
* @param serviceName full service name with group
* @param clusters clusters, current only support subscribe all clusters
* @return current ervice info of subscribe service
* @throws NacosException nacos exception
*/
public ServiceInfo subscribe(String serviceName, String clusters) throws NacosException {
ServiceInfo serviceInfo = new ServiceInfo(serviceName, clusters);
if (hostReactor.getServiceInfoMap().containsKey(serviceInfo.getKey())) {
return hostReactor.getServiceInfoMap().get(serviceInfo.getKey());
}
hostReactor.updatingService(serviceName);
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
ServiceInfo result = response.getServiceInfo();
hostReactor.getServiceInfoMap().put(result.getKey(), result);
hostReactor.finishUpdating(serviceName);
return result;
}

private <T extends Response> T requestToServer(Request request, Class<T> responseClass) throws NacosException {
try {
Response response = rpcClient.request(request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.client.naming.net.gprc;

import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.remote.ServerPushResponseHandler;

/**
* Naming push response handler.
*
* @author xiweng.yy
*/
public class NamingPushResponseHandler implements ServerPushResponseHandler<NotifySubscriberResponse> {

private final HostReactor hostReactor;

public NamingPushResponseHandler(HostReactor hostReactor) {
this.hostReactor = hostReactor;
}

@Override
public void responseReply(Response response) {
NotifySubscriberResponse notifyResponse = (NotifySubscriberResponse) response;
hostReactor.processServiceJson(notifyResponse.getServiceInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

/**
* ServerPushResponseHandler.
*
* @author liuzunfei
* @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
*/
public abstract interface ServerPushResponseHandler<T> {
public interface ServerPushResponseHandler<T> {

/**
* handle logic when response ceceive.
* @param response.
* Handle logic when response received.
* @param response response
*/
public abstract void responseReply(Response response);
void responseReply(Response response);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.remote.handler;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
Expand All @@ -25,12 +26,13 @@
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.NacosRemoteConstants;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
Expand All @@ -44,8 +46,15 @@
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest> {

@Autowired
AsyncListenContext asyncListenContext;
private final AsyncListenContext asyncListenContext;

private final ServiceInfoGenerator serviceInfoGenerator;

public SubscribeServiceRequestHandler(AsyncListenContext asyncListenContext,
ServiceInfoGenerator serviceInfoGenerator) {
this.asyncListenContext = asyncListenContext;
this.serviceInfoGenerator = serviceInfoGenerator;
}

@Override
public SubscribeServiceRequest parseBodyString(String bodyString) {
Expand All @@ -54,15 +63,19 @@ public SubscribeServiceRequest parseBodyString(String bodyString) {

@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
SubscribeServiceRequest subRequest = (SubscribeServiceRequest) request;
String serviceKey = UtilsAndCommons.assembleFullServiceName(subRequest.getNamespace(), subRequest.getServiceName());
SubscribeServiceRequest subscribeServiceRequest = (SubscribeServiceRequest) request;
String namespaceId = subscribeServiceRequest.getNamespace();
String serviceName = subscribeServiceRequest.getServiceName();
String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName);
String connectionId = meta.getConnectionId();
if (subRequest.isSubscribe()) {
ServiceInfo serviceInfo = serviceInfoGenerator
.generateServiceInfo(namespaceId, serviceName, StringUtils.EMPTY, false, meta.getClientIp());
if (subscribeServiceRequest.isSubscribe()) {
asyncListenContext.addListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId);
} else {
asyncListenContext.removeListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId);
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success");
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

@Override
Expand Down

0 comments on commit 7dc45dc

Please sign in to comment.