Skip to content

Commit

Permalink
Only close kademlia streams
Browse files Browse the repository at this point in the history
This is to not interfere with other protocols.
Then we can close connections with no open streams.
  • Loading branch information
ianopolous committed Sep 30, 2024
1 parent 89f4ac5 commit 1652693
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
22 changes: 15 additions & 7 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private boolean connectTo(Host us, PeerAddresses peer) {
try {
conn.getController().join().id().join();
} finally {
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
}
return true;
} catch (Exception e) {
Expand All @@ -127,6 +127,14 @@ public void bootstrap(Host us) {
connected.add(peer.peerId);
}
LOG.info("Bootstrap connected to " + connected.size() + " nodes close to us. " + connected.stream().map(Multihash::toString).sorted().limit(5).collect(Collectors.toList()));

List<Connection> allConns = us.getNetwork().getConnections();
Set<Connection> activeConns = us.getStreams().stream().map(s -> s.getConnection()).collect(Collectors.toSet());
List<Connection> toClose = allConns.stream().filter(c -> !activeConns.contains(c)).collect(Collectors.toList());
LOG.info("Closing " + toClose.size() + " / " + allConns.size() + " connections...");
for (Connection conn : toClose) {
conn.close();
}
}

static class RoutingEntry {
Expand Down Expand Up @@ -248,7 +256,7 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
return null;
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
}
}).filter(prov -> prov != null)
.collect(Collectors.toList());
Expand Down Expand Up @@ -297,7 +305,7 @@ else if (e.getCause() instanceof ConnectionClosedException) {}
e.printStackTrace();
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
}
return CompletableFuture.completedFuture(Collections.emptyList());
}
Expand Down Expand Up @@ -325,14 +333,14 @@ public CompletableFuture<Void> provideBlock(Multihash block, Host us, PeerAddres
return conn.getController()
.thenCompose(contr -> contr.provide(block, ourAddrs))
.thenApply(res -> {
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
return res;
})
.exceptionally(t -> {
if (t.getCause() instanceof NonCompleteException)
return true;
LOG.log(Level.FINE, t, t::getMessage);
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
return true;
});
})
Expand Down Expand Up @@ -364,7 +372,7 @@ private CompletableFuture<Boolean> putValue(Multihash publisher,
.putValue(publisher, signedRecord);
} catch (Exception e) {} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
}
return CompletableFuture.completedFuture(false);
}
Expand Down Expand Up @@ -489,7 +497,7 @@ private CompletableFuture<Optional<GetResult>> getValueFromPeer(PeerAddresses pe
return CompletableFuture.completedFuture(Optional.empty());
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.getConnection().close());
conn.getStream().thenApply(s -> s.close());
}
}
public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/protocol/dht/KademliaEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) {
case PING: {break;} // Not used any more
default: throw new IllegalStateException("Unknown message kademlia type: " + msg.getType());
}
stream.getConnection().close();
stream.close();
}

public static boolean isPublic(Multiaddr addr) {
Expand Down

0 comments on commit 1652693

Please sign in to comment.