Skip to content

Commit

Permalink
feat!: set peer-exchange with default bootstrap (#1469)
Browse files Browse the repository at this point in the history
* set peer-exchange with default bootstrap

* only initialise protocols with bootstrap peers

* update package

* update package-lock

* refactor `getPeers` while setting up a protocol

* move codecs to `@waku/interfaces`

* lightpush: send messages to multiple peers

* only use multiple peers for LP and Filter

* fix: ts warnings

* lightpush: tests pass

* update breaking changes for new API

* move codecs back into protocol files

* refactor: `getPeers()`

* rm: log as an arg

* add tsdoc for getPeers

* add import

* add prettier rule to eslint

* add: peer exchange to sdk as a dep

* fix eslint error

* add try catch

* revert unecessary diff

* revert unecessary diff

* fix imports

* convert relaycodecs to array

* remove: peerId as an arg for protocol methods

* keep peerId as an arg for peer-exchange

* remove: peerId from getPeers()

* lightpush: extract hardcoded numPeers as a constant

* return all peers if numPeers is 0 and increase readability for random peers

* refactor considering more than 1 bootstrap peers can exist

* use `getPeers`

* change arg for `getPeers` to object

* address comments

* refactor tests for new API

* lightpush: make constant the class variable

* use `maxBootstrapPeers` instead of `includeBootstrap`

* refactor protocols for new API

* add tests for `getPeers`

* skip getPeers test

* rm: only from test

* move tests to `base_protocol.spec.ts`

* break down `getPeers` into a `filter` method

* return all bootstrap peers if arg is 0

* refactor test without stubbing

* address comments

* update test title

* move `filterPeers` to a separate file

* address comments & add more test

* make test title more verbose

* address comments

* remove ProtocolOptions

* chore: refactor tests for new API

* add defaults for getPeers

* address comments

* rm unneeded comment

* address comment: add diversity of node tags to test

* address comments

* fix: imports
  • Loading branch information
danisharora099 authored Sep 7, 2023
1 parent 408b79d commit 81a52a8
Show file tree
Hide file tree
Showing 23 changed files with 348 additions and 151 deletions.
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";
export { wakuLightPush } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";
export {
PageDirection,
wakuStore,
StoreCodec,
createCursor
} from "./lib/store/index.js";

export { PageDirection, wakuStore, createCursor } from "./lib/store/index.js";

export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";

import { filterPeers } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand Down Expand Up @@ -60,4 +61,32 @@ export class BaseProtocol implements IBaseProtocol {
);
return peer;
}

/**
* Retrieves a list of peers based on the specified criteria.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
protected async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all peers that support the protocol
const allPeersForProtocol = await getPeersForProtocol(this.peerStore, [
this.multicodec
]);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}
}
25 changes: 12 additions & 13 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer } from "@libp2p/interface/peer-store";
import type { IncomingStreamData } from "@libp2p/interface-internal/registrar";
import type {
Expand All @@ -14,7 +13,6 @@ import type {
Libp2p,
PeerIdStr,
ProtocolCreateOptions,
ProtocolOptions,
PubSubTopic,
Unsubscribe
} from "@waku/interfaces";
Expand Down Expand Up @@ -228,6 +226,7 @@ class Subscription {
class Filter extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions;
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;

private getActiveSubscription(
pubSubTopic: PubSubTopic,
Expand Down Expand Up @@ -257,14 +256,16 @@ class Filter extends BaseProtocol implements IReceiver {
this.options = options ?? {};
}

async createSubscription(
pubSubTopic?: string,
peerId?: PeerId
): Promise<Subscription> {
async createSubscription(pubSubTopic?: string): Promise<Subscription> {
const _pubSubTopic =
pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic;

const peer = await this.getPeer(peerId);
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
})
)[0];

const subscription =
this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ??
Expand All @@ -278,10 +279,9 @@ class Filter extends BaseProtocol implements IReceiver {
}

public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
opts?: ProtocolOptions | undefined
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders, opts);
return toAsyncIterator(this, decoders);
}

/**
Expand All @@ -301,10 +301,9 @@ class Filter extends BaseProtocol implements IReceiver {
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription(undefined, opts?.peerId);
const subscription = await this.createSubscription();

await subscription.subscribe(decoders, callback);

Expand Down
144 changes: 144 additions & 0 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { Peer } from "@libp2p/interface/peer-store";
import type { Tag } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";

import { filterPeers } from "./filterPeers.js";

describe("filterPeers function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();

const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

it("should return all non-bootstrap peers and no bootstrap peer when numPeers is 0 and maxBootstrapPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();

const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(0);
});

it("should return one bootstrap peer, and all non-boostrap peers, when numPeers is 0 & maxBootstrap is 1", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();

const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(1);
});

it("should return only bootstrap peers up to maxBootstrapPeers", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();

const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
expect(result.length).to.be.at.most(5);
expect(result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length);
});
});
43 changes: 43 additions & 0 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";

/**
* Retrieves a list of peers based on the specified criteria.
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
.slice(0, maxBootstrapPeers);

// Collect non-bootstrap peers
const nonBootstrapPeers = peers.filter(
(peer) => !peer.tags.has(Tags.BOOTSTRAP)
);

// If numPeers is 0, return all peers
if (numPeers === 0) {
return [...bootstrapPeers, ...nonBootstrapPeers];
}

// Initialize the list of selected peers with the bootstrap peers
const selectedPeers: Peer[] = [...bootstrapPeers];

// Fill up to numPeers with remaining random peers if needed
while (selectedPeers.length < numPeers && nonBootstrapPeers.length > 0) {
const randomIndex = Math.floor(Math.random() * nonBootstrapPeers.length);
const randomPeer = nonBootstrapPeers.splice(randomIndex, 1)[0];
selectedPeers.push(randomPeer);
}

return selectedPeers;
}
Loading

0 comments on commit 81a52a8

Please sign in to comment.