Skip to content

Commit

Permalink
feat(filter): peer/subscription renewal with recurring Filter pings (#…
Browse files Browse the repository at this point in the history
…2052)

* chore: renewPeer() returns the new found peer

* feat: ping & peer renewal

* chore: add tests

* fix: tests

* chore: remove only

* chore: remove comments

* chore(tests): decrease timeout

* chore: add array index validation

* chore: remove only

* chore: move defaults into a separate variable

* chore: update lightpush with new API

* chore: include peer renewals within `ping` instead of `interval`

* chore: update tests

* chore: add new test

* chore: address comments
  • Loading branch information
danisharora099 authored Jul 10, 2024
1 parent 68590f0 commit 318667e
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 68 deletions.
5 changes: 4 additions & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type {
Expand All @@ -13,6 +15,7 @@ import type { IReceiver } from "./receiver.js";

export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
};

export type IFilter = IReceiver & IBaseProtocolCore;
Expand All @@ -26,7 +29,7 @@ export interface ISubscriptionSDK {

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;

ping(): Promise<SDKProtocolResult>;
ping(peerId?: PeerId): Promise<SDKProtocolResult>;

unsubscribeAll(): Promise<SDKProtocolResult>;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type IBaseProtocolCore = {
};

export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<void>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
readonly numPeersToUse: number;
};
Expand Down
39 changes: 21 additions & 18 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,24 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
/**
* Disconnects from a peer and tries to find a new one to replace it.
* @param peerToDisconnect The peer to disconnect from.
* @returns The new peer that was found and connected to.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<void> {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
try {
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

await this.findAndAddPeers(1);
} catch (error) {
this.log.info(
"Peer renewal failed, relying on the interval to find a new peer"
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
throw new Error(
"Failed to find a new peer to replace the disconnected one"
);
}

return peer;
}

/**
Expand Down Expand Up @@ -171,14 +173,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<void> {
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
this.peers = [...this.peers, ...additionalPeers];
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
Expand All @@ -197,20 +200,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: numPeers
numPeers: 0
});

if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: numPeers
numPeers: 0
});
}

newPeers = newPeers.filter(
(peer) => this.peers.some((p) => p.id === peer.id) === false
);
newPeers = newPeers
.filter((peer) => this.peers.some((p) => p.id === peer.id) === false)
.slice(0, numPeers);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
Expand Down
119 changes: 91 additions & 28 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
Expand Down Expand Up @@ -41,13 +42,17 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
const log = new Logger("sdk:filter");

const MINUTE = 60 * 1000;
const DEFAULT_MAX_PINGS = 3;

const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private peerFailures: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;

private subscriptionCallbacks: Map<
ContentTopic,
Expand All @@ -56,18 +61,21 @@ export class SubscriptionManager implements ISubscriptionSDK {

constructor(
pubsubTopic: PubsubTopic,
private peers: Peer[],
private protocol: FilterCore
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}

async subscribe<T extends IDecodedMessage>(
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

// check that all decoders are configured for the same pubsub topic as this subscription
Expand All @@ -87,7 +95,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.peers.map(async (peer) =>
const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
);

Expand All @@ -111,15 +119,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
if (options.keepAlive) {
this.startKeepAlivePings(options);
}

return finalResult;
}

async unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => {
public async unsubscribe(
contentTopics: ContentTopic[]
): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
Expand All @@ -143,16 +153,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}

async ping(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => this.protocol.ping(peer));
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);

const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
const results = await Promise.allSettled(promises);

return this.handleResult(results, "ping");
}

async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) =>
public async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);

Expand Down Expand Up @@ -217,31 +228,78 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
}
return result;
}

// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
const peer = this.getPeers().find((p) => p.id.equals(peerId));
if (!peer) {
return {
success: null,
failure: {
peerId,
error: ProtocolError.NO_PEER_AVAILABLE
}
};
}

return result;
try {
const result = await this.protocol.ping(peer);
if (result.failure) {
await this.handlePeerFailure(peerId);
} else {
this.peerFailures.delete(peerId.toString());
}
return result;
} catch (error) {
await this.handlePeerFailure(peerId);
return {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
}
}

private async handlePeerFailure(peerId: PeerId): Promise<void> {
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
this.peerFailures.set(peerId.toString(), failures);

if (failures > this.maxPingFailures) {
try {
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
}
}
}

private startKeepAlivePings(interval: number): void {
private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);

return newPeer;
}

private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
const run = async (): Promise<void> => {
try {
log.info("Recurring ping to peers.");
await this.ping();
} catch (error) {
log.error("Stopping recurring pings due to failure", error);
this.stopKeepAlivePings();
}
};

void run();
}, interval) as unknown as number;
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
}, keepAlive) as unknown as number;
}

private stopKeepAlivePings(): void {
Expand Down Expand Up @@ -345,7 +403,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol)
new SubscriptionManager(
pubsubTopic,
this.protocol,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);

return {
Expand Down
7 changes: 6 additions & 1 deletion packages/sdk/src/protocols/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
if (failure) {
if (failure.peerId) {
await this.renewPeer(failure.peerId);
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
}
}

failures.push(failure);
Expand Down
Loading

0 comments on commit 318667e

Please sign in to comment.