Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jiachun.fjc committed Jan 2, 2017
1 parent 7846b1c commit 1a74f15
Show file tree
Hide file tree
Showing 30 changed files with 64 additions and 72 deletions.
2 changes: 1 addition & 1 deletion jupiter-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-flightexec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-monitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-registry-default/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-registry-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-rpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,22 +258,22 @@ public GenericInvoker newProxyInstance() {
}

// dispatcher
Dispatcher dispatcher = asDispatcher(metadata, serializerType)
Dispatcher dispatcher = dispatcher(metadata, serializerType)
.hooks(hooks)
.timeoutMillis(timeoutMillis)
.methodsSpecialTimeoutMillis(methodsSpecialTimeoutMillis);

switch (invokeType) {
case SYNC:
return new SyncGenericInvoker(asClusterInvoker(strategy, dispatcher));
return new SyncGenericInvoker(clusterInvoker(strategy, dispatcher));
case ASYNC:
return new AsyncGenericInvoker(asClusterInvoker(strategy, dispatcher));
return new AsyncGenericInvoker(clusterInvoker(strategy, dispatcher));
default:
throw new IllegalStateException("InvokeType: " + invokeType);
}
}

protected Dispatcher asDispatcher(ServiceMetadata metadata, SerializerType serializerType) {
protected Dispatcher dispatcher(ServiceMetadata metadata, SerializerType serializerType) {
switch (dispatchType) {
case ROUND:
return new DefaultRoundDispatcher(
Expand All @@ -285,7 +285,7 @@ protected Dispatcher asDispatcher(ServiceMetadata metadata, SerializerType seria
}
}

private ClusterInvoker asClusterInvoker(ClusterInvoker.Strategy strategy, Dispatcher dispatcher) {
private ClusterInvoker clusterInvoker(ClusterInvoker.Strategy strategy, Dispatcher dispatcher) {
switch (strategy) {
case FAIL_FAST:
return new FailFastClusterInvoker(client, dispatcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,18 @@ public I newProxyInstance() {
}

// dispatcher
Dispatcher dispatcher = asDispatcher(metadata, serializerType)
Dispatcher dispatcher = dispatcher(metadata, serializerType)
.hooks(hooks)
.timeoutMillis(timeoutMillis)
.methodsSpecialTimeoutMillis(methodsSpecialTimeoutMillis);

Object handler;
switch (invokeType) {
case SYNC:
handler = new SyncInvoker(asClusterInvoker(strategy, dispatcher));
handler = new SyncInvoker(clusterInvoker(strategy, dispatcher));
break;
case ASYNC:
handler = new AsyncInvoker(asClusterInvoker(strategy, dispatcher));
handler = new AsyncInvoker(clusterInvoker(strategy, dispatcher));
break;
default:
throw new IllegalStateException("InvokeType: " + invokeType);
Expand All @@ -256,7 +256,7 @@ public I newProxyInstance() {
return Proxies.getDefault().newProxy(interfaceClass, handler);
}

protected Dispatcher asDispatcher(ServiceMetadata metadata, SerializerType serializerType) {
protected Dispatcher dispatcher(ServiceMetadata metadata, SerializerType serializerType) {
switch (dispatchType) {
case ROUND:
return new DefaultRoundDispatcher(
Expand All @@ -268,7 +268,7 @@ protected Dispatcher asDispatcher(ServiceMetadata metadata, SerializerType seria
}
}

private ClusterInvoker asClusterInvoker(ClusterInvoker.Strategy strategy, Dispatcher dispatcher) {
private ClusterInvoker clusterInvoker(ClusterInvoker.Strategy strategy, Dispatcher dispatcher) {
switch (strategy) {
case FAIL_FAST:
return new FailFastClusterInvoker(client, dispatcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ public static Strategy parse(String name) {

String name();

InvokeFuture<?> invoke(String methodName, Object[] args, Class<?> returnType) throws Exception;
<T> InvokeFuture<T> invoke(String methodName, Object[] args, Class<T> returnType) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public String name() {
}

@Override
public InvokeFuture<?> invoke(String methodName, Object[] args, Class<?> returnType) throws Exception {
public <T> InvokeFuture<T> invoke(String methodName, Object[] args, Class<T> returnType) throws Exception {
return dispatcher.dispatch(client, methodName, args, returnType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,29 @@ public String name() {
}

@Override
public InvokeFuture<?> invoke(String methodName, Object[] args, Class<?> returnType) throws Exception {
FailOverInvokeFuture<?> future = new FailOverInvokeFuture<>(returnType);
public <T> InvokeFuture<T> invoke(String methodName, Object[] args, Class<T> returnType) throws Exception {
FailOverInvokeFuture<T> future = FailOverInvokeFuture.with(returnType);

int tryCount = retries + 1;
invoke0(methodName, args, returnType, tryCount, future, null);

return future;
}

@SuppressWarnings("unchecked")
private void invoke0(final String methodName,
private <T> void invoke0(final String methodName,
final Object[] args,
final Class<?> returnType,
final Class<T> returnType,
final int tryCount,
final FailOverInvokeFuture<?> future,
final FailOverInvokeFuture<T> future,
Throwable lastCause) {

if (tryCount > 0 && isFailoverNeeded(lastCause)) {
InvokeFuture<Object> f = (InvokeFuture<Object>) dispatcher.dispatch(client, methodName, args, returnType);
InvokeFuture<T> f = dispatcher.dispatch(client, methodName, args, returnType);

f.addListener(new JListener<Object>() {
f.addListener(new JListener<T>() {

@Override
public void complete(Object result) {
public void complete(T result) {
future.setSuccess(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public String name() {
}

@Override
public InvokeFuture<?> invoke(String methodName, Object[] args, Class<?> returnType) throws Exception {
InvokeFuture<?> future = dispatcher.dispatch(client, methodName, args, returnType);
return new FailSafeInvokeFuture<>(future);
public <T> InvokeFuture<T> invoke(String methodName, Object[] args, Class<T> returnType) throws Exception {
InvokeFuture<T> future = dispatcher.dispatch(client, methodName, args, returnType);
return FailSafeInvokeFuture.with(future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ protected MessageWrapper doTracing(JRequest request, MessageWrapper message, Str
return message;
}

protected DefaultInvokeFuture<?> write(
JChannel channel, final JRequest request, final DefaultInvokeFuture<?> future, final DispatchType dispatchType) {
protected <T> DefaultInvokeFuture<T> write(
JChannel channel, final JRequest request, final DefaultInvokeFuture<T> future, final DispatchType dispatchType) {

final JRequestBytes requestBytes = request.requestBytes();
final ConsumerHook[] hooks = future.hooks();
Expand Down Expand Up @@ -213,6 +213,4 @@ public void operationFailure(JChannel channel, Throwable cause) throws Exception

return future;
}

protected abstract DefaultInvokeFuture<?> asFuture(JRequest request, JChannel channel, Class<?> returnType, long timeoutMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import org.jupiter.rpc.JClient;
import org.jupiter.rpc.JRequest;
import org.jupiter.rpc.consumer.future.DefaultInvokeFutureGroup;
import org.jupiter.rpc.consumer.future.DefaultInvokeFuture;
import org.jupiter.rpc.consumer.future.DefaultInvokeFutureGroup;
import org.jupiter.rpc.consumer.future.InvokeFuture;
import org.jupiter.rpc.load.balance.LoadBalancer;
import org.jupiter.rpc.model.metadata.MessageWrapper;
Expand All @@ -46,8 +46,9 @@ public DefaultBroadcastDispatcher(
super(loadBalancer, metadata, serializerType);
}

@SuppressWarnings("unchecked")
@Override
public InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args, Class<?> returnType) {
public <T> InvokeFuture<T> dispatch(JClient client, String methodName, Object[] args, Class<T> returnType) {
// stack copy
final ServiceMetadata _metadata = metadata();
final Serializer _serializer = serializer();
Expand All @@ -60,7 +61,6 @@ public InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args

CopyOnWriteGroupList groups = client.connector().directory(_metadata);
JChannel[] channels = new JChannel[groups.size()];
InvokeFuture<?>[] futures = new DefaultInvokeFuture[channels.length];
for (int i = 0; i < groups.size(); i++) {
channels[i] = groups.get(i).next();
}
Expand All @@ -72,20 +72,15 @@ public InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args
request.message(message);
request.bytes(s_code, bytes);

InvokeFuture<T>[] futures = new DefaultInvokeFuture[channels.length];
long timeoutMillis = methodSpecialTimeoutMillis(methodName);
for (int i = 0; i < channels.length; i++) {
JChannel ch = channels[i];
DefaultInvokeFuture<?> future = asFuture(request, ch, returnType, timeoutMillis)
DefaultInvokeFuture<T> future = DefaultInvokeFuture.with(request.invokeId(), ch, returnType, timeoutMillis, BROADCAST)
.hooks(hooks());
futures[i] = write(ch, request, future, BROADCAST);
}

return DefaultInvokeFutureGroup.with(futures);
}

@SuppressWarnings("unchecked")
@Override
protected DefaultInvokeFuture<?> asFuture(JRequest request, JChannel channel, Class<?> returnType, long timeoutMillis) {
return new DefaultInvokeFuture(request.invokeId(), channel, returnType, timeoutMillis, BROADCAST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public DefaultRoundDispatcher(
}

@Override
public InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args, Class<?> returnType) {
public <T> InvokeFuture<T> dispatch(JClient client, String methodName, Object[] args, Class<T> returnType) {
// stack copy
final Serializer _serializer = serializer();

Expand All @@ -68,15 +68,9 @@ public InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args
request.bytes(s_code, bytes);

long timeoutMillis = methodSpecialTimeoutMillis(methodName);
DefaultInvokeFuture<?> future = asFuture(request, channel, returnType, timeoutMillis)
DefaultInvokeFuture<T> future = DefaultInvokeFuture.with(request.invokeId(), channel, returnType, timeoutMillis, ROUND)
.hooks(hooks());

return write(channel, request, future, ROUND);
}

@SuppressWarnings("unchecked")
@Override
protected DefaultInvokeFuture<?> asFuture(JRequest request, JChannel channel, Class<?> returnType, long timeoutMillis) {
return new DefaultInvokeFuture(request.invokeId(), channel, returnType, timeoutMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public interface Dispatcher {

InvokeFuture<?> dispatch(JClient client, String methodName, Object[] args, Class<?> returnType);
<T> InvokeFuture<T> dispatch(JClient client, String methodName, Object[] args, Class<T> returnType);

ServiceMetadata metadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.jupiter.common.util.Preconditions.checkNotNull;
import static org.jupiter.common.util.StackTraceUtil.stackTrace;
import static org.jupiter.rpc.ConsumerHook.EMPTY_HOOKS;
import static org.jupiter.rpc.DispatchType.ROUND;
import static org.jupiter.transport.Status.*;

/**
Expand Down Expand Up @@ -71,11 +70,11 @@ public class DefaultInvokeFuture<V> extends AbstractInvokeFuture<V> {

private ConsumerHook[] hooks = EMPTY_HOOKS;

public DefaultInvokeFuture(long invokeId, JChannel channel, Class<V> returnType, long timeoutMillis) {
this(invokeId, channel, returnType, timeoutMillis, ROUND);
public static <T> DefaultInvokeFuture<T> with(long invokeId, JChannel channel, Class<T> returnType, long timeoutMillis, DispatchType dispatchType) {
return new DefaultInvokeFuture<T>(invokeId, channel, returnType, timeoutMillis, dispatchType);
}

public DefaultInvokeFuture(long invokeId, JChannel channel, Class<V> returnType, long timeoutMillis, DispatchType dispatchType) {
private DefaultInvokeFuture(long invokeId, JChannel channel, Class<V> returnType, long timeoutMillis, DispatchType dispatchType) {
this.invokeId = invokeId;
this.channel = channel;
this.returnType = returnType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public class DefaultInvokeFutureGroup<V> implements InvokeFutureGroup<V> {

private final InvokeFuture<V>[] futures;

@SuppressWarnings("unchecked")
public static DefaultInvokeFutureGroup<?> with(InvokeFuture<?>[] futures) {
return new DefaultInvokeFutureGroup(futures);
public static <T> DefaultInvokeFutureGroup<T> with(InvokeFuture<T>[] futures) {
return new DefaultInvokeFutureGroup<>(futures);
}

private DefaultInvokeFutureGroup(InvokeFuture<V>[] futures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public class FailOverInvokeFuture<V> extends AbstractInvokeFuture<V> {

private final Class<V> returnType;

public FailOverInvokeFuture(Class<V> returnType) {
public static <T> FailOverInvokeFuture<T> with(Class<T> returnType) {
return new FailOverInvokeFuture<>(returnType);
}

private FailOverInvokeFuture(Class<V> returnType) {
this.returnType = returnType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public class FailSafeInvokeFuture<V> implements InvokeFuture<V> {

private final InvokeFuture<V> future;

public FailSafeInvokeFuture(InvokeFuture<V> future) {
public static <T> FailSafeInvokeFuture<T> with(InvokeFuture<T> future) {
return new FailSafeInvokeFuture<>(future);
}

private FailSafeInvokeFuture(InvokeFuture<V> future) {
this.future = future;
}

Expand Down
2 changes: 1 addition & 1 deletion jupiter-serialization-hessian/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion jupiter-serialization-kryo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>jupiter</artifactId>
<groupId>org.jupiter</groupId>
<version>1.1.11</version>
<version>1.1.12</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Loading

0 comments on commit 1a74f15

Please sign in to comment.