Skip to content

Commit

Permalink
feat(filter)!: return error codes instead of throwing errors (#1971)
Browse files Browse the repository at this point in the history
* move protocol result type to interfaces

* chore: update type names for verbosity

* feat(filter-core): convert error throws to return types

* chore: update types & imports

* update Filter API

* chore: update createSubscription

* chore: update imports & rename

* chore: update all tests

* chore: resolve conflicts & merge (2/n)

* chore: resolve conflicts & merge (3/n)

* chore: resolve conflicts & merge (4/n)

* chore: resolve conflicts & merge (5/n)

* chore: resolve conflicts & merge (6/n)

* chore: use idiomatic approach

* chore: fix tests

* chore: address comments

* chore: fix test

* rm: only
  • Loading branch information
danisharora099 authored May 9, 2024
1 parent 5df41b0 commit 4eb06c6
Show file tree
Hide file tree
Showing 29 changed files with 551 additions and 284 deletions.
206 changes: 157 additions & 49 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import type { Peer } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import type {
ContentTopic,
IBaseProtocolCore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic
import {
type ContentTopic,
type CoreProtocolResult,
type IBaseProtocolCore,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";

import { BaseProtocol } from "../base_protocol.js";

Expand Down Expand Up @@ -90,53 +93,106 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
): Promise<void> {
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer);

const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
contentTopics
);

const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
let res: Uint8ArrayList[] | undefined;
try {
res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (error) {
log.error("Failed to send subscribe request", error);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
}

const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());

if (statusCode < 200 || statusCode >= 300) {
throw new Error(
log.error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
},
success: null
};
}

return {
failure: null,
success: peer.id
};
}

async unsubscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
): Promise<void> {
const stream = await this.getStream(peer);
): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}

const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
pubsubTopic,
contentTopics
);

await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
log.error("Failed to send unsubscribe request", error);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
}

return {
success: peer.id,
failure: null
};
}

async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise<void> {
async unsubscribeAll(
pubsubTopic: PubsubTopic,
peer: Peer
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer);

const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
Expand All @@ -150,53 +206,105 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
);

if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
},
success: null
};
}

const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());

if (statusCode < 200 || statusCode >= 300) {
throw new Error(
log.error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
},
success: null
};
}

return {
failure: null,
success: peer.id
};
}

async ping(peer: Peer): Promise<void> {
const stream = await this.getStream(peer);
async ping(peer: Peer): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}

const request = FilterSubscribeRpc.createSubscriberPingRequest();

let res: Uint8ArrayList[] | undefined;
try {
const res = await pipe(
res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}

const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());

if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(`Ping successful for peer ${peer.id.toString()}`);
} catch (error) {
log.error("Error pinging: ", error);
throw error; // Rethrow the actual error instead of wrapping it
log.error("Failed to send ping request", error);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
}

if (!res || !res.length) {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}

const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());

if (statusCode < 200 || statusCode >= 300) {
log.error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
}
};
}
return {
success: peer.id,
failure: null
};
}
}
22 changes: 10 additions & 12 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Peer, PeerId, Stream } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import {
Failure,
IBaseProtocolCore,
IEncoder,
IMessage,
Libp2p,
ProtocolCreateOptions,
type CoreProtocolResult,
type IBaseProtocolCore,
type IEncoder,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolResult
type ThisOrThat
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
Expand All @@ -26,9 +26,7 @@ const log = new Logger("light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export { PushResponse };

type PreparePushMessageResult = ProtocolResult<"query", PushRpc>;

type CoreSendResult = ProtocolResult<"success", PeerId, "failure", Failure>;
type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;

/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
Expand Down Expand Up @@ -84,7 +82,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
encoder: IEncoder,
message: IMessage,
peer: Peer
): Promise<CoreSendResult> {
): Promise<CoreProtocolResult> {
const { query, error: preparationError } = await this.preparePushMessage(
encoder,
message
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { IncomingStreamData } from "@libp2p/interface";
import {
type IMetadata,
type Libp2pComponents,
type MetadataQueryResult,
type PeerIdStr,
ProtocolError,
QueryResult,
type ShardInfo
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
Expand Down Expand Up @@ -74,7 +74,7 @@ class Metadata extends BaseProtocol implements IMetadata {
/**
* Make a metadata query to a peer
*/
async query(peerId: PeerId): Promise<QueryResult> {
async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.peerStore.get(peerId);
Expand Down Expand Up @@ -112,7 +112,9 @@ class Metadata extends BaseProtocol implements IMetadata {
};
}

public async confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult> {
public async confirmOrAttemptHandshake(
peerId: PeerId
): Promise<MetadataQueryResult> {
const shardInfo = this.handshakesConfirmed.get(peerId.toString());
if (shardInfo) {
return {
Expand All @@ -126,7 +128,7 @@ class Metadata extends BaseProtocol implements IMetadata {

private decodeMetadataResponse(
encodedResponse: Uint8ArrayList[]
): QueryResult {
): MetadataQueryResult {
const bytes = new Uint8ArrayList();

encodedResponse.forEach((chunk) => {
Expand Down
6 changes: 4 additions & 2 deletions packages/discovery/src/peer-exchange/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
IPeerExchange,
Libp2pComponents,
PeerExchangeQueryParams,
PeerExchangeResult,
PeerExchangeQueryResult,
ProtocolError,
PubsubTopic
} from "@waku/interfaces";
Expand Down Expand Up @@ -35,7 +35,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/**
* Make a peer exchange query to a peer
*/
async query(params: PeerExchangeQueryParams): Promise<PeerExchangeResult> {
async query(
params: PeerExchangeQueryParams
): Promise<PeerExchangeQueryResult> {
const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({
numPeers: BigInt(numPeers)
Expand Down
Loading

0 comments on commit 4eb06c6

Please sign in to comment.