Skip to content

Commit

Permalink
Fix race condition setting up p2p http proxy handler
Browse files Browse the repository at this point in the history
Don't limit p2p http streams to 2 MiB total bytes!
  • Loading branch information
ianopolous committed Jul 7, 2023
1 parent d75b15b commit 43b1334
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
28 changes: 18 additions & 10 deletions src/main/java/org/peergos/protocol/http/HttpProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ public Sender(Stream stream) {
}

@Override
public void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
queue.poll().complete(msg.copy());
public synchronized void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
CompletableFuture<FullHttpResponse> req = queue.poll();
if (req != null)
req.complete(msg.copy());
else
msg.release();
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
public synchronized CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
CompletableFuture<FullHttpResponse> res = new CompletableFuture<>();
queue.add(res);
req.headers().set(HttpHeaderNames.HOST, stream.remotePeerId());
Expand Down Expand Up @@ -99,16 +103,20 @@ public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
public static void proxyRequest(HttpRequest msg,
SocketAddress proxyTarget,
Consumer<HttpObject> replyHandler) {
Bootstrap b = new Bootstrap();
b.group(pool)
Bootstrap b = new Bootstrap()
.group(pool)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.TRACE));
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new ResponseWriter(replyHandler));
}
});

ChannelFuture fut = b.connect(proxyTarget);
Channel ch = fut.channel();
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new ResponseWriter(replyHandler));

HttpRequest retained = retain(msg);
fut.addListener(x -> ch.writeAndFlush(retained));
Expand All @@ -120,7 +128,7 @@ private static HttpRequest retain(HttpRequest req) {
return req;
}

private static final int TRAFFIC_LIMIT = 2*1024*1024;
private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate
private final HttpRequestProcessor handler;

public HttpProtocol(HttpRequestProcessor handler) {
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/org/peergos/HttpProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void p2pProxyRequest() throws IOException {
ex.sendResponseHeaders(200, httpReply.length);
ex.getResponseBody().write(httpReply);
ex.getResponseBody().close();
System.out.println("Target http server responded");
//System.out.println("Target http server responded");
});
localhostServer.setExecutor(Executors.newSingleThreadExecutor());
localhostServer.start();
Expand All @@ -56,11 +56,12 @@ public void p2pProxyRequest() throws IOException {
long t1 = System.currentTimeMillis();
FullHttpResponse resp = proxier.send(httpRequest.retain()).join();
long t2 = System.currentTimeMillis();
System.out.println("P2P HTTP request took " + (t2 - t1) + "ms");
System.out.println(i + ": P2P HTTP request took " + (t2 - t1) + "ms");
totalTime += t2 - t1;

ByteArrayOutputStream bout = new ByteArrayOutputStream();
resp.content().readBytes(bout, resp.headers().getInt("content-length"));
resp.release();
byte[] replyBody = bout.toByteArray();
if (!Arrays.equals(replyBody, httpReply))
throw new IllegalStateException("Different http response!");
Expand Down

0 comments on commit 43b1334

Please sign in to comment.