Skip to content

Commit

Permalink
Merge pull request #47 from Peergos/fix/http-proxy-race
Browse files Browse the repository at this point in the history
Fix race condition setting up p2p http proxy handler
  • Loading branch information
ianopolous authored Jul 10, 2023
2 parents d75b15b + 4a0f5e0 commit 1ce93f8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
46 changes: 26 additions & 20 deletions src/main/java/org/peergos/protocol/http/HttpProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public Sender(Stream stream) {

@Override
public void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
queue.poll().complete(msg.copy());
CompletableFuture<FullHttpResponse> req = queue.poll();
if (req != null)
req.complete(msg.copy());
msg.release();
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
Expand All @@ -54,21 +57,21 @@ public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
}
}

public static class ResponseWriter extends SimpleChannelInboundHandler<HttpObject> {
private final Consumer<HttpObject> replyProcessor;
public static class ResponseWriter extends SimpleChannelInboundHandler<HttpContent> {
private final Consumer<HttpContent> replyProcessor;

public ResponseWriter(Consumer<HttpObject> replyProcessor) {
public ResponseWriter(Consumer<HttpContent> replyProcessor) {
this.replyProcessor = replyProcessor;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject reply) {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpContent reply) {
replyProcessor.accept(reply);
}
}

public interface HttpRequestProcessor {
void handle(Stream stream, HttpRequest msg, Consumer<HttpObject> replyHandler);
void handle(Stream stream, HttpRequest msg, Consumer<HttpContent> replyHandler);
}

public static class Receiver implements ProtocolMessageHandler<HttpRequest>, HttpController {
Expand All @@ -78,11 +81,8 @@ public Receiver(HttpRequestProcessor requestHandler) {
this.requestHandler = requestHandler;
}

private void sendReply(HttpObject reply, Stream p2pstream) {
if (reply instanceof HttpContent)
p2pstream.writeAndFlush(((HttpContent) reply).retain());
else
p2pstream.writeAndFlush(reply);
private void sendReply(HttpContent reply, Stream p2pstream) {
p2pstream.writeAndFlush(reply.copy());
}

@Override
Expand All @@ -91,24 +91,29 @@ public void onMessage(@NotNull Stream stream, HttpRequest msg) {
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send form a receiver!"));
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send from a receiver!"));
}
}

private static final NioEventLoopGroup pool = new NioEventLoopGroup();
public static void proxyRequest(HttpRequest msg,
SocketAddress proxyTarget,
Consumer<HttpObject> replyHandler) {
Bootstrap b = new Bootstrap();
b.group(pool)
Consumer<HttpContent> replyHandler) {
Bootstrap b = new Bootstrap()
.group(pool)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.TRACE));
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(2*1024*1024));
ch.pipeline().addLast(new ResponseWriter(replyHandler));
}
});

ChannelFuture fut = b.connect(proxyTarget);
Channel ch = fut.channel();
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new ResponseWriter(replyHandler));

HttpRequest retained = retain(msg);
fut.addListener(x -> ch.writeAndFlush(retained));
Expand All @@ -120,7 +125,7 @@ private static HttpRequest retain(HttpRequest req) {
return req;
}

private static final int TRAFFIC_LIMIT = 2*1024*1024;
private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate
private final HttpRequestProcessor handler;

public HttpProtocol(HttpRequestProcessor handler) {
Expand All @@ -145,6 +150,7 @@ protected CompletableFuture<HttpController> onStartInitiator(@NotNull Stream str
stream.pushHandler(new HttpResponseDecoder());
stream.pushHandler(new HttpObjectAggregator(1024*1024));
stream.pushHandler(replyPropagator);
stream.pushHandler(new LoggingHandler(LogLevel.TRACE));
return CompletableFuture.completedFuture(replyPropagator);
}

Expand Down
25 changes: 20 additions & 5 deletions src/test/java/org/peergos/HttpProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public void p2pProxyRequest() throws IOException {
node2.start().join();

// start local server with fixed HTTP response
byte[] httpReply = "G'day from Java P2P HTTP proxy!".getBytes(StandardCharsets.UTF_8);
byte[] httpReply = new byte[577*1024];
new Random(42).nextBytes(httpReply);
HttpServer localhostServer = HttpServer.create(proxyTarget, 20);
localhostServer.createContext("/", ex -> {
ex.sendResponseHeaders(200, httpReply.length);
ex.getResponseBody().write(httpReply);
ex.getResponseBody().close();
System.out.println("Target http server responded");
//System.out.println("Target http server responded");
});
localhostServer.setExecutor(Executors.newSingleThreadExecutor());
localhostServer.start();
Expand All @@ -56,19 +57,33 @@ public void p2pProxyRequest() throws IOException {
long t1 = System.currentTimeMillis();
FullHttpResponse resp = proxier.send(httpRequest.retain()).join();
long t2 = System.currentTimeMillis();
System.out.println("P2P HTTP request took " + (t2 - t1) + "ms");
System.out.println(i + ": P2P HTTP request took " + (t2 - t1) + "ms");
totalTime += t2 - t1;

ByteArrayOutputStream bout = new ByteArrayOutputStream();
resp.content().readBytes(bout, resp.headers().getInt("content-length"));
resp.release();
byte[] replyBody = bout.toByteArray();
if (!Arrays.equals(replyBody, httpReply))
throw new IllegalStateException("Different http response!");
equal(replyBody, httpReply);
}
System.out.println("Average: " + totalTime / count);
} finally {
node1.stop();
node2.stop();
}
}

private static void equal(byte[] a, byte[] b) {
if (a.length != b.length)
throw new IllegalStateException("different lengths!");
for (int i = 0; i < a.length; i++)
if (a[i] != b[i]) {
byte[] diff = Arrays.copyOfRange(a, i, i + 24);
int j=0;
for (;j < b.length-2;j++)
if (b[j] == diff[0] && b[j+1] == diff[1]&& b[j+2] == diff[2])
break;
throw new IllegalStateException("bytes differ at " + i + " " + a[i] + " != " + b[i]);
}
}
}

0 comments on commit 1ce93f8

Please sign in to comment.