Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grouping multiple event streams to use single netty bootstrap, thread pools and etc. #2

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
target
out
*.iws
*.iml
*.ipr
*.swp
.idea
release.properties
Expand Down
25 changes: 0 additions & 25 deletions eventsource-client.iml

This file was deleted.

294 changes: 0 additions & 294 deletions eventsource-client.ipr

This file was deleted.

78 changes: 45 additions & 33 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,35 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.aslakhellesoy</groupId>
<groupId>com.github</groupId>
<artifactId>eventsource-client</artifactId>
<name>${project.artifactId}</name>
<name>Java EventSource Client</name>
<description>A Java EventSource Client</description>
<url>http://aslakhellesoy.github.com/eventsource-java</url>
<version>0.1.2.1</version>
<version>0.6-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>6</version>
</parent>

<licenses>
<license>
<name>BSD License</name>
<url>http://www.opensource.org/licenses/bsd-license</url>
<distribution>repo</distribution>
</license>
</licenses>

<scm>
<connection>scm:git:git://github.com/aslakhellesoy/eventsource-java.git</connection>
<developerConnection>scm:git:git@github.com:aslakhellesoy/eventsource-java.git</developerConnection>
<url>git://github.com/aslakhellesoy/eventsource-java.git</url>
<connection>scm:git:https://github.com/andll/eventsource-java.git</connection>
<developerConnection>scm:git:https://github.com/andll/eventsource-java.git</developerConnection>
<url>git://github.com/andll/eventsource-java.git</url>
</scm>
<repositories>

<distributionManagement>
<repository>
<id>repository.jboss.org</id>
<url>http://repository.jboss.org/nexus/content/groups/public/</url>
<id>mind.releases</id>
<url>http://nexus.mindlabs.com/content/repositories/mind.releases</url>
</repository>
</repositories>
<snapshotRepository>
<id>mind.snapshots</id>
<url>http://nexus.mindlabs.com/content/repositories/mind.snapshots</url>
</snapshotRepository>
</distributionManagement>

<dependencies>
<dependency>
<groupId>org.jboss.netty</groupId>
Expand Down Expand Up @@ -67,23 +69,33 @@
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.1</version>
<configuration>
<useAgent>true</useAgent>
</configuration>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>sign</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.1</version>
<configuration>
<useAgent>true</useAgent>
</configuration>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
57 changes: 12 additions & 45 deletions src/main/java/com/github/eventsource/client/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,47 @@

import com.github.eventsource.client.impl.AsyncEventSourceHandler;
import com.github.eventsource.client.impl.netty.EventSourceChannelHandler;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EventSource {
public class EventSource {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;

public static final int CONNECTING = 0;
public static final int OPEN = 1;
public static final int CLOSED = 2;

private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;

private int readyState;

/**
* Creates a new <a href="http://dev.w3.org/html5/eventsource/">EventSource</a> client. The client will reconnect on
* lost connections automatically, unless the connection is closed explicitly by a call to
* Creates a new <a href="http://dev.w3.org/html5/eventsource/">EventSource</a> client. The client will reconnect on
* lost connections automatically, unless the connection is closed explicitly by a call to
* {@link com.github.eventsource.client.EventSource#close()}.
*
* For sample usage, see examples at <a href="https://github.com/aslakhellesoy/eventsource-java/tree/master/src/test/java/com/github/eventsource/client">GitHub</a>.
*
* @param executor the executor that will receive events
*
* @param eventSourceClient EventSourceClient to start event source at
* @param reconnectionTimeMillis delay before a reconnect is made - in the event of a lost connection
* @param uri where to connect
* @param eventSourceHandler receives events
* @see #close()
*/
public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()));
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));

clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());
public EventSource(EventSourceClient eventSourceClient, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(eventSourceClient.getEventExecutor(), eventSourceHandler), reconnectionTimeMillis, eventSourceClient, uri);
}

pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("es-handler", clientHandler);
return pipeline;
}
});
public EventSource(Executor eventExecutor, long reconnectionTimeMillis, URI uri, EventSourceHandler eventSourceHandler) {
this(new EventSourceClient(eventExecutor), reconnectionTimeMillis, uri, eventSourceHandler);
}

public EventSource(String uri, EventSourceHandler eventSourceHandler) {
this(URI.create(uri), eventSourceHandler);
}

public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
this(new EventSourceClient(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
}

public ChannelFuture connect() {
readyState = CONNECTING;
return bootstrap.connect();
return clientHandler.connect();
}

/**
Expand Down
91 changes: 91 additions & 0 deletions src/main/java/com/github/eventsource/client/EventSourceClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.github.eventsource.client;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EventSourceClient {
private final ClientBootstrap bootstrap;
private final Executor eventExecutor;

private final HashMap<Channel, ChannelUpstreamHandler> handlerMap = new HashMap<Channel, ChannelUpstreamHandler>();

public EventSourceClient() {
this(Executors.newSingleThreadExecutor());
}

public EventSourceClient(Executor eventExecutor) {
this.eventExecutor = eventExecutor;
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());

pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("es-handler", new Handler());
return pipeline;
}
});
}

public ChannelFuture connect(InetSocketAddress address, ChannelUpstreamHandler handler) {
synchronized (handlerMap) {
ChannelFuture channelFuture = bootstrap.connect(address);
handlerMap.put(channelFuture.getChannel(), handler);
return channelFuture;
}
}

private class Handler extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
final ChannelUpstreamHandler handler;
synchronized (handlerMap) {
handler = handlerMap.get(ctx.getChannel());
}
if (handler == null) {
super.handleUpstream(ctx, e);

if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() == ChannelState.OPEN) {
return; //Do nothing, this one will not be dispatched to handler, but it's ok
}

System.err.println("Something wrong with dispatching");
} else {
handler.handleUpstream(ctx, e);

if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
if (stateEvent.getState() == ChannelState.BOUND && stateEvent.getValue() == null) {
synchronized (handlerMap) {
handlerMap.remove(ctx.getChannel());
}
}
}
}
}
}

public Executor getEventExecutor() {
return eventExecutor;
}

public void shutdown() {
bootstrap.releaseExternalResources();
}
}
Loading