diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 55b35c14..f04a669b 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -41,7 +41,7 @@ public Sender(Stream stream) { } @Override - public synchronized void onMessage(@NotNull Stream stream, FullHttpResponse msg) { + public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { CompletableFuture req = queue.poll(); if (req != null) req.complete(msg.copy()); @@ -49,7 +49,7 @@ public synchronized void onMessage(@NotNull Stream stream, FullHttpResponse msg) msg.release(); } - public synchronized CompletableFuture send(FullHttpRequest req) { + public CompletableFuture send(FullHttpRequest req) { CompletableFuture res = new CompletableFuture<>(); queue.add(res); req.headers().set(HttpHeaderNames.HOST, stream.remotePeerId()); @@ -95,7 +95,7 @@ public void onMessage(@NotNull Stream stream, HttpRequest msg) { } public CompletableFuture send(FullHttpRequest req) { - return CompletableFuture.failedFuture(new IllegalStateException("Cannot send form a receiver!")); + return CompletableFuture.failedFuture(new IllegalStateException("Cannot send from a receiver!")); } } @@ -111,6 +111,7 @@ public static void proxyRequest(HttpRequest msg, protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpResponseDecoder()); + ch.pipeline().addLast(new HttpObjectAggregator(2*1024*1024)); ch.pipeline().addLast(new ResponseWriter(replyHandler)); } }); @@ -153,6 +154,7 @@ protected CompletableFuture onStartInitiator(@NotNull Stream str stream.pushHandler(new HttpResponseDecoder()); stream.pushHandler(new HttpObjectAggregator(1024*1024)); stream.pushHandler(replyPropagator); + stream.pushHandler(new LoggingHandler(LogLevel.TRACE)); return CompletableFuture.completedFuture(replyPropagator); } diff --git a/src/test/java/org/peergos/HttpProxyTest.java b/src/test/java/org/peergos/HttpProxyTest.java index 813d2f58..ac21a75a 100644 --- a/src/test/java/org/peergos/HttpProxyTest.java +++ b/src/test/java/org/peergos/HttpProxyTest.java @@ -33,7 +33,8 @@ public void p2pProxyRequest() throws IOException { node2.start().join(); // start local server with fixed HTTP response - byte[] httpReply = "G'day from Java P2P HTTP proxy!".getBytes(StandardCharsets.UTF_8); + byte[] httpReply = new byte[1024*1024]; + new Random(42).nextBytes(httpReply); HttpServer localhostServer = HttpServer.create(proxyTarget, 20); localhostServer.createContext("/", ex -> { ex.sendResponseHeaders(200, httpReply.length); @@ -63,8 +64,7 @@ public void p2pProxyRequest() throws IOException { resp.content().readBytes(bout, resp.headers().getInt("content-length")); resp.release(); byte[] replyBody = bout.toByteArray(); - if (!Arrays.equals(replyBody, httpReply)) - throw new IllegalStateException("Different http response!"); + equal(replyBody, httpReply); } System.out.println("Average: " + totalTime / count); } finally { @@ -72,4 +72,18 @@ public void p2pProxyRequest() throws IOException { node2.stop(); } } + + private static void equal(byte[] a, byte[] b) { + if (a.length != b.length) + throw new IllegalStateException("different lengths!"); + for (int i = 0; i < a.length; i++) + if (a[i] != b[i]) { + byte[] diff = Arrays.copyOfRange(a, i, i + 24); + int j=0; + for (;j < b.length-2;j++) + if (b[j] == diff[0] && b[j+1] == diff[1]&& b[j+2] == diff[2]) + break; + throw new IllegalStateException("bytes differ at " + i + " " + a[i] + " != " + b[i]); + } + } }