Skip to content

Commit

Permalink
Add HttpObjectAggregator to http proxy target response receiver
Browse files Browse the repository at this point in the history
Make http proxy test check responses, and use 1 MB response size
  • Loading branch information
ianopolous committed Jul 7, 2023
1 parent 43b1334 commit 25ef969
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
8 changes: 5 additions & 3 deletions src/main/java/org/peergos/protocol/http/HttpProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public Sender(Stream stream) {
}

@Override
public synchronized void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
public void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
CompletableFuture<FullHttpResponse> req = queue.poll();
if (req != null)
req.complete(msg.copy());
else
msg.release();
}

public synchronized CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
CompletableFuture<FullHttpResponse> res = new CompletableFuture<>();
queue.add(res);
req.headers().set(HttpHeaderNames.HOST, stream.remotePeerId());
Expand Down Expand Up @@ -95,7 +95,7 @@ public void onMessage(@NotNull Stream stream, HttpRequest msg) {
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send form a receiver!"));
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send from a receiver!"));
}
}

Expand All @@ -111,6 +111,7 @@ public static void proxyRequest(HttpRequest msg,
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(2*1024*1024));
ch.pipeline().addLast(new ResponseWriter(replyHandler));
}
});
Expand Down Expand Up @@ -153,6 +154,7 @@ protected CompletableFuture<HttpController> onStartInitiator(@NotNull Stream str
stream.pushHandler(new HttpResponseDecoder());
stream.pushHandler(new HttpObjectAggregator(1024*1024));
stream.pushHandler(replyPropagator);
stream.pushHandler(new LoggingHandler(LogLevel.TRACE));
return CompletableFuture.completedFuture(replyPropagator);
}

Expand Down
20 changes: 17 additions & 3 deletions src/test/java/org/peergos/HttpProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void p2pProxyRequest() throws IOException {
node2.start().join();

// start local server with fixed HTTP response
byte[] httpReply = "G'day from Java P2P HTTP proxy!".getBytes(StandardCharsets.UTF_8);
byte[] httpReply = new byte[1024*1024];
new Random(42).nextBytes(httpReply);
HttpServer localhostServer = HttpServer.create(proxyTarget, 20);
localhostServer.createContext("/", ex -> {
ex.sendResponseHeaders(200, httpReply.length);
Expand Down Expand Up @@ -63,13 +64,26 @@ public void p2pProxyRequest() throws IOException {
resp.content().readBytes(bout, resp.headers().getInt("content-length"));
resp.release();
byte[] replyBody = bout.toByteArray();
if (!Arrays.equals(replyBody, httpReply))
throw new IllegalStateException("Different http response!");
equal(replyBody, httpReply);
}
System.out.println("Average: " + totalTime / count);
} finally {
node1.stop();
node2.stop();
}
}

private static void equal(byte[] a, byte[] b) {
if (a.length != b.length)
throw new IllegalStateException("different lengths!");
for (int i = 0; i < a.length; i++)
if (a[i] != b[i]) {
byte[] diff = Arrays.copyOfRange(a, i, i + 24);
int j=0;
for (;j < b.length-2;j++)
if (b[j] == diff[0] && b[j+1] == diff[1]&& b[j+2] == diff[2])
break;
throw new IllegalStateException("bytes differ at " + i + " " + a[i] + " != " + b[i]);
}
}
}

0 comments on commit 25ef969

Please sign in to comment.