Skip to content

Commit

Permalink
feat!: deprecate named pubsub topics and use static/auto sharding (#2097
Browse files Browse the repository at this point in the history
)

* feat: deprecate named sharding & protocols adhere
simplify network config type, all protocols use pubsub topic internally

* chore: update tests

* tests: rm application info

* chore: use static sharding and auto sharding terminologies

* chore: update docs for network config

* chore: update interfaces

* tests: update tests error message

* chore: remove `ShardingParams` type and fix test
  • Loading branch information
danisharora099 authored Aug 12, 2024
1 parent 86f730f commit 5ce36c8
Show file tree
Hide file tree
Showing 54 changed files with 376 additions and 787 deletions.
10 changes: 3 additions & 7 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
import { ensureShardingConfigured, Logger } from "@waku/utils";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
Expand All @@ -29,8 +28,7 @@ export class BaseProtocol implements IBaseProtocolCore {
public multicodec: string,
private components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[],
private options?: ProtocolCreateOptions
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
Expand Down Expand Up @@ -100,9 +98,7 @@ export class BaseProtocol implements IBaseProtocolCore {
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
this.options?.shardInfo
? ensureShardingConfigured(this.options.shardInfo).shardInfo
: undefined
pubsubTopicsToShardInfo(this.pubsubTopics)
);

// Filter the peers based on discovery & number of peers requested
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class ConnectionManager
private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
private configuredPubsubTopics: PubsubTopic[],
public readonly configuredPubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
Expand Down
13 changes: 3 additions & 10 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
type CoreProtocolResult,
type IBaseProtocolCore,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces";
Expand Down Expand Up @@ -38,16 +37,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
libp2p: Libp2p,
options?: ProtocolCreateOptions
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(
FilterCodecs.SUBSCRIBE,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);

libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
Expand Down
15 changes: 6 additions & 9 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import {
type IEncoder,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
type ThisOrThat
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
Expand All @@ -32,14 +32,11 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
LightPushCodec,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(LightPushCodec, libp2p.components, log, pubsubTopics);
}

private async preparePushMessage(
Expand Down
23 changes: 11 additions & 12 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
type MetadataQueryResult,
type PeerIdStr,
ProtocolError,
PubsubTopic,
type ShardInfo
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand All @@ -26,15 +27,10 @@ class Metadata extends BaseProtocol implements IMetadata {
protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();

public constructor(
public shardInfo: ShardInfo,
public pubsubTopics: PubsubTopic[],
libp2p: Libp2pComponents
) {
super(
MetadataCodec,
libp2p.components,
log,
shardInfoToPubsubTopics(shardInfo)
);
super(MetadataCodec, libp2p.components, log, pubsubTopics);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
Expand All @@ -45,7 +41,9 @@ class Metadata extends BaseProtocol implements IMetadata {
* Make a metadata query to a peer
*/
public async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
const request = proto_metadata.WakuMetadataRequest.encode(
pubsubTopicsToShardInfo(this.pubsubTopics)
);

const peer = await this.peerStore.get(peerId);
if (!peer) {
Expand Down Expand Up @@ -112,7 +110,7 @@ class Metadata extends BaseProtocol implements IMetadata {
try {
const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
this.shardInfo
pubsubTopicsToShardInfo(this.pubsubTopics)
);

const encodedResponse = await pipe(
Expand Down Expand Up @@ -177,7 +175,8 @@ class Metadata extends BaseProtocol implements IMetadata {
}

export function wakuMetadata(
shardInfo: ShardInfo
pubsubTopics: PubsubTopic[]
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
return (components: Libp2pComponents) =>
new Metadata(pubsubTopics, components);
}
15 changes: 6 additions & 9 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
IDecoder,
IStoreCore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
QueryRequestParams
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
Expand All @@ -28,14 +28,11 @@ const log = new Logger("store");
export const StoreCodec = "/vac/waku/store-query/3.0.0";

export class StoreCore extends BaseProtocol implements IStoreCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
StoreCodec,
libp2p.components,
log,
options?.pubsubTopics || [],
options
);
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(StoreCodec, libp2p.components, log, pubsubTopics);
}

public async *queryPerPage<T extends IDecodedMessage>(
Expand Down
3 changes: 3 additions & 0 deletions packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface";

import { PubsubTopic } from "./misc";

export enum Tags {
BOOTSTRAP = "bootstrap",
PEER_EXCHANGE = "peer-exchange",
Expand Down Expand Up @@ -61,6 +63,7 @@ export interface IConnectionStateEvents {

export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
configuredPubsubTopics: PubsubTopic[];
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
Expand Down
4 changes: 3 additions & 1 deletion packages/interfaces/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ShardInfo } from "./enr";
import type { ShardInfo } from "./sharding";

/**
* The default cluster ID for The Waku Network
Expand All @@ -12,3 +12,5 @@ export const DefaultShardInfo: ShardInfo = {
clusterId: DEFAULT_CLUSTER_ID,
shards: [0, 1, 2, 3, 4, 5, 6, 7, 8]
};

export const DefaultNetworkConfig = DefaultShardInfo;
7 changes: 2 additions & 5 deletions packages/interfaces/src/enr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import type { PeerId } from "@libp2p/interface";
import type { PeerInfo } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";

import { ShardInfo } from "./sharding";

export type ENRKey = string;
export type ENRValue = Uint8Array;
/**
Expand All @@ -18,11 +20,6 @@ export interface Waku2 {
lightPush: boolean;
}

export interface ShardInfo {
clusterId: number;
shards: number[];
}

export interface IEnr extends Map<ENRKey, ENRValue> {
nodeId?: NodeId;
peerId?: PeerId;
Expand Down
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./health_manager.js";
export * from "./sharding.js";
8 changes: 4 additions & 4 deletions packages/interfaces/src/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { PeerId } from "@libp2p/interface";

import { type ShardInfo } from "./enr.js";
import { ThisOrThat } from "./misc.js";
import type { IBaseProtocolCore, ShardingParams } from "./protocols.js";
import { PubsubTopic, ThisOrThat } from "./misc.js";
import type { IBaseProtocolCore } from "./protocols.js";
import type { ShardInfo } from "./sharding.js";

export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>;

// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {
shardInfo: ShardingParams;
pubsubTopics: PubsubTopic[];
confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>;
query(peerId: PeerId): Promise<MetadataQueryResult>;
}
63 changes: 24 additions & 39 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";

import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic, ThisAndThat, ThisOrThat } from "./misc.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
import { AutoSharding, StaticSharding } from "./sharding.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -15,7 +15,6 @@ export enum Protocols {
}

export type IBaseProtocolCore = {
shardInfo?: ShardInfo;
multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>;
Expand All @@ -30,18 +29,7 @@ export type IBaseProtocolSDK = {
readonly numPeersToUse: number;
};

export type ContentTopicInfo = {
clusterId?: number;
contentTopics: string[];
};

export type ApplicationInfo = {
clusterId: number;
application: string;
version: string;
};

export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;
export type NetworkConfig = StaticSharding | AutoSharding;

//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
/**
Expand Down Expand Up @@ -72,38 +60,35 @@ export type ProtocolUseOptions = {

export type ProtocolCreateOptions = {
/**
* @deprecated
* Should be used ONLY if some other than The Waku Network is in use.
* Configuration for determining the network in use.
*
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#pubsub-topics) for details.
*
* This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* If no pubsub topic is specified, the default pubsub topic will be determined from DefaultShardInfo.
* If using Static Sharding:
* Default value is configured for The Waku Network.
* The format to specify a shard is: clusterId: number, shards: number[]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
*
* You cannot add or remove pubsub topics after initialization of the node.
* If using Auto Sharding:
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
* You cannot add or remove content topics after initialization of the node.
*/
pubsubTopics?: PubsubTopic[];
/**
* ShardInfo is used to determine which network is in use.
* Defaults to {@link @waku/interfaces!DefaultShardInfo}.
* Default value is configured for The Waku Network
* Configuration for determining the network in use.
* Network configuration refers to the shards and clusters used in the network.
*
* The format to specify a shard is:
* clusterId: number, shards: number[]
* If using Static Sharding:
* Cluster ID and shards are specified in the format: clusterId: number, shards: number[]
* The default value is configured for The Waku Network => clusterId: 0, shards: [0, 1, 2, 3, 4, 5, 6, 7]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
*/
shardInfo?: Partial<ShardingParams>;
/**
* Content topics are used to determine network in use.
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
*
* You cannot add or remove content topics after initialization of the node.
* If using Auto Sharding:
* Cluster ID and content topics are specified in the format: clusterId: number, contentTopics: string[]
* Content topics are used to determine the shards to be configured for the network.
* Cluster ID is optional, and defaults to The Waku Network's cluster ID => 0
* To specify content topics, see [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details
*
* @default { clusterId: 1, shards: [0, 1, 2, 3, 4, 5, 6, 7] }
*/
contentTopics?: string[];
networkConfig?: NetworkConfig;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
12 changes: 12 additions & 0 deletions packages/interfaces/src/sharding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export type ShardInfo = {
clusterId: number;
shards: number[];
};

export type ContentTopicInfo = {
clusterId?: number;
contentTopics: string[];
};

export type StaticSharding = ShardInfo;
export type AutoSharding = ContentTopicInfo;
6 changes: 3 additions & 3 deletions packages/sdk/src/create/create.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type LightNode } from "@waku/interfaces";

import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
import { CreateWakuNodeOptions, WakuNode } from "../waku.js";

import { createLibp2pAndUpdateOptions } from "./libp2p.js";

Expand All @@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js";
export async function createLightNode(
options: CreateWakuNodeOptions = {}
): Promise<LightNode> {
const libp2p = await createLibp2pAndUpdateOptions(options);
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);

return new WakuNode(options as WakuOptions, libp2p, {
return new WakuNode(pubsubTopics, options, libp2p, {
store: true,
lightpush: true,
filter: true
Expand Down
Loading

0 comments on commit 5ce36c8

Please sign in to comment.