From 1652693541b7d6bc25e8978dd079c3942081f91a Mon Sep 17 00:00:00 2001 From: ian Date: Mon, 30 Sep 2024 11:02:26 +0100 Subject: [PATCH] Only close kademlia streams This is to not interfere with other protocols. Then we can close connections with no open streams. --- .../org/peergos/protocol/dht/Kademlia.java | 22 +++++++++++++------ .../peergos/protocol/dht/KademliaEngine.java | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index 5dac139..a4f526e 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -100,7 +100,7 @@ private boolean connectTo(Host us, PeerAddresses peer) { try { conn.getController().join().id().join(); } finally { - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); } return true; } catch (Exception e) { @@ -127,6 +127,14 @@ public void bootstrap(Host us) { connected.add(peer.peerId); } LOG.info("Bootstrap connected to " + connected.size() + " nodes close to us. " + connected.stream().map(Multihash::toString).sorted().limit(5).collect(Collectors.toList())); + + List allConns = us.getNetwork().getConnections(); + Set activeConns = us.getStreams().stream().map(s -> s.getConnection()).collect(Collectors.toSet()); + List toClose = allConns.stream().filter(c -> !activeConns.contains(c)).collect(Collectors.toList()); + LOG.info("Closing " + toClose.size() + " / " + allConns.size() + " connections..."); + for (Connection conn : toClose) { + conn.close(); + } } static class RoutingEntry { @@ -248,7 +256,7 @@ public CompletableFuture> findProviders(Multihash block, Hos return null; } finally { if (conn != null) - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); } }).filter(prov -> prov != null) .collect(Collectors.toList()); @@ -297,7 +305,7 @@ else if (e.getCause() instanceof ConnectionClosedException) {} e.printStackTrace(); } finally { if (conn != null) - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); } return CompletableFuture.completedFuture(Collections.emptyList()); } @@ -325,14 +333,14 @@ public CompletableFuture provideBlock(Multihash block, Host us, PeerAddres return conn.getController() .thenCompose(contr -> contr.provide(block, ourAddrs)) .thenApply(res -> { - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); return res; }) .exceptionally(t -> { if (t.getCause() instanceof NonCompleteException) return true; LOG.log(Level.FINE, t, t::getMessage); - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); return true; }); }) @@ -364,7 +372,7 @@ private CompletableFuture putValue(Multihash publisher, .putValue(publisher, signedRecord); } catch (Exception e) {} finally { if (conn != null) - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); } return CompletableFuture.completedFuture(false); } @@ -489,7 +497,7 @@ private CompletableFuture> getValueFromPeer(PeerAddresses pe return CompletableFuture.completedFuture(Optional.empty()); } finally { if (conn != null) - conn.getStream().thenApply(s -> s.getConnection().close()); + conn.getStream().thenApply(s -> s.close()); } } public List resolveValue(Multihash publisher, int minResults, Host us) { diff --git a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java index a82c26f..6eb164e 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java @@ -198,7 +198,7 @@ public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) { case PING: {break;} // Not used any more default: throw new IllegalStateException("Unknown message kademlia type: " + msg.getType()); } - stream.getConnection().close(); + stream.close(); } public static boolean isPublic(Multiaddr addr) {