Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: attempt to fix some of the Filter issues #2183

Merged
merged 15 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
maxMissedMessagesThreshold?: number;
enableLightPushFilterCheck?: boolean;
};

export interface ISubscription {
Expand Down
4 changes: 2 additions & 2 deletions packages/message-hash/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256";
import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import {
bytesToUtf8,
bytesToHex,
concat,
numberToBytes,
utf8ToBytes
Expand Down Expand Up @@ -56,6 +56,6 @@ export function messageHashStr(
message: IProtoMessage | IDecodedMessage
): string {
const hash = messageHash(pubsubTopic, message);
const hashStr = bytesToUtf8(hash);
const hashStr = bytesToHex(hash);
return hashStr;
}
5 changes: 4 additions & 1 deletion packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
12 changes: 9 additions & 3 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
type IDecodedMessage,
type IDecoder,
type IFilter,
type ILightPush,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
Expand Down Expand Up @@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter {

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
private lightPush?: ILightPush,
options?: ProtocolCreateOptions
) {
super(
Expand Down Expand Up @@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
this.protocol,
this.connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
this.renewPeer.bind(this),
this.libp2p,
this.lightPush
)
);

Expand Down Expand Up @@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter {

export function wakuFilter(
connectionManager: ConnectionManager,
lightPush?: ILightPush,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init);
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, lightPush, init);
}
150 changes: 118 additions & 32 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
ConnectionManager,
createDecoder,
createEncoder,
FilterCore,
LightPushCore
} from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
EConnectionStateEvents,
type IDecodedMessage,
type IDecoder,
type ILightPush,
type IProtoMessage,
type ISubscription,
type Libp2p,
type PeerIdStr,
ProtocolError,
type PubsubTopic,
Expand All @@ -23,14 +31,23 @@ import { groupByContentTopic, Logger } from "@waku/utils";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
DEFAULT_KEEP_ALIVE,
DEFAULT_LIGHT_PUSH_FILTER_CHECK,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL,
DEFAULT_SUBSCRIBE_OPTIONS
} from "./constants.js";

const log = new Logger("sdk:filter:subscription_manager");

export class SubscriptionManager implements ISubscription {
private reliabilityMonitor: ReceiverReliabilityMonitor;

private keepAliveTimer: number | null = null;
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;

private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
Expand All @@ -43,7 +60,9 @@ export class SubscriptionManager implements ISubscription {
private readonly getPeers: () => Peer[],
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) => Promise<Peer | undefined>,
private readonly libp2p: Libp2p,
private readonly lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
Expand All @@ -54,7 +73,8 @@ export class SubscriptionManager implements ISubscription {
this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
this.protocol.addLibp2pEventListener.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
}

Expand All @@ -63,11 +83,10 @@ export class SubscriptionManager implements ISubscription {
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
options.maxMissedMessagesThreshold
);
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.enableLightPushFilterCheck =
options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand All @@ -85,11 +104,20 @@ export class SubscriptionManager implements ISubscription {
}
}

if (this.enableLightPushFilterCheck) {
decodersArray.push(
createDecoder(
this.buildLightPushContentTopic(),
this.pubsubTopic
) as IDecoder<T>
);
}

const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
this.subscribeWithPeerVerification(peer, contentTopics)
);

const results = await Promise.allSettled(promises);
Expand All @@ -107,12 +135,17 @@ export class SubscriptionManager implements ISubscription {
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;

// don't handle case of internal content topic
if (contentTopic === this.buildLightPushContentTopic()) {
return;
}

// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

this.startSubscriptionsMaintenance(this.keepAliveTimer);
this.startSubscriptionsMaintenance(this.keepAliveTimeout);

return finalResult;
}
Expand Down Expand Up @@ -174,10 +207,9 @@ export class SubscriptionManager implements ISubscription {
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const alreadyReceived = this.reliabilityMonitor.processIncomingMessage(
message,
this.pubsubTopic,
peerIdStr
const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived(
peerIdStr,
message as IProtoMessage
);

if (alreadyReceived) {
Expand All @@ -200,6 +232,19 @@ export class SubscriptionManager implements ISubscription {
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}

private async subscribeWithPeerVerification(
peer: Peer,
contentTopics: string[]
): Promise<CoreProtocolResult> {
const result = await this.protocol.subscribe(
this.pubsubTopic,
peer,
contentTopics
);
await this.sendLightPushCheckMessage(peer);
return result;
}

private handleResult(
results: PromiseSettledResult<CoreProtocolResult>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
Expand Down Expand Up @@ -240,23 +285,26 @@ export class SubscriptionManager implements ISubscription {
let result;
try {
result = await this.protocol.ping(peer);
return result;
} catch (error) {
return {
result = {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
} finally {
void this.reliabilityMonitor.handlePingResult(peerId, result);
}

log.info(
`Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}`
);
await this.reliabilityMonitor.handlePingResult(peerId, result);
return result;
}

private startSubscriptionsMaintenance(interval: number): void {
private startSubscriptionsMaintenance(timeout: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval);
this.startKeepAlivePings(timeout);
this.startConnectionListener();
}

Expand Down Expand Up @@ -295,31 +343,69 @@ export class SubscriptionManager implements ISubscription {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE);
this.startKeepAlivePings(this.keepAliveTimeout);
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
private startKeepAlivePings(timeout: number): void {
if (this.keepAliveInterval) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
void this.ping()
.then(() => log.info("Keep-alive ping successful"))
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number;
this.keepAliveInterval = setInterval(() => {
void this.ping();
}, timeout);
}

private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
if (!this.keepAliveInterval) {
log.info("Already stopped recurring pings.");
return;
}

log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}

private async sendLightPushCheckMessage(peer: Peer): Promise<void> {
if (
this.lightPush &&
this.libp2p &&
this.reliabilityMonitor.shouldVerifyPeer(peer.id)
) {
const encoder = createEncoder({
contentTopic: this.buildLightPushContentTopic(),
pubsubTopic: this.pubsubTopic,
ephemeral: true
});

const message = { payload: new Uint8Array(1) };
const protoMessage = await encoder.toProtoObj(message);

// make a delay to be sure message is send when subscription is in place
setTimeout(
(async () => {
const result = await (this.lightPush!.protocol as LightPushCore).send(
encoder,
message,
peer
);
this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage);
if (result.failure) {
log.error(
`failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}`
);
return;
}
}) as () => void,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL
);
}
}

private buildLightPushContentTopic(): string {
return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`;
}
}

Expand Down
7 changes: 4 additions & 3 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class ReliabilityMonitorManager {
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
addLibp2pEventListener: Libp2p["addEventListener"],
sendLightPushMessage: (peer: Peer) => Promise<void>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
Expand All @@ -36,7 +37,8 @@ export class ReliabilityMonitorManager {
renewPeer,
getContentTopics,
protocolSubscribe,
addLibp2pEventListener
addLibp2pEventListener,
sendLightPushMessage
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
Expand All @@ -50,7 +52,6 @@ export class ReliabilityMonitorManager {

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
Expand Down
Loading
Loading