diff --git a/src/main/java/org/peergos/PeerAddresses.java b/src/main/java/org/peergos/PeerAddresses.java index f2afb37..c2efe8c 100644 --- a/src/main/java/org/peergos/PeerAddresses.java +++ b/src/main/java/org/peergos/PeerAddresses.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.function.*; import java.util.stream.*; +import java.util.stream.Stream; public class PeerAddresses { public final Multihash peerId; @@ -57,7 +58,13 @@ public static PeerAddresses fromProtobuf(Dht.Message.Peer peer) { Multihash peerId = Multihash.deserialize(peer.getId().toByteArray()); List addrs = peer.getAddrsList() .stream() - .map(b -> Multiaddr.deserialize(b.toByteArray())) + .flatMap(b -> { + try { + return Stream.of(Multiaddr.deserialize(b.toByteArray())); + } catch (Exception e) { + return Stream.empty(); + } + }) .collect(Collectors.toList()); return new PeerAddresses(peerId, addrs); } diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index a4f526e..7d474de 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -131,12 +131,24 @@ public void bootstrap(Host us) { List allConns = us.getNetwork().getConnections(); Set activeConns = us.getStreams().stream().map(s -> s.getConnection()).collect(Collectors.toSet()); List toClose = allConns.stream().filter(c -> !activeConns.contains(c)).collect(Collectors.toList()); - LOG.info("Closing " + toClose.size() + " / " + allConns.size() + " connections..."); for (Connection conn : toClose) { conn.close(); } } + private static CompletableFuture closeAfter(CompletableFuture sf, Supplier> query) { + CompletableFuture res = new CompletableFuture<>(); + query.get().thenAccept(v -> { + sf.thenAccept(s -> s.close()); + res.complete(v); + }).exceptionally(t -> { + sf.thenAccept(s -> s.close()); + res.completeExceptionally(t); + return null; + }); + return res; + } + static class RoutingEntry { public final Id key; public final PeerAddresses addresses; @@ -287,11 +299,10 @@ public CompletableFuture> findProviders(Multihash block, Hos } private CompletableFuture> getCloserPeers(byte[] key, PeerAddresses target, Host us) { - StreamPromise conn = null; try { - conn = dialPeer(target, us); + StreamPromise conn = dialPeer(target, us); KademliaController contr = conn.getController().orTimeout(2, TimeUnit.SECONDS).join(); - return contr.closerPeers(key); + return closeAfter(conn.getStream(), () -> contr.closerPeers(key)); } catch (Exception e) { // we can't dial quic only nodes until it's implemented if (target.addresses.stream().allMatch(a -> a.toString().contains("quic"))) @@ -303,9 +314,6 @@ private CompletableFuture> getCloserPeers(byte[] key, PeerAd else if (e.getCause() instanceof ConnectionClosedException) {} else e.printStackTrace(); - } finally { - if (conn != null) - conn.getStream().thenApply(s -> s.close()); } return CompletableFuture.completedFuture(Collections.emptyList()); } @@ -365,15 +373,11 @@ private CompletableFuture putValue(Multihash publisher, byte[] signedRecord, PeerAddresses peer, Host us) { - StreamPromise conn = null; try { - conn = dialPeer(peer, us); - return conn.getController().join() - .putValue(publisher, signedRecord); - } catch (Exception e) {} finally { - if (conn != null) - conn.getStream().thenApply(s -> s.close()); - } + StreamPromise conn = dialPeer(peer, us); + return closeAfter(conn.getStream(), () -> conn.getController().join() + .putValue(publisher, signedRecord)); + } catch (Exception e) {} return CompletableFuture.completedFuture(false); }