Skip to content

Commit

Permalink
feat: add support for autosharded pubsub topics
Browse files Browse the repository at this point in the history
tests: use a generator for sharded pubsub topics
set pubsub topic in encoder/decoder based on sharding type
add function for grouping content topics by pubsub topic
add autosharding config to create options
add autoshard rpc endpoints to nwaku and use in tests
set autoshard pubsub topics in all protocols
fix rebase with static sharding
removes unused function
remove console logs
remove autosharding from ShardInfo, add to EncoderOptions
fix enr and encoder/decoder options
test that same application/version hashes to same shard index
update comment on shard field
fix spelling of autosharding
fix content topic protocol in tests
add sharding type alias and function to determine topic in encoders/decoders
move DefaultPubsubTopic from core to interfaces
  • Loading branch information
danisharora099 authored and adklempner committed Dec 21, 2023
1 parent 6dc3882 commit 2bc3735
Show file tree
Hide file tree
Showing 44 changed files with 1,351 additions and 122 deletions.
1 change: 0 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export { DefaultUserAgent } from "./lib/waku.js";
export { DefaultPubsubTopic } from "./lib/constants.js";
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export type {
Encoder,
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import type {
IBaseProtocol,
Libp2pComponents,
PubsubTopic,
ShardInfo
ShardingParams
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";

import { DefaultPubsubTopic } from "./constants.js";
import { filterPeers } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

Expand Down Expand Up @@ -97,7 +97,7 @@ export class BaseProtocol implements IBaseProtocol {
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}

initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] {
return shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
SingleShardInfo,
Unsubscribe
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
Expand All @@ -30,7 +31,6 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";

import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubsubTopic } from "../constants.js";

import {
FilterPushRpc,
Expand Down
12 changes: 3 additions & 9 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";

import { DefaultPubsubTopic } from "../constants.js";
import { determinePubsubTopic, Logger } from "@waku/utils";

const log = new Logger("message:version-0");
const OneMillion = BigInt(1_000_000);
Expand Down Expand Up @@ -128,9 +126,7 @@ export function createEncoder({
return new Encoder(
contentTopic,
ephemeral,
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
metaSetter
);
}
Expand Down Expand Up @@ -193,9 +189,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic
);
}
13 changes: 9 additions & 4 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
import type {
IMetadata,
Libp2pComponents,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
Expand All @@ -16,9 +21,9 @@ const log = new Logger("metadata");
export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardInfo;
private readonly shardInfo: ShardingParams;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
Expand Down Expand Up @@ -99,7 +104,7 @@ class Metadata extends BaseProtocol {
}

export function wakuMetadata(
shardInfo: ShardInfo
shardInfo: ShardingParams
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
}
7 changes: 3 additions & 4 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import type {
IStore,
Libp2p,
PubsubTopic,
ShardInfo,
ShardingParams,
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";

import { ConnectionManager } from "./connection_manager.js";
import { DefaultPubsubTopic } from "./constants.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -57,7 +56,7 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter,
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ export * from "./libp2p.js";
export * from "./keep_alive_manager.js";
export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";
5 changes: 4 additions & 1 deletion packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import type { PubsubTopic } from "./misc.js";

export interface SingleShardInfo {
clusterId: number;
shard: number;
/**
* Specifying this field indicates to the encoder/decoder that static sharding must be used.
*/
shard?: number;
}

export interface IRateLimitProof {
Expand Down
9 changes: 8 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ export interface IBaseProtocol {
removeLibp2pEventListener: Libp2p["removeEventListener"];
}

export type ContentTopicInfo = {
clusterId: number;
contentTopics: string[];
};

export type ShardingParams = ShardInfo | ContentTopicInfo;

export type ProtocolCreateOptions = {
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
Expand All @@ -39,7 +46,7 @@ export type ProtocolCreateOptions = {
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
shardInfo?: ShardInfo;
shardInfo?: ShardingParams;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
11 changes: 3 additions & 8 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { DefaultPubsubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
Expand All @@ -11,7 +10,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { determinePubsubTopic, Logger } from "@waku/utils";

import { generatePrivateKey } from "./crypto/utils.js";
import { DecodedMessage } from "./decoded_message.js";
Expand Down Expand Up @@ -107,9 +106,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -200,9 +197,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
privateKey
);
Expand Down
11 changes: 3 additions & 8 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { DefaultPubsubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
Expand All @@ -11,7 +10,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { determinePubsubTopic, Logger } from "@waku/utils";

import { generateSymmetricKey } from "./crypto/utils.js";
import { DecodedMessage } from "./decoded_message.js";
Expand Down Expand Up @@ -107,9 +106,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -200,9 +197,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
symKey
);
Expand Down
2 changes: 1 addition & 1 deletion packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import {
ActiveSubscriptions,
Callback,
Expand Down
2 changes: 1 addition & 1 deletion packages/relay/src/topic_only_message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import type {
IDecodedMessage,
IDecoder,
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type {
LightNode,
ProtocolCreateOptions,
RelayNode,
ShardInfo
ShardingParams
} from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
Expand Down Expand Up @@ -180,7 +180,7 @@ type MetadataService = {
};

export async function defaultLibp2p(
shardInfo?: ShardInfo,
shardInfo?: ShardingParams,
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string
Expand Down
46 changes: 45 additions & 1 deletion packages/tests/src/message_collector.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DecodedMessage, DefaultPubsubTopic } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { AssertionError, expect } from "chai";
Expand Down Expand Up @@ -103,6 +104,49 @@ export class MessageCollector {
}
}

async waitForMessagesAutosharding(
numMessages: number,
options?: {
contentTopic: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;

while (this.count < numMessages) {
if (this.nwaku) {
try {
this.list = await this.nwaku.messagesAutosharding(
options!.contentTopic
);
} catch (error) {
log.error(`Can't retrieve messages because of ${error}`);
await delay(10);
}
}

if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}

await delay(10);
}

if (exact) {
if (this.count == numMessages) {
return true;
} else {
log.warn(`Was expecting exactly ${numMessages} messages`);
return false;
}
} else {
return true;
}
}

// Verifies a received message against expected values.
verifyReceivedMessage(
index: number,
Expand Down
Loading

0 comments on commit 2bc3735

Please sign in to comment.