Skip to content

Commit

Permalink
feat: node and protocols health (#2080)
Browse files Browse the repository at this point in the history
* feat: introduce HealthManager

* feat: make health accessible on Waku object

* feat: update health from protocols

* chore: add access modifiers to healthmanager

* feat: use a HealthManager singleton

* chore: add tests for Filter, LightPush and Store

* feat: add overall node health

* chore: update protocol health to consider Store protocol

* chore: setup generic test utils instead of using filter utils

* tests: add a health status matrix check from 0-3

* chore: increase timeout for failing tests in CI
tests pass locally without an increased timeout, but fail in CI

* chore: move name inference to HealthManager

* tests: abstract away node creation and teardown utils

* fix: import
  • Loading branch information
danisharora099 authored Jul 27, 2024
1 parent defe41b commit d464af3
Show file tree
Hide file tree
Showing 21 changed files with 557 additions and 32 deletions.
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

export { ConnectionManager } from "./lib/connection_manager.js";

export { getHealthManager } from "./lib/health_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";

Expand Down
90 changes: 90 additions & 0 deletions packages/core/src/lib/health_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {
HealthStatus,
type IHealthManager,
NodeHealth,
type ProtocolHealth,
Protocols
} from "@waku/interfaces";

class HealthManager implements IHealthManager {
public static instance: HealthManager;
private readonly health: NodeHealth;

private constructor() {
this.health = {
overallStatus: HealthStatus.Unhealthy,
protocolStatuses: new Map()
};
}

public static getInstance(): HealthManager {
if (!HealthManager.instance) {
HealthManager.instance = new HealthManager();
}
return HealthManager.instance;
}

public getHealthStatus(): HealthStatus {
return this.health.overallStatus;
}

public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined {
return this.health.protocolStatuses.get(protocol);
}

public updateProtocolHealth(
multicodec: string,
connectedPeers: number
): void {
const protocol = this.getNameFromMulticodec(multicodec);

let status: HealthStatus = HealthStatus.Unhealthy;
if (connectedPeers == 1) {
status = HealthStatus.MinimallyHealthy;
} else if (connectedPeers >= 2) {
status = HealthStatus.SufficientlyHealthy;
}

this.health.protocolStatuses.set(protocol, {
name: protocol,
status: status,
lastUpdate: new Date()
});

this.updateOverallHealth();
}

private getNameFromMulticodec(multicodec: string): Protocols {
let name: Protocols;
if (multicodec.includes("filter")) {
name = Protocols.Filter;
} else if (multicodec.includes("lightpush")) {
name = Protocols.LightPush;
} else if (multicodec.includes("store")) {
name = Protocols.Store;
} else {
throw new Error(`Unknown protocol: ${multicodec}`);
}
return name;
}

private updateOverallHealth(): void {
const relevantProtocols = [Protocols.LightPush, Protocols.Filter];
const statuses = relevantProtocols.map(
(p) => this.getProtocolStatus(p)?.status
);

if (statuses.some((status) => status === HealthStatus.Unhealthy)) {
this.health.overallStatus = HealthStatus.Unhealthy;
} else if (
statuses.some((status) => status === HealthStatus.MinimallyHealthy)
) {
this.health.overallStatus = HealthStatus.MinimallyHealthy;
} else {
this.health.overallStatus = HealthStatus.SufficientlyHealthy;
}
}
}

export const getHealthManager = (): HealthManager =>
HealthManager.getInstance();
26 changes: 26 additions & 0 deletions packages/interfaces/src/health_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Protocols } from "./protocols";

export enum HealthStatus {
Unhealthy = "Unhealthy",
MinimallyHealthy = "MinimallyHealthy",
SufficientlyHealthy = "SufficientlyHealthy"
}

export interface IHealthManager {
getHealthStatus: () => HealthStatus;
getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined;
updateProtocolHealth: (multicodec: string, connectedPeers: number) => void;
}

export type NodeHealth = {
overallStatus: HealthStatus;
protocolStatuses: ProtocolsHealthStatus;
};

export type ProtocolHealth = {
name: Protocols;
status: HealthStatus;
lastUpdate: Date;
};

export type ProtocolsHealthStatus = Map<Protocols, ProtocolHealth>;
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./health_manager.js";
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export type IBaseProtocolCore = {
};

export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly numPeersToUse: number;
};

Expand Down
3 changes: 3 additions & 0 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import { Protocols } from "./protocols.js";
Expand All @@ -27,6 +28,8 @@ export interface Waku {
isStarted(): boolean;

isConnected(): boolean;

health: IHealthManager;
}

export interface LightNode extends Waku {
Expand Down
30 changes: 26 additions & 4 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import {
IBaseProtocolSDK,
IHealthManager,
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
Expand All @@ -14,6 +18,7 @@ const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
Expand All @@ -32,6 +37,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
options: Options
) {
this.log = new Logger(`sdk:${core.multicodec}`);

this.healthManager = getHealthManager();

this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
Expand Down Expand Up @@ -60,7 +68,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
);
}

this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);

this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
Expand Down Expand Up @@ -192,7 +204,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {

await Promise.all(dials);

this.peers = [...this.peers, ...additionalPeers];
const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);

this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
Expand Down Expand Up @@ -232,6 +246,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
throw error;
}
}

private updatePeers(peers: Peer[]): void {
this.peers = peers;
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.length
);
}
}

class RenewPeerLocker {
Expand Down
6 changes: 5 additions & 1 deletion packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import type {
IFilterSDK,
IHealthManager,
ILightPushSDK,
IRelay,
IStoreSDK,
Expand Down Expand Up @@ -68,6 +69,7 @@ export class WakuNode implements Waku {
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];
public readonly health: IHealthManager;

public constructor(
options: WakuOptions,
Expand Down Expand Up @@ -105,6 +107,8 @@ export class WakuNode implements Waku {
this.relay
);

this.health = getHealthManager();

if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager, options);
this.store = store(libp2p);
Expand Down
1 change: 1 addition & 0 deletions packages/tests/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./base64_utf8.js";
export * from "./waitForConnections.js";
export * from "./custom_mocha_hooks.js";
export * from "./waku_versions_utils.js";
export * from "./nodes.js";
115 changes: 115 additions & 0 deletions packages/tests/src/utils/nodes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { waitForRemotePeer } from "@waku/core";
import {
LightNode,
ProtocolCreateOptions,
Protocols,
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { isDefined, shardInfoToPubsubTopics } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";

import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants";
import { ServiceNodesFleet } from "../lib";
import { Args } from "../types";

import { waitForConnections } from "./waitForConnections";

export async function runMultipleNodes(
context: Context,
shardInfo?: ShardingParams,
customArgs?: Args,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultTestPubsubTopic];
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
pubsubTopics,
numServiceNodes,
strictChecking,
shardInfo,
customArgs,
withoutFilter
);

const wakuOptions: ProtocolCreateOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
}
};

if (shardInfo) {
wakuOptions.shardInfo = shardInfo;
} else {
wakuOptions.pubsubTopics = pubsubTopics;
}

const waku = await createLightNode(wakuOptions);
await waku.start();

if (!waku) {
throw new Error("Failed to initialize waku");
}

for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(
waku,
[
!customArgs?.filter ? undefined : Protocols.Filter,
!customArgs?.lightpush ? undefined : Protocols.LightPush
].filter(isDefined)
);
await node.ensureSubscriptions(pubsubTopics);

const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();

if (wakuConnections.length < 1 || nodePeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
);
}
}

await waitForConnections(numServiceNodes, waku);

return [serviceNodes, waku];
}

export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: Waku | Waku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];

const stopNwakuNodes = serviceNodes.nodes.map(async (node) => {
await pRetry(
async () => {
await node.stop();
},
{ retries: 3 }
);
});

const stopWakuNodes = wNodes.map(async (waku) => {
if (waku) {
await pRetry(
async () => {
await waku.stop();
},
{ retries: 3 }
);
}
});

await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}
Loading

0 comments on commit d464af3

Please sign in to comment.