Skip to content

Commit

Permalink
Close entire connection,not just stream in kademlia.
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Sep 30, 2024
1 parent 07e220c commit 7c67445
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public void stopBootstrapThread() {

private boolean connectTo(Host us, PeerAddresses peer) {
try {
new Identify().dial(us, PeerId.fromBase58(peer.peerId.toBase58()), getPublic(peer)).getController().join().id().join();
PeerId ourPeerId = PeerId.fromBase58(peer.peerId.toBase58());
StreamPromise<? extends IdentifyController> conn = new Identify().dial(us, ourPeerId, getPublic(peer));
try {
conn.getController().join().id().join();
} finally {
conn.getStream().thenApply(s -> s.getConnection().close());
}
return true;
} catch (Exception e) {
if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException)
Expand Down Expand Up @@ -242,7 +248,7 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
return null;
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
}
}).filter(prov -> prov != null)
.collect(Collectors.toList());
Expand Down Expand Up @@ -291,7 +297,7 @@ else if (e.getCause() instanceof ConnectionClosedException) {}
e.printStackTrace();
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
}
return CompletableFuture.completedFuture(Collections.emptyList());
}
Expand Down Expand Up @@ -319,14 +325,14 @@ public CompletableFuture<Void> provideBlock(Multihash block, Host us, PeerAddres
return conn.getController()
.thenCompose(contr -> contr.provide(block, ourAddrs))
.thenApply(res -> {
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
return res;
})
.exceptionally(t -> {
if (t.getCause() instanceof NonCompleteException)
return true;
LOG.log(Level.FINE, t, t::getMessage);
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
return true;
});
})
Expand Down Expand Up @@ -358,7 +364,7 @@ private CompletableFuture<Boolean> putValue(Multihash publisher,
.putValue(publisher, signedRecord);
} catch (Exception e) {} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
}
return CompletableFuture.completedFuture(false);
}
Expand Down Expand Up @@ -483,7 +489,7 @@ private CompletableFuture<Optional<GetResult>> getValueFromPeer(PeerAddresses pe
return CompletableFuture.completedFuture(Optional.empty());
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
conn.getStream().thenApply(s -> s.getConnection().close());
}
}
public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
Expand Down

0 comments on commit 7c67445

Please sign in to comment.