Skip to content

Commit

Permalink
Merge pull request #83 from Peergos/opt/kademlia-ipns
Browse files Browse the repository at this point in the history
Optimise IPNS publishing and resolution
  • Loading branch information
ianopolous authored Dec 14, 2023
2 parents 9cd535d + 119bc18 commit 21243bf
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 105 deletions.
24 changes: 24 additions & 0 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.protocol.*;
Expand All @@ -22,12 +23,14 @@
import org.peergos.protocol.circuit.*;
import org.peergos.protocol.dht.*;
import org.peergos.protocol.http.*;
import org.peergos.protocol.ipns.*;
import org.peergos.util.Logging;

import java.nio.file.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
Expand Down Expand Up @@ -98,6 +101,27 @@ public List<HashedBlock> getBlocks(List<Want> wants, Set<PeerId> peers, boolean
.collect(Collectors.toList());
}

public CompletableFuture<Void> publishValue(PrivKey priv, byte[] value, long sequence, int hoursTtl) {
Multihash pub = Multihash.deserialize(PeerId.fromPubKey(priv.publicKey()).getBytes());
LocalDateTime expiry = LocalDateTime.now().plusHours(hoursTtl);
long ttlNanos = hoursTtl * 3600_000_000_000L;
byte[] signedRecord = IPNS.createSignedRecord(value, expiry, sequence, ttlNanos, priv);
return dht.publishValue(pub, signedRecord, node);
}

public CompletableFuture<Void> publishPresignedRecord(Multihash pub, byte[] presignedRecord) {
return dht.publishValue(pub, presignedRecord, node);
}

public CompletableFuture<byte[]> resolveValue(PubKey pub, int minResults) {
Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes());
List<IpnsRecord> candidates = dht.resolveValue(publisher, minResults, node);
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
if (records.isEmpty())
return CompletableFuture.failedFuture(new IllegalStateException("Couldn't resolve IPNS value for " + pub));
return CompletableFuture.completedFuture(records.get(records.size() - 1).value);
}

public void start() {
LOG.info("Starting IPFS...");
Thread shutdownHook = new Thread(() -> {
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/peergos/protocol/dht/DatabaseRecordStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.sql.*;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.*;

public class DatabaseRecordStore implements RecordStore {

Expand Down Expand Up @@ -61,7 +61,7 @@ private String hashToKey(Multihash hash) {
}

@Override
public Optional<IpnsRecord> get(Cid peerId) {
public Optional<IpnsRecord> get(Multihash peerId) {
String selectSQL = "SELECT raw, sequence, ttlNanos, expiryUTC, val FROM " + RECORD_TABLE + " WHERE peerId=?";
try (PreparedStatement pstmt = connection.prepareStatement(selectSQL)) {
pstmt.setString(1, hashToKey(peerId));
Expand All @@ -76,7 +76,7 @@ public Optional<IpnsRecord> get(Cid peerId) {
LocalDateTime expiry = LocalDateTime.ofEpochSecond(rs.getLong("expiryUTC"),
0, ZoneOffset.UTC);
IpnsRecord record = new IpnsRecord(bout.toByteArray(), rs.getLong("sequence"),
rs.getLong("ttlNanos"), expiry, rs.getString("val"));
rs.getLong("ttlNanos"), expiry, rs.getString("val").getBytes());
return Optional.of(record);
} catch (IOException readEx) {
throw new IllegalStateException(readEx);
Expand All @@ -102,8 +102,8 @@ public void put(Multihash peerId, IpnsRecord record) {
pstmt.setLong(3, record.sequence);
pstmt.setLong(4, record.ttlNanos);
pstmt.setLong(5, record.expiry.toEpochSecond(ZoneOffset.UTC));
pstmt.setString(6, record.value.length() > SIZE_OF_VAL ?
record.value.substring(0, SIZE_OF_VAL) : record.value);
pstmt.setString(6, new String(record.value.length > SIZE_OF_VAL ?
Arrays.copyOfRange(record.value, 0, SIZE_OF_VAL) : record.value));
pstmt.executeUpdate();
} catch (SQLException ex) {
throw new IllegalStateException(ex);
Expand Down
236 changes: 187 additions & 49 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java

Large diffs are not rendered by default.

29 changes: 2 additions & 27 deletions src/main/java/org/peergos/protocol/dht/KademliaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public interface KademliaController {

CompletableFuture<Boolean> send(Dht.Message msg);

default CompletableFuture<List<PeerAddresses>> closerPeers(Multihash peerID) {
default CompletableFuture<List<PeerAddresses>> closerPeers(byte[] key) {
return rpc(Dht.Message.newBuilder()
.setType(Dht.Message.MessageType.FIND_NODE)
.setKey(ByteString.copyFrom(peerID.toBytes()))
.setKey(ByteString.copyFrom(key))
.build())
.thenApply(resp -> resp.getCloserPeersList().stream()
.map(PeerAddresses::fromProtobuf)
Expand All @@ -47,31 +47,6 @@ default CompletableFuture<Providers> getProviders(Multihash block) {
.thenApply(Providers::fromProtobuf);
}

default CompletableFuture<Boolean> putValue(String pathToPublish, LocalDateTime expiry, long sequence,
long ttlNanos, Multihash peerId, PrivKey ourKey) {
byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(pathToPublish, expiry,
Ipns.IpnsEntry.ValidityType.EOL_VALUE, sequence, ttlNanos);
String expiryString = IPNS.formatExpiry(expiry);
byte[] signature = ourKey.sign(IPNS.createSigV2Data(cborEntryData));
PubKey pubKey = ourKey.publicKey();
byte[] pubKeyProtobuf = Crypto.PublicKey.newBuilder()
.setType(pubKey.getKeyType())
.setData(ByteString.copyFrom(pubKey.raw()))
.build()
.toByteArray();
byte[] ipnsEntry = Ipns.IpnsEntry.newBuilder()
.setSequence(sequence)
.setTtl(ttlNanos)
.setValue(ByteString.copyFrom(pathToPublish.getBytes()))
.setValidityType(Ipns.IpnsEntry.ValidityType.EOL)
.setValidity(ByteString.copyFrom(expiryString.getBytes()))
.setData(ByteString.copyFrom(cborEntryData))
.setSignatureV2(ByteString.copyFrom(signature))
.setPubKey(ByteString.copyFrom(pubKeyProtobuf)) // not needed with Ed25519
.build().toByteArray();
return putValue(peerId, ipnsEntry);
}

default CompletableFuture<Boolean> putValue(Multihash peerId, byte[] value) {
byte[] ipnsRecordKey = IPNS.getKey(peerId);
Dht.Message outgoing = Dht.Message.newBuilder()
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/org/peergos/protocol/dht/KademliaEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.protobuf.*;
import com.offbynull.kademlia.*;
import io.ipfs.cid.*;
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.Stream;
Expand Down Expand Up @@ -97,12 +96,25 @@ public List<PeerAddresses> getKClosestPeers(byte[] key) {
.collect(Collectors.toList());
}

public void addRecord(Multihash publisher, IpnsRecord record) {
ipnsStore.put(publisher, record);
}

public Optional<IpnsRecord> getRecord(Multihash publisher) {
return ipnsStore.get(publisher);
}

public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) {
responderReceivedBytes.inc(msg.getSerializedSize());
switch (msg.getType()) {
case PUT_VALUE: {
Optional<IpnsMapping> mapping = IPNS.validateIpnsEntry(msg);
Optional<IpnsMapping> mapping = IPNS.parseAndValidateIpnsEntry(msg);
if (mapping.isPresent()) {
Optional<IpnsRecord> existing = ipnsStore.get(mapping.get().publisher);
if (existing.isPresent() && mapping.get().value.compareTo(existing.get()) < 0) {
// don't add 'older' record
return;
}
ipnsStore.put(mapping.get().publisher, mapping.get().value);
stream.writeAndFlush(msg);
responderSentBytes.inc(msg.getSerializedSize());
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/peergos/protocol/dht/RamRecordStore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.peergos.protocol.dht;

import io.ipfs.cid.*;
import io.ipfs.multihash.*;
import org.peergos.protocol.ipns.*;

Expand All @@ -19,7 +18,7 @@ public void put(Multihash peerId, IpnsRecord record) {
}

@Override
public Optional<IpnsRecord> get(Cid peerId) {
public Optional<IpnsRecord> get(Multihash peerId) {
return Optional.ofNullable(records.get(peerId));
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/protocol/dht/RecordStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface RecordStore extends AutoCloseable {

void put(Multihash peerId, IpnsRecord record);

Optional<IpnsRecord> get(Cid peerId);
Optional<IpnsRecord> get(Multihash peerId);

void remove(Multihash peerId);
}
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/protocol/ipns/GetResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static GetResult fromProtobuf(Dht.Message msg) {
.map(PeerAddresses::fromProtobuf)
.collect(Collectors.toList());
Optional<IpnsMapping> record = msg.hasRecord() ?
IPNS.validateIpnsEntry(msg) :
IPNS.parseAndValidateIpnsEntry(msg) :
Optional.empty();
return new GetResult(record, closerPeers);
}
Expand Down
49 changes: 41 additions & 8 deletions src/main/java/org/peergos/protocol/ipns/IPNS.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,49 @@ public static byte[] getKey(Multihash peerId) {
return bout.toByteArray();
}

public static byte[] createSignedRecord(byte[] value,
LocalDateTime expiry,
long sequence,
long ttlNanos,
PrivKey ourKey) {
byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(value, expiry,
Ipns.IpnsEntry.ValidityType.EOL_VALUE, sequence, ttlNanos);
String expiryString = IPNS.formatExpiry(expiry);
byte[] signature = ourKey.sign(IPNS.createSigV2Data(cborEntryData));
PubKey pubKey = ourKey.publicKey();
Ipns.IpnsEntry.Builder entryBuilder = Ipns.IpnsEntry.newBuilder()
.setSequence(sequence)
.setTtl(ttlNanos)
.setValue(ByteString.copyFrom(value))
.setValidityType(Ipns.IpnsEntry.ValidityType.EOL)
.setValidity(ByteString.copyFrom(expiryString.getBytes()))
.setData(ByteString.copyFrom(cborEntryData))
.setSignatureV2(ByteString.copyFrom(signature));
if (ourKey.getKeyType() != Crypto.KeyType.Ed25519) {
byte[] pubKeyProtobuf = Crypto.PublicKey.newBuilder()
.setType(pubKey.getKeyType())
.setData(ByteString.copyFrom(pubKey.raw()))
.build()
.toByteArray();
entryBuilder = entryBuilder.setPubKey(ByteString.copyFrom(pubKeyProtobuf)); // not needed with Ed25519
}
return entryBuilder.build().toByteArray();
}

public static Cid getCidFromKey(ByteString key) {
if (! key.startsWith(ByteString.copyFrom("/ipns/".getBytes(StandardCharsets.UTF_8))))
throw new IllegalStateException("Unknown IPNS key space: " + key);
return Cid.cast(key.substring(6).toByteArray());
}

public static byte[] createCborDataForIpnsEntry(String pathToPublish,
public static byte[] createCborDataForIpnsEntry(byte[] value,
LocalDateTime expiry,
long validityType,
long sequence,
long ttl) {
SortedMap<String, Cborable> state = new TreeMap<>();
state.put("TTL", new CborObject.CborLong(ttl));
state.put("Value", new CborObject.CborByteArray(pathToPublish.getBytes()));
state.put("Value", new CborObject.CborByteArray(value));
state.put("Sequence", new CborObject.CborLong(sequence));
String expiryString = formatExpiry(expiry);
state.put("Validity", new CborObject.CborByteArray(expiryString.getBytes(StandardCharsets.UTF_8)));
Expand All @@ -66,17 +95,22 @@ public static byte[] createSigV2Data(byte[] data) {
}
}

public static Optional<IpnsMapping> validateIpnsEntry(Dht.Message msg) {
public static Optional<IpnsMapping> parseAndValidateIpnsEntry(Dht.Message msg) {
if (! msg.hasRecord() || msg.getRecord().getValue().size() > IPNS.MAX_RECORD_SIZE)
return Optional.empty();
if (! msg.getKey().equals(msg.getRecord().getKey()))
return Optional.empty();
if (! msg.getRecord().getKey().startsWith(ByteString.copyFrom("/ipns/".getBytes(StandardCharsets.UTF_8))))
byte[] entryBytes = msg.getRecord().getValue().toByteArray();
return parseAndValidateIpnsEntry(msg.getRecord().getKey().toByteArray(), entryBytes);
}

public static Optional<IpnsMapping> parseAndValidateIpnsEntry(byte[] key, byte[] entryBytes) {
if (! Arrays.equals(Arrays.copyOfRange(key, 0, 6), "/ipns/".getBytes(StandardCharsets.UTF_8)))
return Optional.empty();
byte[] cidBytes = msg.getRecord().getKey().substring(6).toByteArray();
byte[] cidBytes = Arrays.copyOfRange(key, 6, key.length);
Multihash signer = Multihash.deserialize(cidBytes);
try {
Ipns.IpnsEntry entry = Ipns.IpnsEntry.parseFrom(msg.getRecord().getValue());
Ipns.IpnsEntry entry = Ipns.IpnsEntry.parseFrom(entryBytes);
if (! entry.hasSignatureV2() || ! entry.hasData())
return Optional.empty();
PubKey pub;
Expand Down Expand Up @@ -108,8 +142,7 @@ public static Optional<IpnsMapping> validateIpnsEntry(Dht.Message msg) {
LocalDateTime expiry = LocalDateTime.parse(new String(validity).substring(0, validity.length - 1), IPNS.rfc3339nano);
if (expiry.isBefore(LocalDateTime.now()))
return Optional.empty();
byte[] entryBytes = msg.getRecord().getValue().toByteArray();
IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toStringUtf8());
IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toByteArray());
return Optional.of(new IpnsMapping(signer, record));
} catch (InvalidProtocolBufferException e) {
return Optional.empty();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/peergos/protocol/ipns/IpnsRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ public class IpnsRecord implements Comparable<IpnsRecord> {
public final byte[] raw;
public final long sequence, ttlNanos;
public final LocalDateTime expiry;
public final String value;
public final byte[] value;

public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, String value) {
public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, byte[] value) {
this.raw = raw;
this.sequence = sequence;
this.ttlNanos = ttlNanos;
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/peergos/DatabaseRecordStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class DatabaseRecordStoreTest {
public void testRecordStore() {
try (DatabaseRecordStore bs = new DatabaseRecordStore("mem:")) {
LocalDateTime now = LocalDateTime.now();
IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value");
IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value".getBytes());
Cid peerId = Cid.decode("zb2rhYSxw4ZjuzgCnWSt19Q94ERaeFhu9uSqRgjSdx9bsgM6f");
bs.put(peerId, record);
//make sure PUTing a second time succeeds
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/org/peergos/EmbeddedIpfsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import identify.pb.*;
import io.ipfs.cid.*;
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.crypto.keys.*;
import io.libp2p.protocol.*;
import org.junit.*;
import org.peergos.blockstore.*;
import org.peergos.config.*;
import org.peergos.protocol.dht.*;
import org.peergos.protocol.ipns.*;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
Expand Down Expand Up @@ -39,6 +43,62 @@ public void largeBlock() throws Exception {
node2.stop();
}

@Test
public void publishValue() throws Exception {
EmbeddedIpfs node1 = build(BootstrapTest.BOOTSTRAP_NODES, List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())));
node1.start();

PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst();
byte[] value = "This is a test".getBytes();
node1.publishValue(publisher, value, 1, 24).join();
byte[] res = node1.resolveValue(publisher.publicKey(), 5).join();
Assert.assertTrue(Arrays.equals(res, value));

node1.stop();
}

@Test
public void publishPresignedValue() throws Exception {
EmbeddedIpfs node1 = build(BootstrapTest.BOOTSTRAP_NODES, List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())));
node1.start();

PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst();
byte[] value = "This is a test".getBytes();
io.ipfs.multihash.Multihash pub = Multihash.deserialize(PeerId.fromPubKey(publisher.publicKey()).getBytes());
long hoursTtl = 24*365;
LocalDateTime expiry = LocalDateTime.now().plusHours(hoursTtl);
long ttlNanos = hoursTtl * 3600_000_000_000L;
byte[] signedRecord = IPNS.createSignedRecord(value, expiry, 1, ttlNanos, publisher);
node1.publishPresignedRecord(pub, signedRecord).join();
node1.publishPresignedRecord(pub, signedRecord).join();
node1.publishPresignedRecord(pub, signedRecord).join();

byte[] res = node1.resolveValue(publisher.publicKey(), 5).join();
Assert.assertTrue(Arrays.equals(res, value));

// publish an updated value with same expiry
byte[] value2 = "Updated value".getBytes();
byte[] signedRecord2 = IPNS.createSignedRecord(value2, expiry, 2, ttlNanos, publisher);
node1.publishPresignedRecord(pub, signedRecord2).join();
node1.publishPresignedRecord(pub, signedRecord2).join();
node1.publishPresignedRecord(pub, signedRecord2).join();

byte[] res2 = node1.resolveValue(publisher.publicKey(), 5).join();
Assert.assertTrue(Arrays.equals(res2, value2));

// publish an updated value with earlier expiry
byte[] value3 = "3rd value to put in IPNS".getBytes();
byte[] signedRecord3 = IPNS.createSignedRecord(value3, expiry.minusDays(1), 3, ttlNanos, publisher);
node1.publishPresignedRecord(pub, signedRecord3).join();
node1.publishPresignedRecord(pub, signedRecord3).join();
node1.publishPresignedRecord(pub, signedRecord3).join();

byte[] res3 = node1.resolveValue(publisher.publicKey(), 5).join();
Assert.assertTrue(Arrays.equals(res3, value3));

node1.stop();
}

@Test
public void wildcardListenerAddressesGetExpanded() {
int node1Port = TestPorts.getPort();
Expand Down
Loading

0 comments on commit 21243bf

Please sign in to comment.