Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition setting up p2p http proxy handler #47

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions src/main/java/org/peergos/protocol/http/HttpProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public Sender(Stream stream) {

@Override
public void onMessage(@NotNull Stream stream, FullHttpResponse msg) {
queue.poll().complete(msg.copy());
CompletableFuture<FullHttpResponse> req = queue.poll();
if (req != null)
req.complete(msg.copy());
msg.release();
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
Expand All @@ -54,21 +57,21 @@ public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
}
}

public static class ResponseWriter extends SimpleChannelInboundHandler<HttpObject> {
private final Consumer<HttpObject> replyProcessor;
public static class ResponseWriter extends SimpleChannelInboundHandler<HttpContent> {
private final Consumer<HttpContent> replyProcessor;

public ResponseWriter(Consumer<HttpObject> replyProcessor) {
public ResponseWriter(Consumer<HttpContent> replyProcessor) {
this.replyProcessor = replyProcessor;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject reply) {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpContent reply) {
replyProcessor.accept(reply);
}
}

public interface HttpRequestProcessor {
void handle(Stream stream, HttpRequest msg, Consumer<HttpObject> replyHandler);
void handle(Stream stream, HttpRequest msg, Consumer<HttpContent> replyHandler);
}

public static class Receiver implements ProtocolMessageHandler<HttpRequest>, HttpController {
Expand All @@ -78,11 +81,8 @@ public Receiver(HttpRequestProcessor requestHandler) {
this.requestHandler = requestHandler;
}

private void sendReply(HttpObject reply, Stream p2pstream) {
if (reply instanceof HttpContent)
p2pstream.writeAndFlush(((HttpContent) reply).retain());
else
p2pstream.writeAndFlush(reply);
private void sendReply(HttpContent reply, Stream p2pstream) {
p2pstream.writeAndFlush(reply.copy());
}

@Override
Expand All @@ -91,24 +91,29 @@ public void onMessage(@NotNull Stream stream, HttpRequest msg) {
}

public CompletableFuture<FullHttpResponse> send(FullHttpRequest req) {
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send form a receiver!"));
return CompletableFuture.failedFuture(new IllegalStateException("Cannot send from a receiver!"));
}
}

private static final NioEventLoopGroup pool = new NioEventLoopGroup();
public static void proxyRequest(HttpRequest msg,
SocketAddress proxyTarget,
Consumer<HttpObject> replyHandler) {
Bootstrap b = new Bootstrap();
b.group(pool)
Consumer<HttpContent> replyHandler) {
Bootstrap b = new Bootstrap()
.group(pool)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.TRACE));
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(2*1024*1024));
ch.pipeline().addLast(new ResponseWriter(replyHandler));
}
});

ChannelFuture fut = b.connect(proxyTarget);
Channel ch = fut.channel();
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new ResponseWriter(replyHandler));

HttpRequest retained = retain(msg);
fut.addListener(x -> ch.writeAndFlush(retained));
Expand All @@ -120,7 +125,7 @@ private static HttpRequest retain(HttpRequest req) {
return req;
}

private static final int TRAFFIC_LIMIT = 2*1024*1024;
private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate
private final HttpRequestProcessor handler;

public HttpProtocol(HttpRequestProcessor handler) {
Expand All @@ -145,6 +150,7 @@ protected CompletableFuture<HttpController> onStartInitiator(@NotNull Stream str
stream.pushHandler(new HttpResponseDecoder());
stream.pushHandler(new HttpObjectAggregator(1024*1024));
stream.pushHandler(replyPropagator);
stream.pushHandler(new LoggingHandler(LogLevel.TRACE));
return CompletableFuture.completedFuture(replyPropagator);
}

Expand Down
25 changes: 20 additions & 5 deletions src/test/java/org/peergos/HttpProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public void p2pProxyRequest() throws IOException {
node2.start().join();

// start local server with fixed HTTP response
byte[] httpReply = "G'day from Java P2P HTTP proxy!".getBytes(StandardCharsets.UTF_8);
byte[] httpReply = new byte[577*1024];
new Random(42).nextBytes(httpReply);
HttpServer localhostServer = HttpServer.create(proxyTarget, 20);
localhostServer.createContext("/", ex -> {
ex.sendResponseHeaders(200, httpReply.length);
ex.getResponseBody().write(httpReply);
ex.getResponseBody().close();
System.out.println("Target http server responded");
//System.out.println("Target http server responded");
});
localhostServer.setExecutor(Executors.newSingleThreadExecutor());
localhostServer.start();
Expand All @@ -56,19 +57,33 @@ public void p2pProxyRequest() throws IOException {
long t1 = System.currentTimeMillis();
FullHttpResponse resp = proxier.send(httpRequest.retain()).join();
long t2 = System.currentTimeMillis();
System.out.println("P2P HTTP request took " + (t2 - t1) + "ms");
System.out.println(i + ": P2P HTTP request took " + (t2 - t1) + "ms");
totalTime += t2 - t1;

ByteArrayOutputStream bout = new ByteArrayOutputStream();
resp.content().readBytes(bout, resp.headers().getInt("content-length"));
resp.release();
byte[] replyBody = bout.toByteArray();
if (!Arrays.equals(replyBody, httpReply))
throw new IllegalStateException("Different http response!");
equal(replyBody, httpReply);
}
System.out.println("Average: " + totalTime / count);
} finally {
node1.stop();
node2.stop();
}
}

private static void equal(byte[] a, byte[] b) {
if (a.length != b.length)
throw new IllegalStateException("different lengths!");
for (int i = 0; i < a.length; i++)
if (a[i] != b[i]) {
byte[] diff = Arrays.copyOfRange(a, i, i + 24);
int j=0;
for (;j < b.length-2;j++)
if (b[j] == diff[0] && b[j+1] == diff[1]&& b[j+2] == diff[2])
break;
throw new IllegalStateException("bytes differ at " + i + " " + a[i] + " != " + b[i]);
}
}
}
Loading