From 8fb1d0527f920ec06f2f7b9d737eb29810fbace6 Mon Sep 17 00:00:00 2001 From: ian Date: Mon, 7 Oct 2024 09:46:09 +0100 Subject: [PATCH 1/2] Fix p2p http memory leak Add failing test for it --- .../peergos/protocol/http/HttpProtocol.java | 21 +++++-- .../java/org/peergos/EmbeddedIpfsTest.java | 63 ++++++++++++++++++- 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 217c5f43..9d76affe 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -46,9 +46,11 @@ public Sender(Stream stream) { @Override public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { CompletableFuture req = queue.poll(); - if (req != null) - req.complete(msg.copy()); - msg.release(); + if (req != null) { + req.complete(msg.retain()); + req.thenAccept(x -> msg.release()); + } else + msg.release(); stream.close(); } @@ -86,7 +88,7 @@ public Receiver(HttpRequestProcessor requestHandler) { } private void sendReply(HttpContent reply, Stream p2pstream) { - p2pstream.writeAndFlush(reply.copy()); + p2pstream.writeAndFlush(reply.retain()); } @Override @@ -120,7 +122,16 @@ protected void initChannel(Channel ch) throws Exception { Channel ch = fut.channel(); FullHttpRequest retained = msg.retain(); - fut.addListener(x -> ch.writeAndFlush(retained)); + fut.addListener(x -> { + if (x.isSuccess()) + ch.writeAndFlush(retained).addListener(f -> { + if (!f.isSuccess()) { + retained.release(); + } + }); + else + retained.release(); + }); } private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; // This is the total inbound or outbound traffic allowed, not a rate diff --git a/src/test/java/org/peergos/EmbeddedIpfsTest.java b/src/test/java/org/peergos/EmbeddedIpfsTest.java index dc15bfc4..54549808 100644 --- a/src/test/java/org/peergos/EmbeddedIpfsTest.java +++ b/src/test/java/org/peergos/EmbeddedIpfsTest.java @@ -1,5 +1,6 @@ package org.peergos; +import com.sun.net.httpserver.HttpServer; import identify.pb.*; import io.ipfs.cid.*; import io.ipfs.multiaddr.*; @@ -9,12 +10,22 @@ import io.libp2p.core.multiformats.*; import io.libp2p.crypto.keys.*; import io.libp2p.protocol.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; import org.junit.*; import org.peergos.blockstore.*; import org.peergos.config.*; import org.peergos.protocol.dht.*; +import org.peergos.protocol.http.HttpProtocol; import org.peergos.protocol.ipns.*; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; import java.time.*; import java.util.*; import java.util.concurrent.*; @@ -43,6 +54,52 @@ public void largeBlock() throws Exception { node2.stop(); } + @Test + public void largeWrite() throws Exception { + System.setProperty("io.netty.leakDetection.level", "advanced"); + // Start proxy target + InetSocketAddress proxyTarget = new InetSocketAddress("localhost", 7777); + HttpServer target = HttpServer.create(proxyTarget, 20); + String reply = "AllGood"; + byte[] replyBytes = reply.getBytes(); + target.createContext("/", ex -> { + ex.sendResponseHeaders(200, replyBytes.length); + OutputStream out = ex.getResponseBody(); + out.write(replyBytes); + out.flush(); + out.close(); + }); + target.start(); + + HttpProtocol.HttpRequestProcessor http1 = (s, req, h) -> HttpProtocol.proxyRequest(req, proxyTarget, h); + EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http1)); + node1.start(); + + HttpProtocol.HttpRequestProcessor http2 = (s, req, h) -> HttpProtocol.proxyRequest(req, new InetSocketAddress("localhost", 7778), h); + EmbeddedIpfs node2 = build(node1.node.listenAddresses() + .stream() + .map(a -> new MultiAddress(a.toString())) + .collect(Collectors.toList()), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http2)); + node2.start(); + + for (int i = 0; i < 100; i++) { + ByteBuf largeBody = Unpooled.buffer(2 * 1024 * 1024); + DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/hey", largeBody); + HttpProtocol.HttpController http = node2.p2pHttp.get().dial(node2.node, node1.node.getPeerId(), node1.node.listenAddresses().toArray(Multiaddr[]::new)) + .getController().join(); + FullHttpResponse resp = http.send(req).join(); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + int contentLength = resp.headers().getInt("content-length"); + resp.content().readBytes(bout, contentLength); + byte[] body = bout.toByteArray(); + Assert.assertTrue("Correct response", Arrays.equals(body, replyBytes)); + resp.release(); + } + + node1.stop(); + node2.stop(); + } + @Test public void mdnsDiscovery() throws Exception { EmbeddedIpfs node1 = build(Collections.emptyList(), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); @@ -138,12 +195,16 @@ public void wildcardListenerAddressesGetExpanded() { } public static EmbeddedIpfs build(List bootstrap, List swarmAddresses) { + return build(bootstrap, swarmAddresses, Optional.empty()); + } + + public static EmbeddedIpfs build(List bootstrap, List swarmAddresses, Optional http) { BlockRequestAuthoriser blockRequestAuthoriser = (c, p, a) -> CompletableFuture.completedFuture(true); HostBuilder builder = new HostBuilder().generateIdentity(); PrivKey privKey = builder.getPrivateKey(); PeerId peerId = builder.getPeerId(); IdentitySection id = new IdentitySection(privKey.bytes(), peerId); return EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), true, swarmAddresses, bootstrap, - id, blockRequestAuthoriser, Optional.empty()); + id, blockRequestAuthoriser, http); } } From 0290ddd7d05594e7114a6d210e42a3bd920e1a5d Mon Sep 17 00:00:00 2001 From: ian Date: Mon, 7 Oct 2024 15:55:23 +0100 Subject: [PATCH 2/2] More p2p http ref count fixes --- .../java/org/peergos/protocol/http/HttpProtocol.java | 11 +++-------- src/test/java/org/peergos/EmbeddedIpfsTest.java | 3 ++- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/peergos/protocol/http/HttpProtocol.java b/src/main/java/org/peergos/protocol/http/HttpProtocol.java index 9d76affe..2d3fd031 100644 --- a/src/main/java/org/peergos/protocol/http/HttpProtocol.java +++ b/src/main/java/org/peergos/protocol/http/HttpProtocol.java @@ -1,8 +1,5 @@ package org.peergos.protocol.http; -import io.ipfs.cid.*; -import io.ipfs.multibase.*; -import io.ipfs.multihash.*; import io.libp2p.core.*; import io.libp2p.core.multistream.*; import io.libp2p.protocol.*; @@ -48,9 +45,9 @@ public void onMessage(@NotNull Stream stream, FullHttpResponse msg) { CompletableFuture req = queue.poll(); if (req != null) { req.complete(msg.retain()); - req.thenAccept(x -> msg.release()); - } else + } else { msg.release(); + } stream.close(); } @@ -125,9 +122,7 @@ protected void initChannel(Channel ch) throws Exception { fut.addListener(x -> { if (x.isSuccess()) ch.writeAndFlush(retained).addListener(f -> { - if (!f.isSuccess()) { - retained.release(); - } + retained.release(); }); else retained.release(); diff --git a/src/test/java/org/peergos/EmbeddedIpfsTest.java b/src/test/java/org/peergos/EmbeddedIpfsTest.java index 54549808..f06a3165 100644 --- a/src/test/java/org/peergos/EmbeddedIpfsTest.java +++ b/src/test/java/org/peergos/EmbeddedIpfsTest.java @@ -82,7 +82,7 @@ public void largeWrite() throws Exception { .collect(Collectors.toList()), List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())), Optional.of(http2)); node2.start(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { ByteBuf largeBody = Unpooled.buffer(2 * 1024 * 1024); DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/hey", largeBody); HttpProtocol.HttpController http = node2.p2pHttp.get().dial(node2.node, node1.node.getPeerId(), node1.node.listenAddresses().toArray(Multiaddr[]::new)) @@ -94,6 +94,7 @@ public void largeWrite() throws Exception { byte[] body = bout.toByteArray(); Assert.assertTrue("Correct response", Arrays.equals(body, replyBytes)); resp.release(); + resp.release(); } node1.stop();