diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 217c5f43..2d3fd031 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -1,8 +1,5 @@ package org.peergos.protocol.http; -import io.ipfs.cid.*; -import io.ipfs.multibase.*; -import io.ipfs.multihash.*; import io.libp2p.core.*; import io.libp2p.core.multistream.*; import io.libp2p.protocol.*; @@ -46,9 +43,11 @@ public Sender(Stream stream) { @Override public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { CompletableFuture req = queue.poll(); - if (req != null) - req.complete(msg.copy()); - msg.release(); + if (req != null) { + req.complete(msg.retain()); + } else { + msg.release(); + } stream.close(); } @@ -86,7 +85,7 @@ public Receiver(HttpRequestProcessor requestHandler) { } private void sendReply(HttpContent reply, Stream p2pstream) { - p2pstream.writeAndFlush(reply.copy()); + p2pstream.writeAndFlush(reply.retain()); } @Override @@ -120,7 +119,14 @@ protected void initChannel(Channel ch) throws Exception { Channel ch = fut.channel(); FullHttpRequest retained = msg.retain(); - fut.addListener(x -> ch.writeAndFlush(retained)); + fut.addListener(x -> { + if (x.isSuccess()) + ch.writeAndFlush(retained).addListener(f -> { + retained.release(); + }); + else + retained.release(); + }); } private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate diff --git a/src/test/java/org/peergos/EmbeddedIpfsTest.java b/src/test/java/org/peergos/EmbeddedIpfsTest.java index dc15bfc4..f06a3165 100644 --- a/src/test/java/org/peergos/EmbeddedIpfsTest.java +++ b/src/test/java/org/peergos/EmbeddedIpfsTest.java @@ -1,5 +1,6 @@ package org.peergos; +import com.sun.net.httpserver.HttpServer; import identify.pb.*; import io.ipfs.cid.*; import io.ipfs.multiaddr.*; @@ -9,12 +10,22 @@ import io.libp2p.core.multiformats.*; import io.libp2p.crypto.keys.*; import io.libp2p.protocol.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; import org.junit.*; import org.peergos.blockstore.*; import org.peergos.config.*; import org.peergos.protocol.dht.*; +import org.peergos.protocol.http.HttpProtocol; import org.peergos.protocol.ipns.*; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; import java.time.*; import java.util.*; import java.util.concurrent.*; @@ -43,6 +54,53 @@ public void largeBlock() throws Exception { node2.stop(); } + @Test + public void largeWrite() throws Exception { + System.setProperty("io.netty.leakDetection.level", "advanced"); + // Start proxy target + InetSocketAddress proxyTarget = new InetSocketAddress("localhost", 7777); + HttpServer target = HttpServer.create(proxyTarget, 20); + String reply = "AllGood"; + byte[] replyBytes = reply.getBytes(); + target.createContext("/", ex -> { + ex.sendResponseHeaders(200, replyBytes.length); + OutputStream out = ex.getResponseBody(); + out.write(replyBytes); + out.flush(); + out.close(); + }); + target.start(); + + HttpProtocol.HttpRequestProcessor http1 = (s, req, h) -> HttpProtocol.proxyRequest(req, proxyTarget, h); + EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http1)); + node1.start(); + + HttpProtocol.HttpRequestProcessor http2 = (s, req, h) -> HttpProtocol.proxyRequest(req, new InetSocketAddress("localhost", 7778), h); + EmbeddedIpfs node2 = build(node1.node.listenAddresses() + .stream() + .map(a -> new MultiAddress(a.toString())) + .collect(Collectors.toList()), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http2)); + node2.start(); + + for (int i = 0; i < 1000; i++) { + ByteBuf largeBody = Unpooled.buffer(2 * 1024 * 1024); + DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/hey", largeBody); + HttpProtocol.HttpController http = node2.p2pHttp.get().dial(node2.node, node1.node.getPeerId(), node1.node.listenAddresses().toArray(Multiaddr[]::new)) + .getController().join(); + FullHttpResponse resp = http.send(req).join(); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + int contentLength = resp.headers().getInt("content-length"); + resp.content().readBytes(bout, contentLength); + byte[] body = bout.toByteArray(); + Assert.assertTrue("Correct response", Arrays.equals(body, replyBytes)); + resp.release(); + resp.release(); + } + + node1.stop(); + node2.stop(); + } + @Test public void mdnsDiscovery() throws Exception { EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); @@ -138,12 +196,16 @@ public void wildcardListenerAddressesGetExpanded() { } public static EmbeddedIpfs build(List bootstrap, List swarmAddresses) { + return build(bootstrap, swarmAddresses, Optional.empty()); + } + + public static EmbeddedIpfs build(List bootstrap, List swarmAddresses, Optional http) { BlockRequestAuthoriser blockRequestAuthoriser = (c, p, a) -> CompletableFuture.completedFuture(true); HostBuilder builder = new HostBuilder().generateIdentity(); PrivKey privKey = builder.getPrivateKey(); PeerId peerId = builder.getPeerId(); IdentitySection id = new IdentitySection(privKey.bytes(), peerId); return EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), true, swarmAddresses, bootstrap, - id, blockRequestAuthoriser, Optional.empty()); + id, blockRequestAuthoriser, http); } }