diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java index 2e146370..412fa740 100644 --- a/src/main/java/org/peergos/EmbeddedIpfs.java +++ b/src/main/java/org/peergos/EmbeddedIpfs.java @@ -131,7 +131,7 @@ public static EmbeddedIpfs build(Path ipfsPath, } Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes()); - Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false); + Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blockstore), false); CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding(); CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.getPrivateKey(), ourPeerId, 5); Bitswap bitswap = new Bitswap(new BitswapEngine(blockstore, authoriser)); diff --git a/src/main/java/org/peergos/HostBuilder.java b/src/main/java/org/peergos/HostBuilder.java index 3ec2ca21..3cfae420 100644 --- a/src/main/java/org/peergos/HostBuilder.java +++ b/src/main/java/org/peergos/HostBuilder.java @@ -112,7 +112,7 @@ public static HostBuilder create(int listenPort, .generateIdentity() .listen(List.of(new MultiAddress("/ip4/0.0.0.0/tcp/" + listenPort))); Multihash ourPeerId = Multihash.deserialize(builder.peerId.getBytes()); - Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false); + Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blocks), false); CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding(); CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.privKey, ourPeerId, 5); return builder.addProtocols(List.of( diff --git a/src/main/java/org/peergos/blockstore/Blockstore.java b/src/main/java/org/peergos/blockstore/Blockstore.java index 5fb562cb..79b9c833 100644 --- a/src/main/java/org/peergos/blockstore/Blockstore.java +++ b/src/main/java/org/peergos/blockstore/Blockstore.java @@ -22,6 +22,8 @@ default Cid keyToHash(String key) { CompletableFuture has(Cid c); + CompletableFuture hasAny(Multihash h); + CompletableFuture> get(Cid c); CompletableFuture put(byte[] block, Cid.Codec codec); diff --git a/src/main/java/org/peergos/blockstore/FileBlockstore.java b/src/main/java/org/peergos/blockstore/FileBlockstore.java index 03849b7d..7d855aa4 100644 --- a/src/main/java/org/peergos/blockstore/FileBlockstore.java +++ b/src/main/java/org/peergos/blockstore/FileBlockstore.java @@ -3,6 +3,7 @@ import io.ipfs.cid.Cid; import io.ipfs.multihash.Multihash; import org.peergos.Hash; +import org.peergos.util.*; import java.io.*; import java.nio.file.Files; @@ -60,6 +61,12 @@ public CompletableFuture has(Cid cid) { return CompletableFuture.completedFuture(file.exists()); } + @Override + public CompletableFuture hasAny(Multihash h) { + return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf) + .anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join())); + } + @Override public CompletableFuture> get(Cid cid) { try { diff --git a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java index 9ff5ad5b..330af81a 100644 --- a/src/main/java/org/peergos/blockstore/FilteredBlockstore.java +++ b/src/main/java/org/peergos/blockstore/FilteredBlockstore.java @@ -1,10 +1,13 @@ package org.peergos.blockstore; import io.ipfs.cid.Cid; +import io.ipfs.multihash.*; +import org.peergos.util.*; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.*; public class FilteredBlockstore implements Blockstore { @@ -28,6 +31,12 @@ public CompletableFuture has(Cid c) { return CompletableFuture.completedFuture(false); } + @Override + public CompletableFuture hasAny(Multihash h) { + return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf) + .anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join())); + } + @Override public CompletableFuture> get(Cid c) { if (filter.has(c)) diff --git a/src/main/java/org/peergos/blockstore/ProvidingBlockstore.java b/src/main/java/org/peergos/blockstore/ProvidingBlockstore.java index 296e6788..e3d4251c 100644 --- a/src/main/java/org/peergos/blockstore/ProvidingBlockstore.java +++ b/src/main/java/org/peergos/blockstore/ProvidingBlockstore.java @@ -1,6 +1,7 @@ package org.peergos.blockstore; import io.ipfs.cid.*; +import io.ipfs.multihash.*; import java.util.*; import java.util.concurrent.*; @@ -19,6 +20,11 @@ public CompletableFuture has(Cid c) { return target.has(c); } + @Override + public CompletableFuture hasAny(Multihash h) { + return target.hasAny(h); + } + @Override public CompletableFuture> get(Cid c) { return target.get(c); diff --git a/src/main/java/org/peergos/blockstore/RamBlockstore.java b/src/main/java/org/peergos/blockstore/RamBlockstore.java index 77e3a4ee..a3784d1b 100644 --- a/src/main/java/org/peergos/blockstore/RamBlockstore.java +++ b/src/main/java/org/peergos/blockstore/RamBlockstore.java @@ -3,9 +3,11 @@ import io.ipfs.cid.*; import io.ipfs.multihash.*; import org.peergos.*; +import org.peergos.util.*; import java.util.*; import java.util.concurrent.*; +import java.util.stream.*; public class RamBlockstore implements Blockstore { @@ -16,6 +18,12 @@ public CompletableFuture has(Cid c) { return CompletableFuture.completedFuture(blocks.containsKey(c)); } + @Override + public CompletableFuture hasAny(Multihash h) { + return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf) + .anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join())); + } + @Override public CompletableFuture> get(Cid c) { return CompletableFuture.completedFuture(Optional.ofNullable(blocks.get(c))); diff --git a/src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java b/src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java index d84bf29f..ef426e8e 100644 --- a/src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java +++ b/src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java @@ -1,6 +1,8 @@ package org.peergos.blockstore; import io.ipfs.cid.Cid; +import io.ipfs.multihash.*; +import org.peergos.util.*; import java.util.List; import java.util.Optional; @@ -24,6 +26,15 @@ public CompletableFuture bloomAdd(Cid cid) { throw new IllegalArgumentException("Unsupported codec: " + cid.codec); } + @Override + public CompletableFuture hasAny(Multihash h) { + for (Cid.Codec codec : allowedCodecs) { + if (has(new Cid(1, codec, h.getType(), h.getHash())).join()) + return Futures.of(true); + } + return Futures.of(false); + } + @Override public CompletableFuture has(Cid cid) { if (allowedCodecs.contains(cid.codec)) { diff --git a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java index 57fbeabf..df2c19a7 100644 --- a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java +++ b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java @@ -20,6 +20,7 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.*; public class S3Blockstore implements Blockstore { @@ -164,6 +165,12 @@ public CompletableFuture has(Cid cid) { return getWithBackoff(() -> getSizeWithoutRetry(cid)).thenApply(optSize -> optSize.isPresent()); } + @Override + public CompletableFuture hasAny(Multihash h) { + return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf) + .anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join())); + } + private CompletableFuture> getSizeWithoutRetry(Cid cid) { try { PresignedUrl headUrl = S3Request.preSignHead(folder + hashToKey(cid), Optional.of(60), diff --git a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java index 47127c35..dd72f3f3 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java @@ -9,6 +9,7 @@ import io.libp2p.core.Stream; import io.libp2p.core.multiformats.*; import org.peergos.*; +import org.peergos.blockstore.*; import org.peergos.protocol.dht.pb.*; import org.peergos.protocol.ipns.*; @@ -22,11 +23,15 @@ public class KademliaEngine { private final RecordStore ipnsStore; public final Router router; private AddressBook addressBook; + private final Multihash ourPeerId; + private final Blockstore blocks; - public KademliaEngine(Multihash ourPeerId, ProviderStore providersStore, RecordStore ipnsStore) { + public KademliaEngine(Multihash ourPeerId, ProviderStore providersStore, RecordStore ipnsStore, Blockstore blocks) { this.providersStore = providersStore; this.ipnsStore = ipnsStore; + this.ourPeerId = ourPeerId; this.router = new Router(Id.create(ourPeerId.bareMultihash().toBytes(), 256), 2, 2, 2); + this.blocks = blocks; } public void setAddressBook(AddressBook addrs) { @@ -104,6 +109,12 @@ public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) { case GET_PROVIDERS: { Multihash hash = Multihash.deserialize(msg.getKey().toByteArray()); Set providers = providersStore.getProviders(hash); + if (blocks.hasAny(hash).join()) + providers.add(new PeerAddresses(ourPeerId, addressBook.getAddrs(PeerId.fromBase58(ourPeerId.toBase58())) + .join() + .stream() + .map(a -> new MultiAddress(a.toString())) + .collect(Collectors.toList()))); Dht.Message.Builder builder = msg.toBuilder(); builder = builder.addAllProviderPeers(providers.stream() .map(PeerAddresses::toProtobuf)