From 771440c58c1a6f3b748f4b73785874b1ca984819 Mon Sep 17 00:00:00 2001 From: Sasha Date: Fri, 11 Oct 2024 19:51:04 +0200 Subject: [PATCH 01/11] feat: lighten retry logic for LightPush --- packages/interfaces/src/sender.ts | 17 ++++- .../protocols/light_push/light_push.spec.ts | 10 +++ .../src/protocols/light_push/light_push.ts | 61 +++++++++++------ packages/sdk/src/reliability_monitor/index.ts | 15 ----- .../sdk/src/reliability_monitor/sender.ts | 65 ------------------- 5 files changed, 67 insertions(+), 101 deletions(-) create mode 100644 packages/sdk/src/protocols/light_push/light_push.spec.ts delete mode 100644 packages/sdk/src/reliability_monitor/sender.ts diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index c195403a7e..19270d5ebd 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,10 +1,23 @@ import type { IEncoder, IMessage } from "./message.js"; -import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js"; +import { SDKProtocolResult } from "./protocols.js"; + +export type ISenderOptions = { + /** + * Enables retry of a message that was failed to be sent. + * @default false + */ + autoRetry?: boolean; + /** + * Sets number of attempts if `autoRetry` is enabled. + * @default 3 + */ + maxAttempts?: number; +}; export interface ISender { send: ( encoder: IEncoder, message: IMessage, - sendOptions?: ProtocolUseOptions + sendOptions?: ISenderOptions ) => Promise; } diff --git a/packages/sdk/src/protocols/light_push/light_push.spec.ts b/packages/sdk/src/protocols/light_push/light_push.spec.ts new file mode 100644 index 0000000000..9f529f181e --- /dev/null +++ b/packages/sdk/src/protocols/light_push/light_push.spec.ts @@ -0,0 +1,10 @@ +// TODO: add them after decoupling `BaseProtocolSDK` from LightPush +describe("LightPush SDK", () => { + it("should fail to send if pubsub topics are misconfigured"); + + it("should fail to send if no connected peers found"); + + it("should send to number of used peers"); + + it("should retry on failure if specified"); +}); diff --git a/packages/sdk/src/protocols/light_push/light_push.ts b/packages/sdk/src/protocols/light_push/light_push.ts index 61e72487f7..8da6ea31de 100644 --- a/packages/sdk/src/protocols/light_push/light_push.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -6,28 +6,33 @@ import { LightPushCore } from "@waku/core"; import { + type CoreProtocolResult, Failure, type IEncoder, ILightPush, type IMessage, + type ISenderOptions, type Libp2p, type ProtocolCreateOptions, ProtocolError, - ProtocolUseOptions, SDKProtocolResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; -import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js"; import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); -class LightPush extends BaseProtocolSDK implements ILightPush { - public readonly protocol: LightPushCore; +const DEFAULT_MAX_ATTEMPTS = 3; +const DEFAULT_SEND_OPTIONS: ISenderOptions = { + autoRetry: false, + maxAttempts: DEFAULT_MAX_ATTEMPTS +}; + +type RetryCallback = (peer: Peer) => Promise; - private readonly reliabilityMonitor: SenderReliabilityMonitor; +export class LightPush extends BaseProtocolSDK implements ILightPush { + public readonly protocol: LightPushCore; public constructor( connectionManager: ConnectionManager, @@ -42,17 +47,13 @@ class LightPush extends BaseProtocolSDK implements ILightPush { } ); - this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor( - this.renewPeer.bind(this) - ); - this.protocol = this.core as LightPushCore; } public async send( encoder: IEncoder, message: IMessage, - _options?: ProtocolUseOptions + options: ISenderOptions = DEFAULT_SEND_OPTIONS ): Promise { const successes: PeerId[] = []; const failures: Failure[] = []; @@ -105,14 +106,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush { if (failure) { failures.push(failure); - const connectedPeer = this.connectedPeers.find((connectedPeer) => - connectedPeer.id.equals(failure.peerId) - ); - - if (connectedPeer) { - void this.reliabilityMonitor.attemptRetriesOrRenew( - connectedPeer.id, - () => this.protocol.send(encoder, message, connectedPeer) + if (options?.autoRetry) { + void this.attemptRetries( + (peer: Peer) => this.protocol.send(encoder, message, peer), + options.maxAttempts ); } } @@ -129,6 +126,32 @@ class LightPush extends BaseProtocolSDK implements ILightPush { }; } + private async attemptRetries( + fn: RetryCallback, + maxAttempts?: number + ): Promise { + maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS; + const connectedPeers = await this.getConnectedPeers(); + + if (connectedPeers.length === 0) { + log.warn("Cannot retry with no connected peers."); + return; + } + + for (let i = 0; i < maxAttempts; i++) { + const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already + const response = await fn(peer); + + if (response.success) { + return; + } + + log.info( + `Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}` + ); + } + } + private async getConnectedPeers(): Promise { const peerIDs = this.libp2p.getPeers(); diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index f5bc48879d..f75c5fdc92 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -7,14 +7,12 @@ import { } from "@waku/interfaces"; import { ReceiverReliabilityMonitor } from "./receiver.js"; -import { SenderReliabilityMonitor } from "./sender.js"; export class ReliabilityMonitorManager { private static receiverMonitors: Map< PubsubTopic, ReceiverReliabilityMonitor > = new Map(); - private static senderMonitor: SenderReliabilityMonitor | undefined; public static createReceiverMonitor( pubsubTopic: PubsubTopic, @@ -44,22 +42,10 @@ export class ReliabilityMonitorManager { return monitor; } - public static createSenderMonitor( - renewPeer: (peerId: PeerId) => Promise - ): SenderReliabilityMonitor { - if (!ReliabilityMonitorManager.senderMonitor) { - ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( - renewPeer - ); - } - return ReliabilityMonitorManager.senderMonitor; - } - private constructor() {} public static stop(pubsubTopic: PubsubTopic): void { this.receiverMonitors.delete(pubsubTopic); - this.senderMonitor = undefined; } public static stopAll(): void { @@ -67,7 +53,6 @@ export class ReliabilityMonitorManager { monitor.setMaxMissedMessagesThreshold(undefined); monitor.setMaxPingFailures(undefined); this.receiverMonitors.delete(pubsubTopic); - this.senderMonitor = undefined; } } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts deleted file mode 100644 index 914c321da8..0000000000 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; - -const log = new Logger("sdk:sender:reliability_monitor"); - -const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; - -export class SenderReliabilityMonitor { - private attempts: Map = new Map(); - private readonly maxAttemptsBeforeRenewal = - DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - - public constructor( - private renewPeer: (peerId: PeerId) => Promise - ) {} - - public async attemptRetriesOrRenew( - peerId: PeerId, - protocolSend: () => Promise - ): Promise { - const peerIdStr = peerId.toString(); - const currentAttempts = this.attempts.get(peerIdStr) || 0; - this.attempts.set(peerIdStr, currentAttempts + 1); - - if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { - try { - const result = await protocolSend(); - if (result.success) { - log.info(`Successfully sent message after retry to ${peerIdStr}`); - this.attempts.delete(peerIdStr); - } else { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${result.failure}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); - } - } catch (error) { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${error}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); - } - } else { - try { - const newPeer = await this.renewPeer(peerId); - if (newPeer) { - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); - - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); - } else { - log.error( - `Failed to renew peer ${peerId.toString()}: New peer is undefined` - ); - } - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); - } - } - } -} From 92d0a282c214d740c7988afe4efe8c65ddce20af Mon Sep 17 00:00:00 2001 From: Sasha Date: Fri, 11 Oct 2024 21:55:38 +0200 Subject: [PATCH 02/11] update tests --- packages/tests/tests/health-manager/node.spec.ts | 4 +--- .../tests/tests/light-push/peer_management.spec.ts | 10 +++------- .../single_node/multiple_pubsub.node.spec.ts | 10 +++------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index f29e4d98b9..9645a86b48 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -44,9 +44,7 @@ describe("Node Health Status Matrix Tests", function () { ); if (lightPushPeers > 0) { - await waku.lightPush.send(TestEncoder, messagePayload, { - forceUseAllPeers: true - }); + await waku.lightPush.send(TestEncoder, messagePayload); } if (filterPeers > 0) { diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 218e7a6a89..d837b28aed 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -50,13 +50,9 @@ describe("Waku Light Push: Connection Management: E2E", function () { // skipped because of https://github.com/waku-org/js-waku/pull/2155#discussion_r1787452696 it.skip("Failed peers are renewed", async function () { // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send( - encoder, - { - payload: utf8ToBytes("Hello_World") - }, - { forceUseAllPeers: true } - ); + const response1 = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); expect(response1.successes.length).to.be.equal( waku.lightPush.numPeersToUse diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 52cb6f14f8..02e6c7668a 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -432,13 +432,9 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const { failures: f1 } = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - const { failures: f2 } = await waku.lightPush.send( - customEncoder2, - { - payload: utf8ToBytes("M2") - }, - { forceUseAllPeers: true } - ); + const { failures: f2 } = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); expect(f1).to.be.empty; expect(f2).to.be.empty; From 9d1afd75f415aa0b21d8a57d0c18a090a67b58af Mon Sep 17 00:00:00 2001 From: Sasha Date: Fri, 11 Oct 2024 23:27:48 +0200 Subject: [PATCH 03/11] remove base protocol sdk from light push, add unit tests for light push --- packages/interfaces/src/light_push.ts | 5 +- packages/sdk/src/protocols/base_protocol.ts | 2 +- .../protocols/light_push/light_push.spec.ts | 170 +++++++++++++++++- .../src/protocols/light_push/light_push.ts | 17 +- 4 files changed, 175 insertions(+), 19 deletions(-) diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts index 32b29048e6..b9350c6b6b 100644 --- a/packages/interfaces/src/light_push.ts +++ b/packages/interfaces/src/light_push.ts @@ -1,5 +1,4 @@ -import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js"; +import { IBaseProtocolCore } from "./protocols.js"; import type { ISender } from "./sender.js"; -export type ILightPush = ISender & - IBaseProtocolSDK & { protocol: IBaseProtocolCore }; +export type ILightPush = ISender & { protocol: IBaseProtocolCore }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 062583404b..4550072b84 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -11,7 +11,7 @@ interface Options { maintainPeersInterval?: number; } -const DEFAULT_NUM_PEERS_TO_USE = 2; +export const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { diff --git a/packages/sdk/src/protocols/light_push/light_push.spec.ts b/packages/sdk/src/protocols/light_push/light_push.spec.ts index 9f529f181e..afc2da78e2 100644 --- a/packages/sdk/src/protocols/light_push/light_push.spec.ts +++ b/packages/sdk/src/protocols/light_push/light_push.spec.ts @@ -1,10 +1,170 @@ -// TODO: add them after decoupling `BaseProtocolSDK` from LightPush +import { Peer } from "@libp2p/interface"; +import { + ConnectionManager, + createEncoder, + Encoder, + LightPushCodec +} from "@waku/core"; +import { Libp2p, ProtocolError } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { LightPush } from "./light_push.js"; + +const PUBSUB_TOPIC = "/waku/2/rs/1/4"; +const CONTENT_TOPIC = "/test/1/waku-light-push/utf8"; + describe("LightPush SDK", () => { - it("should fail to send if pubsub topics are misconfigured"); + let libp2p: Libp2p; + let encoder: Encoder; + let lightPush: LightPush; + + beforeEach(() => { + libp2p = mockLibp2p(); + encoder = createEncoder({ contentTopic: CONTENT_TOPIC }); + lightPush = mockLightPush({ libp2p }); + }); + + it("should fail to send if pubsub topics are misconfigured", async () => { + lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] }); + + const result = await lightPush.send(encoder, { + payload: utf8ToBytes("test") + }); + const failures = result.failures ?? []; + + expect(failures.length).to.be.eq(1); + expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED)) + .to.be.true; + }); + + it("should fail to send if no connected peers found", async () => { + const result = await lightPush.send(encoder, { + payload: utf8ToBytes("test") + }); + const failures = result.failures ?? []; + + expect(failures.length).to.be.eq(1); + expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to + .be.true; + }); + + it("should send to specified number of peers of used peers", async () => { + libp2p = mockLibp2p({ + peers: [mockPeer("1"), mockPeer("2"), mockPeer("3"), mockPeer("4")] + }); + + // check default value that should be 2 + lightPush = mockLightPush({ libp2p }); + let sendSpy = sinon.spy( + (_encoder: any, _message: any, peer: Peer) => + ({ success: peer.id }) as any + ); + lightPush.protocol.send = sendSpy; + + let result = await lightPush.send(encoder, { + payload: utf8ToBytes("test") + }); + + expect(sendSpy.calledTwice).to.be.true; + expect(result.successes?.length).to.be.eq(2); + + // check if setting another value works + lightPush = mockLightPush({ libp2p, numPeersToUse: 3 }); + sendSpy = sinon.spy( + (_encoder: any, _message: any, peer: Peer) => + ({ success: peer.id }) as any + ); + lightPush.protocol.send = sendSpy; - it("should fail to send if no connected peers found"); + result = await lightPush.send(encoder, { payload: utf8ToBytes("test") }); - it("should send to number of used peers"); + expect(sendSpy.calledThrice).to.be.true; + expect(result.successes?.length).to.be.eq(3); + }); - it("should retry on failure if specified"); + it("should retry on failure if specified", async () => { + libp2p = mockLibp2p({ + peers: [mockPeer("1"), mockPeer("2")] + }); + + lightPush = mockLightPush({ libp2p }); + let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => { + if (peer.id.toString() === "1") { + return { success: peer.id }; + } + + return { failure: { error: "problem" } }; + }); + lightPush.protocol.send = sendSpy as any; + const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]); + lightPush["attemptRetries"] = attemptRetriesSpy; + + const result = await lightPush.send( + encoder, + { payload: utf8ToBytes("test") }, + { autoRetry: true } + ); + + expect(attemptRetriesSpy.calledOnce).to.be.true; + expect(result.successes?.length).to.be.eq(1); + expect(result.failures?.length).to.be.eq(1); + + sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; + await lightPush["attemptRetries"](sendSpy as any); + + expect(sendSpy.callCount).to.be.eq(3); + + sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; + await lightPush["attemptRetries"](sendSpy as any, 2); + + expect(sendSpy.callCount).to.be.eq(2); + }); }); + +type MockLibp2pOptions = { + peers?: Peer[]; +}; + +function mockLibp2p(options?: MockLibp2pOptions): Libp2p { + const peers = options?.peers || []; + const peerStore = { + get: (id: any) => Promise.resolve(peers.find((p) => p.id === id)) + }; + + return { + peerStore, + getPeers: () => peers.map((p) => p.id), + components: { + events: new EventTarget(), + connectionManager: { + getConnections: () => [] + } as any, + peerStore + } + } as unknown as Libp2p; +} + +type MockLightPushOptions = { + libp2p: Libp2p; + pubsubTopics?: string[]; + numPeersToUse?: number; +}; + +function mockLightPush(options: MockLightPushOptions): LightPush { + return new LightPush( + { + configuredPubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC] + } as ConnectionManager, + options.libp2p, + { numPeersToUse: options.numPeersToUse } + ); +} + +function mockPeer(id: string): Peer { + return { + id, + protocols: [LightPushCodec] + } as unknown as Peer; +} diff --git a/packages/sdk/src/protocols/light_push/light_push.ts b/packages/sdk/src/protocols/light_push/light_push.ts index 8da6ea31de..9764111f18 100644 --- a/packages/sdk/src/protocols/light_push/light_push.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -19,7 +19,7 @@ import { } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "../base_protocol.js"; +import { DEFAULT_NUM_PEERS_TO_USE } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); @@ -31,7 +31,8 @@ const DEFAULT_SEND_OPTIONS: ISenderOptions = { type RetryCallback = (peer: Peer) => Promise; -export class LightPush extends BaseProtocolSDK implements ILightPush { +export class LightPush implements ILightPush { + private numPeersToUse: number = DEFAULT_NUM_PEERS_TO_USE; public readonly protocol: LightPushCore; public constructor( @@ -39,15 +40,11 @@ export class LightPush extends BaseProtocolSDK implements ILightPush { private libp2p: Libp2p, options?: ProtocolCreateOptions ) { - super( - new LightPushCore(connectionManager.configuredPubsubTopics, libp2p), - connectionManager, - { - numPeersToUse: options?.numPeersToUse - } + this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; + this.protocol = new LightPushCore( + connectionManager.configuredPubsubTopics, + libp2p ); - - this.protocol = this.core as LightPushCore; } public async send( From 9ec36c3f225175f653a7a3e002e4f921532ed4ac Mon Sep 17 00:00:00 2001 From: Sasha Date: Fri, 11 Oct 2024 23:35:04 +0200 Subject: [PATCH 04/11] remove replaced test --- .../tests/light-push/peer_management.spec.ts | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 packages/tests/tests/light-push/peer_management.spec.ts diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts deleted file mode 100644 index d837b28aed..0000000000 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { LightNode } from "@waku/interfaces"; -import { createEncoder, utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; -import { describe } from "mocha"; - -import { - afterEachCustom, - beforeEachCustom, - DefaultTestShardInfo, - DefaultTestSingleShardInfo, - runMultipleNodes, - ServiceNodesFleet, - teardownNodesWithRedundancy -} from "../../src/index.js"; -import { TestContentTopic } from "../filter/utils.js"; - -describe("Waku Light Push: Connection Management: E2E", function () { - this.timeout(15000); - let waku: LightNode; - let serviceNodes: ServiceNodesFleet; - - beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - DefaultTestShardInfo, - { lightpush: true, filter: true }, - undefined, - 5 - ); - }); - - afterEachCustom(this, async () => { - await teardownNodesWithRedundancy(serviceNodes, waku); - }); - - const encoder = createEncoder({ - pubsubTopicShardInfo: DefaultTestSingleShardInfo, - contentTopic: TestContentTopic - }); - - it("should push to needed amount of connections", async function () { - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); - - expect(successes.length).to.be.equal(waku.lightPush.numPeersToUse); - expect(failures?.length || 0).to.equal(0); - }); - - // skipped because of https://github.com/waku-org/js-waku/pull/2155#discussion_r1787452696 - it.skip("Failed peers are renewed", async function () { - // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); - - expect(response1.successes.length).to.be.equal( - waku.lightPush.numPeersToUse - ); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); - - expect(successes.length).to.be.equal(1); - expect(failures?.length || 0).to.equal(0); - }); - - it("should fail to send if no connections available", async function () { - const connections = waku.libp2p.getConnections(); - await Promise.all( - connections.map((c) => - waku.connectionManager.dropConnection(c.remotePeer) - ) - ); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); - - expect(successes.length).to.be.equal(0); - expect(failures?.length).to.equal(1); - }); -}); From 02976823def29127a0238d425d5912aae94df195 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sat, 12 Oct 2024 14:59:01 +0200 Subject: [PATCH 05/11] ensure numPeersToUse is respected --- packages/sdk/src/protocols/base_protocol.ts | 6 +++--- packages/sdk/src/waku/waku.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 4550072b84..3ef5cc1df8 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -29,12 +29,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ) { this.log = new Logger(`sdk:${core.multicodec}`); - this.peerManager = new PeerManager(connectionManager, core, this.log); - this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + this.peerManager = new PeerManager(connectionManager, core, this.log); + this.log.info( `Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms` ); @@ -42,7 +42,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } public get connectedPeers(): Peer[] { - return this.peerManager.getPeers(); + return this.peerManager.getPeers().slice(0, this.numPeersToUse); } /** diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index c461a63ab3..1c450420cf 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -72,7 +72,7 @@ export class WakuNode implements IWaku { public constructor( public readonly pubsubTopics: PubsubTopic[], - options: WakuOptions, + options: CreateWakuNodeOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, relay?: IRelay @@ -111,12 +111,12 @@ export class WakuNode implements IWaku { } if (protocolsEnabled.lightpush) { - const lightPush = wakuLightPush(this.connectionManager); + const lightPush = wakuLightPush(this.connectionManager, options); this.lightPush = lightPush(libp2p); } if (protocolsEnabled.filter) { - const filter = wakuFilter(this.connectionManager); + const filter = wakuFilter(this.connectionManager, options); this.filter = filter(libp2p); } From a6d503ee0f66e0badb2c9425c1e5ad58a717cedb Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 13 Oct 2024 16:36:15 +0200 Subject: [PATCH 06/11] turn off check for missing messages --- packages/sdk/src/reliability_monitor/receiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 400485a0e9..7876080bbb 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -106,7 +106,6 @@ export class ReceiverReliabilityMonitor { pubsubTopic, peerIdStr ); - void this.checkAndRenewPeers(); return alreadyReceived; } @@ -138,6 +137,7 @@ export class ReceiverReliabilityMonitor { return alreadyReceived; } + // @ts-expect-error Turned off until properly investigated and dogfooded: https://github.com/waku-org/js-waku/issues/2075 private async checkAndRenewPeers(): Promise { for (const hash of this.receivedMessagesHashes.all) { for (const [peerIdStr, hashes] of Object.entries( From cc089aae24edee3ba3d401d0064e57ebb9e6ae55 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 13 Oct 2024 16:56:55 +0200 Subject: [PATCH 07/11] fix recurring ping --- .../protocols/filter/subscription_manager.ts | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 71dd9959f0..51b32e1e9c 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -30,7 +30,9 @@ const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscription { private reliabilityMonitor: ReceiverReliabilityMonitor; - private keepAliveTimer: number | null = null; + private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; + private keepAliveInterval: ReturnType | null = null; + private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -67,7 +69,7 @@ export class SubscriptionManager implements ISubscription { options.maxMissedMessagesThreshold ); this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); - this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; + this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -112,7 +114,7 @@ export class SubscriptionManager implements ISubscription { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - this.startSubscriptionsMaintenance(this.keepAliveTimer); + this.startSubscriptionsMaintenance(this.keepAliveTimeout); return finalResult; } @@ -254,9 +256,9 @@ export class SubscriptionManager implements ISubscription { } } - private startSubscriptionsMaintenance(interval: number): void { + private startSubscriptionsMaintenance(timeout: number): void { log.info("Starting subscriptions maintenance"); - this.startKeepAlivePings(interval); + this.startKeepAlivePings(timeout); this.startConnectionListener(); } @@ -295,31 +297,29 @@ export class SubscriptionManager implements ISubscription { log.error(`networkStateListener failed to recover: ${err}`); } - this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE); + this.startKeepAlivePings(this.keepAliveTimeout); } - private startKeepAlivePings(interval: number): void { - if (this.keepAliveTimer) { + private startKeepAlivePings(timeout: number): void { + if (this.keepAliveInterval) { log.info("Recurring pings already set up."); return; } - this.keepAliveTimer = setInterval(() => { - void this.ping() - .then(() => log.info("Keep-alive ping successful")) - .catch((error) => log.error("Error in keep-alive ping cycle:", error)); - }, interval) as unknown as number; + this.keepAliveInterval = setInterval(() => { + void this.ping(); + }, timeout); } private stopKeepAlivePings(): void { - if (!this.keepAliveTimer) { + if (!this.keepAliveInterval) { log.info("Already stopped recurring pings."); return; } log.info("Stopping recurring pings."); - clearInterval(this.keepAliveTimer); - this.keepAliveTimer = null; + clearInterval(this.keepAliveInterval); + this.keepAliveInterval = null; } } From 857e9de9707ff12c62447af4bf1270382b591b71 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 13 Oct 2024 23:28:17 +0200 Subject: [PATCH 08/11] add useful logs --- .../sdk/src/protocols/filter/subscription_manager.ts | 11 +++++++---- packages/sdk/src/reliability_monitor/receiver.ts | 3 +++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 51b32e1e9c..7087ee8e84 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -242,18 +242,21 @@ export class SubscriptionManager implements ISubscription { let result; try { result = await this.protocol.ping(peer); - return result; } catch (error) { - return { + result = { success: null, failure: { peerId, error: ProtocolError.GENERIC_FAIL } }; - } finally { - void this.reliabilityMonitor.handlePingResult(peerId, result); } + + log.info( + `Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}` + ); + await this.reliabilityMonitor.handlePingResult(peerId, result); + return result; } private startSubscriptionsMaintenance(timeout: number): void { diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 7876080bbb..ff48b6a06f 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -88,6 +88,9 @@ export class ReceiverReliabilityMonitor { if (failures >= this.maxPingFailures) { try { + log.info( + `Attempting to renew ${peerId.toString()} due to ping failures.` + ); await this.renewAndSubscribePeer(peerId); this.peerFailures.delete(peerId.toString()); } catch (error) { From 9552c7cab142d25c7b5dd8ad563ad0be550399d5 Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 16 Oct 2024 01:09:17 +0200 Subject: [PATCH 09/11] skip tests --- packages/tests/tests/health-manager/node.spec.ts | 3 ++- packages/tests/tests/health-manager/protocols.spec.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index 9645a86b48..6e50522a17 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -17,7 +17,8 @@ import { TestShardInfo } from "./utils.js"; -describe("Node Health Status Matrix Tests", function () { +// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 +describe.skip("Node Health Status Matrix Tests", function () { let waku: LightNode; let serviceNodes: ServiceNode[]; diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts index 09f0febe49..95d4ec5d71 100644 --- a/packages/tests/tests/health-manager/protocols.spec.ts +++ b/packages/tests/tests/health-manager/protocols.spec.ts @@ -17,7 +17,8 @@ import { const NUM_NODES = [0, 1, 2, 3]; -describe("Health Manager", function () { +// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 +describe.skip("Health Manager", function () { this.timeout(10_000); let waku: LightNode; From 3b8c315979717de422917fec611961f337e2ba47 Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 16 Oct 2024 01:12:18 +0200 Subject: [PATCH 10/11] remove comment --- packages/sdk/src/reliability_monitor/receiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index ff48b6a06f..61d700817f 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -140,7 +140,7 @@ export class ReceiverReliabilityMonitor { return alreadyReceived; } - // @ts-expect-error Turned off until properly investigated and dogfooded: https://github.com/waku-org/js-waku/issues/2075 + // @ts-expect-error Turned off until properly investigated: https://github.com/waku-org/js-waku/issues/2075 private async checkAndRenewPeers(): Promise { for (const hash of this.receivedMessagesHashes.all) { for (const [peerIdStr, hashes] of Object.entries( From 9f0836334560803a2f00e25fb6e4f78426aa5db8 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Thu, 17 Oct 2024 00:48:23 +0200 Subject: [PATCH 11/11] feat: check filter subscriptions against lightPush (#2185) --- packages/interfaces/src/filter.ts | 2 +- packages/message-hash/src/index.ts | 4 +- .../sdk/src/protocols/filter/constants.ts | 5 +- packages/sdk/src/protocols/filter/index.ts | 12 +- .../protocols/filter/subscription_manager.ts | 107 +++++++++-- packages/sdk/src/reliability_monitor/index.ts | 7 +- .../sdk/src/reliability_monitor/receiver.ts | 175 +++++++----------- packages/sdk/src/waku/waku.ts | 6 +- 8 files changed, 192 insertions(+), 126 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 145f208fe8..bbe6bb351b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -20,7 +20,7 @@ export type SubscriptionCallback = { export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; - maxMissedMessagesThreshold?: number; + enableLightPushFilterCheck?: boolean; }; export interface ISubscription { diff --git a/packages/message-hash/src/index.ts b/packages/message-hash/src/index.ts index 825143ca95..b22f17215a 100644 --- a/packages/message-hash/src/index.ts +++ b/packages/message-hash/src/index.ts @@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256"; import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; import { - bytesToUtf8, + bytesToHex, concat, numberToBytes, utf8ToBytes @@ -56,6 +56,6 @@ export function messageHashStr( message: IProtoMessage | IDecodedMessage ): string { const hash = messageHash(pubsubTopic, message); - const hashStr = bytesToUtf8(hash); + const hashStr = bytesToHex(hash); return hashStr; } diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 01cea6859f..7a4af02a30 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,5 +1,8 @@ export const DEFAULT_KEEP_ALIVE = 10_000; +export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; +export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; export const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE + keepAlive: DEFAULT_KEEP_ALIVE, + enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK }; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 6f5320812b..2d8f356150 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -6,6 +6,7 @@ import { type IDecodedMessage, type IDecoder, type IFilter, + type ILightPush, type Libp2p, NetworkConfig, type ProtocolCreateOptions, @@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter { public constructor( connectionManager: ConnectionManager, - libp2p: Libp2p, + private libp2p: Libp2p, + private lightPush?: ILightPush, options?: ProtocolCreateOptions ) { super( @@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter { this.protocol, this.connectionManager, () => this.connectedPeers, - this.renewPeer.bind(this) + this.renewPeer.bind(this), + this.libp2p, + this.lightPush ) ); @@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter { export function wakuFilter( connectionManager: ConnectionManager, + lightPush?: ILightPush, init?: ProtocolCreateOptions ): (libp2p: Libp2p) => IFilter { - return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init); + return (libp2p: Libp2p) => + new Filter(connectionManager, libp2p, lightPush, init); } diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 7087ee8e84..1dc2724ab5 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,6 +1,12 @@ import type { Peer } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, FilterCore } from "@waku/core"; +import { + ConnectionManager, + createDecoder, + createEncoder, + FilterCore, + LightPushCore +} from "@waku/core"; import { type Callback, type ContentTopic, @@ -8,8 +14,10 @@ import { EConnectionStateEvents, type IDecodedMessage, type IDecoder, + type ILightPush, type IProtoMessage, type ISubscription, + type Libp2p, type PeerIdStr, ProtocolError, type PubsubTopic, @@ -23,7 +31,12 @@ import { groupByContentTopic, Logger } from "@waku/utils"; import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; -import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { + DEFAULT_KEEP_ALIVE, + DEFAULT_LIGHT_PUSH_FILTER_CHECK, + DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL, + DEFAULT_SUBSCRIBE_OPTIONS +} from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); @@ -33,6 +46,8 @@ export class SubscriptionManager implements ISubscription { private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; private keepAliveInterval: ReturnType | null = null; + private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK; + private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -45,7 +60,9 @@ export class SubscriptionManager implements ISubscription { private readonly getPeers: () => Peer[], private readonly renewPeer: ( peerToDisconnect: PeerId - ) => Promise + ) => Promise, + private readonly libp2p: Libp2p, + private readonly lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -56,7 +73,8 @@ export class SubscriptionManager implements ISubscription { this.renewPeer.bind(this), () => Array.from(this.subscriptionCallbacks.keys()), this.protocol.subscribe.bind(this.protocol), - this.protocol.addLibp2pEventListener.bind(this.protocol) + this.protocol.addLibp2pEventListener.bind(this.protocol), + this.sendLightPushCheckMessage.bind(this) ); } @@ -65,11 +83,10 @@ export class SubscriptionManager implements ISubscription { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { - this.reliabilityMonitor.setMaxMissedMessagesThreshold( - options.maxMissedMessagesThreshold - ); this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; + this.enableLightPushFilterCheck = + options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -87,11 +104,20 @@ export class SubscriptionManager implements ISubscription { } } + if (this.enableLightPushFilterCheck) { + decodersArray.push( + createDecoder( + this.buildLightPushContentTopic(), + this.pubsubTopic + ) as IDecoder + ); + } + const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); const promises = this.getPeers().map(async (peer) => - this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) + this.subscribeWithPeerVerification(peer, contentTopics) ); const results = await Promise.allSettled(promises); @@ -109,6 +135,11 @@ export class SubscriptionManager implements ISubscription { callback } as unknown as SubscriptionCallback; + // don't handle case of internal content topic + if (contentTopic === this.buildLightPushContentTopic()) { + return; + } + // The callback and decoder may override previous values, this is on // purpose as the user may call `subscribe` to refresh the subscription this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); @@ -176,10 +207,9 @@ export class SubscriptionManager implements ISubscription { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const alreadyReceived = this.reliabilityMonitor.processIncomingMessage( - message, - this.pubsubTopic, - peerIdStr + const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived( + peerIdStr, + message as IProtoMessage ); if (alreadyReceived) { @@ -202,6 +232,19 @@ export class SubscriptionManager implements ISubscription { await pushMessage(subscriptionCallback, this.pubsubTopic, message); } + private async subscribeWithPeerVerification( + peer: Peer, + contentTopics: string[] + ): Promise { + const result = await this.protocol.subscribe( + this.pubsubTopic, + peer, + contentTopics + ); + await this.sendLightPushCheckMessage(peer); + return result; + } + private handleResult( results: PromiseSettledResult[], type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" @@ -324,6 +367,46 @@ export class SubscriptionManager implements ISubscription { clearInterval(this.keepAliveInterval); this.keepAliveInterval = null; } + + private async sendLightPushCheckMessage(peer: Peer): Promise { + if ( + this.lightPush && + this.libp2p && + this.reliabilityMonitor.shouldVerifyPeer(peer.id) + ) { + const encoder = createEncoder({ + contentTopic: this.buildLightPushContentTopic(), + pubsubTopic: this.pubsubTopic, + ephemeral: true + }); + + const message = { payload: new Uint8Array(1) }; + const protoMessage = await encoder.toProtoObj(message); + + // make a delay to be sure message is send when subscription is in place + setTimeout( + (async () => { + const result = await (this.lightPush!.protocol as LightPushCore).send( + encoder, + message, + peer + ); + this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage); + if (result.failure) { + log.error( + `failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}` + ); + return; + } + }) as () => void, + DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL + ); + } + } + + private buildLightPushContentTopic(): string { + return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; + } } async function pushMessage( diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index f75c5fdc92..f92d1fa866 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -24,7 +24,8 @@ export class ReliabilityMonitorManager { peer: Peer, contentTopics: ContentTopic[] ) => Promise, - addLibp2pEventListener: Libp2p["addEventListener"] + addLibp2pEventListener: Libp2p["addEventListener"], + sendLightPushMessage: (peer: Peer) => Promise ): ReceiverReliabilityMonitor { if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; @@ -36,7 +37,8 @@ export class ReliabilityMonitorManager { renewPeer, getContentTopics, protocolSubscribe, - addLibp2pEventListener + addLibp2pEventListener, + sendLightPushMessage ); ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); return monitor; @@ -50,7 +52,6 @@ export class ReliabilityMonitorManager { public static stopAll(): void { for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxMissedMessagesThreshold(undefined); monitor.setMaxPingFailures(undefined); this.receiverMonitors.delete(pubsubTopic); } diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 61d700817f..ea58c1b17c 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -8,24 +8,20 @@ import { PubsubTopic } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; -import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; - -type ReceivedMessageHashes = { - all: Set; - nodes: Record>; -}; - -const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; +import { bytesToUtf8 } from "@waku/utils/bytes"; const log = new Logger("sdk:receiver:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; +const MESSAGE_VERIFICATION_DELAY = 5_000; export class ReceiverReliabilityMonitor { - private receivedMessagesHashes: ReceivedMessageHashes; - private missedMessagesByPeer: Map = new Map(); - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private receivedMessagesFormPeer = new Set(); + private receivedMessages = new Set(); + private scheduledVerification = new Map(); + private verifiedPeers = new Set(); + private peerFailures: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; private peerRenewalLocks: Set = new Set(); @@ -40,18 +36,9 @@ export class ReceiverReliabilityMonitor { peer: Peer, contentTopics: ContentTopic[] ) => Promise, - private addLibp2pEventListener: Libp2p["addEventListener"] + private addLibp2pEventListener: Libp2p["addEventListener"], + private sendLightPushMessage: (peer: Peer) => Promise ) { - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - this.addLibp2pEventListener("peer:disconnect", (evt) => { const peerId = evt.detail; if (this.getPeers().some((p) => p.id.equals(peerId))) { @@ -60,13 +47,6 @@ export class ReceiverReliabilityMonitor { }); } - public setMaxMissedMessagesThreshold(value: number | undefined): void { - if (value === undefined) { - return; - } - this.maxMissedMessagesThreshold = value; - } - public setMaxPingFailures(value: number | undefined): void { if (value === undefined) { return; @@ -99,77 +79,79 @@ export class ReceiverReliabilityMonitor { } } - public processIncomingMessage( - message: WakuMessage, - pubsubTopic: PubsubTopic, - peerIdStr?: string + public notifyMessageReceived( + peerIdStr: string, + message: IProtoMessage ): boolean { - const alreadyReceived = this.addMessageToCache( - message, - pubsubTopic, - peerIdStr + const hash = this.buildMessageHash(message); + + this.verifiedPeers.add(peerIdStr); + this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`); + + log.info( + `notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}` ); - return alreadyReceived; + log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`); + + if (this.receivedMessages.has(hash)) { + return true; + } + + this.receivedMessages.add(hash); + + return false; } - private addMessageToCache( - message: WakuMessage, - pubsubTopic: PubsubTopic, - peerIdStr?: string - ): boolean { - const hashedMessageStr = messageHashStr( - pubsubTopic, - message as IProtoMessage - ); + public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void { + const peerIdStr = peerId.toString(); + const hash = this.buildMessageHash(message); + + log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`); + + if (this.scheduledVerification.has(peerIdStr)) { + log.warn( + `notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}` + ); + return; + } - const alreadyReceived = - this.receivedMessagesHashes.all.has(hashedMessageStr); - this.receivedMessagesHashes.all.add(hashedMessageStr); + const timeout = window.setTimeout( + (async () => { + const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); + const receivedTestMessage = this.receivedMessagesFormPeer.has( + `${peerIdStr}-${hash}` + ); + + if (receivedAnyMessage || receivedTestMessage) { + log.info( + `notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}` + ); + return; + } - if (peerIdStr) { - const hashesForPeer = this.receivedMessagesHashes.nodes[peerIdStr]; - if (!hashesForPeer) { log.warn( - `Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.` + `notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}` ); - this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); - } - this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); - } + this.scheduledVerification.delete(peerIdStr); + await this.renewAndSubscribePeer(peerId); + }) as () => void, + MESSAGE_VERIFICATION_DELAY + ); - return alreadyReceived; + this.scheduledVerification.set(peerIdStr, timeout); } - // @ts-expect-error Turned off until properly investigated: https://github.com/waku-org/js-waku/issues/2075 - private async checkAndRenewPeers(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } + public shouldVerifyPeer(peerId: PeerId): boolean { + const peerIdStr = peerId.toString(); + + const isPeerVerified = this.verifiedPeers.has(peerIdStr); + const isVerificationPending = this.scheduledVerification.has(peerIdStr); + + return !(isPeerVerified || isVerificationPending); + } + + private buildMessageHash(message: IProtoMessage): string { + return messageHashStr(this.pubsubTopic, message); } private async renewAndSubscribePeer( @@ -196,12 +178,9 @@ export class ReceiverReliabilityMonitor { this.getContentTopics() ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + await this.sendLightPushMessage(newPeer); this.peerFailures.delete(peerIdStr); - this.missedMessagesByPeer.delete(peerIdStr); - delete this.receivedMessagesHashes.nodes[peerIdStr]; return newPeer; } catch (error) { @@ -211,14 +190,4 @@ export class ReceiverReliabilityMonitor { this.peerRenewalLocks.delete(peerIdStr); } } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 1c450420cf..eea596d7a9 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -116,7 +116,11 @@ export class WakuNode implements IWaku { } if (protocolsEnabled.filter) { - const filter = wakuFilter(this.connectionManager, options); + const filter = wakuFilter( + this.connectionManager, + this.lightPush, + options + ); this.filter = filter(libp2p); }