diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 2012301d..55b35c14 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -41,11 +41,15 @@ public Sender(Stream stream) { } @Override - public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { - queue.poll().complete(msg.copy()); + public synchronized void onMessage(@NotNull Stream stream, FullHttpResponse msg) { + CompletableFuture req = queue.poll(); + if (req != null) + req.complete(msg.copy()); + else + msg.release(); } - public CompletableFuture send(FullHttpRequest req) { + public synchronized CompletableFuture send(FullHttpRequest req) { CompletableFuture res = new CompletableFuture<>(); queue.add(res); req.headers().set(HttpHeaderNames.HOST, stream.remotePeerId()); @@ -99,16 +103,20 @@ public CompletableFuture send(FullHttpRequest req) { public static void proxyRequest(HttpRequest msg, SocketAddress proxyTarget, Consumer replyHandler) { - Bootstrap b = new Bootstrap(); - b.group(pool) + Bootstrap b = new Bootstrap() + .group(pool) .channel(NioSocketChannel.class) - .handler(new LoggingHandler(LogLevel.TRACE)); + .handler(new ChannelInitializer<>() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new HttpRequestEncoder()); + ch.pipeline().addLast(new HttpResponseDecoder()); + ch.pipeline().addLast(new ResponseWriter(replyHandler)); + } + }); ChannelFuture fut = b.connect(proxyTarget); Channel ch = fut.channel(); - ch.pipeline().addLast(new HttpRequestEncoder()); - ch.pipeline().addLast(new HttpResponseDecoder()); - ch.pipeline().addLast(new ResponseWriter(replyHandler)); HttpRequest retained = retain(msg); fut.addListener(x -> ch.writeAndFlush(retained)); @@ -120,7 +128,7 @@ private static HttpRequest retain(HttpRequest req) { return req; } - private static final int TRAFFIC_LIMIT = 2*1024*1024; + private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate private final HttpRequestProcessor handler; public HttpProtocol(HttpRequestProcessor handler) { diff --git a/src/test/java/org/peergos/HttpProxyTest.java b/src/test/java/org/peergos/HttpProxyTest.java index f9d43bb4..813d2f58 100644 --- a/src/test/java/org/peergos/HttpProxyTest.java +++ b/src/test/java/org/peergos/HttpProxyTest.java @@ -39,7 +39,7 @@ public void p2pProxyRequest() throws IOException { ex.sendResponseHeaders(200, httpReply.length); ex.getResponseBody().write(httpReply); ex.getResponseBody().close(); - System.out.println("Target http server responded"); + //System.out.println("Target http server responded"); }); localhostServer.setExecutor(Executors.newSingleThreadExecutor()); localhostServer.start(); @@ -56,11 +56,12 @@ public void p2pProxyRequest() throws IOException { long t1 = System.currentTimeMillis(); FullHttpResponse resp = proxier.send(httpRequest.retain()).join(); long t2 = System.currentTimeMillis(); - System.out.println("P2P HTTP request took " + (t2 - t1) + "ms"); + System.out.println(i + ": P2P HTTP request took " + (t2 - t1) + "ms"); totalTime += t2 - t1; ByteArrayOutputStream bout = new ByteArrayOutputStream(); resp.content().readBytes(bout, resp.headers().getInt("content-length")); + resp.release(); byte[] replyBody = bout.toByteArray(); if (!Arrays.equals(replyBody, httpReply)) throw new IllegalStateException("Different http response!");