Skip to content

Commit

Permalink
Add Blockstore.hasAny(raw multihash)
Browse files Browse the repository at this point in the history
this is needed by the dht to answer if we are providing a block
  • Loading branch information
ianopolous committed Jul 11, 2023
1 parent 4ed0f57 commit e457e3d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static EmbeddedIpfs build(Path ipfsPath,
}
Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes());

Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false);
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blockstore), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.getPrivateKey(), ourPeerId, 5);
Bitswap bitswap = new Bitswap(new BitswapEngine(blockstore, authoriser));
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/HostBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static HostBuilder create(int listenPort,
.generateIdentity()
.listen(List.of(new MultiAddress("/ip4/0.0.0.0/tcp/" + listenPort)));
Multihash ourPeerId = Multihash.deserialize(builder.peerId.getBytes());
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false);
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blocks), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.privKey, ourPeerId, 5);
return builder.addProtocols(List.of(
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/peergos/blockstore/Blockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ default Cid keyToHash(String key) {

CompletableFuture<Boolean> has(Cid c);

CompletableFuture<Boolean> hasAny(Multihash h);

CompletableFuture<Optional<byte[]>> get(Cid c);

CompletableFuture<Cid> put(byte[] block, Cid.Codec codec);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/peergos/blockstore/FileBlockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.ipfs.cid.Cid;
import io.ipfs.multihash.Multihash;
import org.peergos.Hash;
import org.peergos.util.*;

import java.io.*;
import java.nio.file.Files;
Expand Down Expand Up @@ -60,6 +61,12 @@ public CompletableFuture<Boolean> has(Cid cid) {
return CompletableFuture.completedFuture(file.exists());
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid cid) {
try {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/peergos/blockstore/FilteredBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.peergos.blockstore;

import io.ipfs.cid.Cid;
import io.ipfs.multihash.*;
import org.peergos.util.*;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.*;

public class FilteredBlockstore implements Blockstore {

Expand All @@ -28,6 +31,12 @@ public CompletableFuture<Boolean> has(Cid c) {
return CompletableFuture.completedFuture(false);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
if (filter.has(c))
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/peergos/blockstore/ProvidingBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.peergos.blockstore;

import io.ipfs.cid.*;
import io.ipfs.multihash.*;

import java.util.*;
import java.util.concurrent.*;
Expand All @@ -19,6 +20,11 @@ public CompletableFuture<Boolean> has(Cid c) {
return target.has(c);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return target.hasAny(h);
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
return target.get(c);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/peergos/blockstore/RamBlockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import io.ipfs.cid.*;
import io.ipfs.multihash.*;
import org.peergos.*;
import org.peergos.util.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class RamBlockstore implements Blockstore {

Expand All @@ -16,6 +18,12 @@ public CompletableFuture<Boolean> has(Cid c) {
return CompletableFuture.completedFuture(blocks.containsKey(c));
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
return CompletableFuture.completedFuture(Optional.ofNullable(blocks.get(c)));
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.peergos.blockstore;

import io.ipfs.cid.Cid;
import io.ipfs.multihash.*;
import org.peergos.util.*;

import java.util.List;
import java.util.Optional;
Expand All @@ -24,6 +26,15 @@ public CompletableFuture<Boolean> bloomAdd(Cid cid) {
throw new IllegalArgumentException("Unsupported codec: " + cid.codec);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
for (Cid.Codec codec : allowedCodecs) {
if (has(new Cid(1, codec, h.getType(), h.getHash())).join())
return Futures.of(true);
}
return Futures.of(false);
}

@Override
public CompletableFuture<Boolean> has(Cid cid) {
if (allowedCodecs.contains(cid.codec)) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.*;

public class S3Blockstore implements Blockstore {

Expand Down Expand Up @@ -164,6 +165,12 @@ public CompletableFuture<Boolean> has(Cid cid) {
return getWithBackoff(() -> getSizeWithoutRetry(cid)).thenApply(optSize -> optSize.isPresent());
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

private CompletableFuture<Optional<Integer>> getSizeWithoutRetry(Cid cid) {
try {
PresignedUrl headUrl = S3Request.preSignHead(folder + hashToKey(cid), Optional.of(60),
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/peergos/protocol/dht/KademliaEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.libp2p.core.Stream;
import io.libp2p.core.multiformats.*;
import org.peergos.*;
import org.peergos.blockstore.*;
import org.peergos.protocol.dht.pb.*;
import org.peergos.protocol.ipns.*;

Expand All @@ -22,11 +23,15 @@ public class KademliaEngine {
private final RecordStore ipnsStore;
public final Router router;
private AddressBook addressBook;
private final Multihash ourPeerId;
private final Blockstore blocks;

public KademliaEngine(Multihash ourPeerId, ProviderStore providersStore, RecordStore ipnsStore) {
public KademliaEngine(Multihash ourPeerId, ProviderStore providersStore, RecordStore ipnsStore, Blockstore blocks) {
this.providersStore = providersStore;
this.ipnsStore = ipnsStore;
this.ourPeerId = ourPeerId;
this.router = new Router(Id.create(ourPeerId.bareMultihash().toBytes(), 256), 2, 2, 2);
this.blocks = blocks;
}

public void setAddressBook(AddressBook addrs) {
Expand Down Expand Up @@ -104,6 +109,12 @@ public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) {
case GET_PROVIDERS: {
Multihash hash = Multihash.deserialize(msg.getKey().toByteArray());
Set<PeerAddresses> providers = providersStore.getProviders(hash);
if (blocks.hasAny(hash).join())
providers.add(new PeerAddresses(ourPeerId, addressBook.getAddrs(PeerId.fromBase58(ourPeerId.toBase58()))
.join()
.stream()
.map(a -> new MultiAddress(a.toString()))
.collect(Collectors.toList())));
Dht.Message.Builder builder = msg.toBuilder();
builder = builder.addAllProviderPeers(providers.stream()
.map(PeerAddresses::toProtobuf)
Expand Down

0 comments on commit e457e3d

Please sign in to comment.