Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve StreamManager #1994

Merged
merged 11 commits into from
May 14, 2024
78 changes: 39 additions & 39 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,15 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
};
}

let stream: Stream | undefined;
let stream: Stream;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
} catch (error) {
log.error("Failed to get stream", error);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
}
};
Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,16 @@ class Metadata extends BaseProtocol implements IMetadata {
};
}

const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error("Failed to get stream", error);
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}

const encodedResponse = await pipe(
[request],
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore {

const historyRpcQuery = HistoryRpc.createQuery(queryOpts);

const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (e) {
log.error("Failed to get stream", e);
break;
}

const res = await pipe(
[historyRpcQuery.encode()],
Expand Down
91 changes: 66 additions & 25 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import { Peer } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectConnection } from "@waku/utils/libp2p";

const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;

export class StreamManager {
private streamPool: Map<string, Promise<Stream | void>>;
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;

constructor(
Expand All @@ -14,60 +18,97 @@ export class StreamManager {
public addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.addEventListener(
"peer:update",
this.handlePeerUpdateStreamPool.bind(this)
);
this.getStream = this.getStream.bind(this);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
}

public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);

if (!streamPromise) {
return this.newStream(peer); // fallback by creating a new stream on the spot
return this.createStream(peer);
}

// We have the stream, let's remove it from the map
this.streamPool.delete(peerIdStr);
this.prepareStream(peer);

this.prepareNewStream(peer);

const stream = await streamPromise;

if (!stream || stream.status === "closed") {
return this.newStream(peer); // fallback by creating a new stream on the spot
try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
}

return stream;
return this.createStream(peer);
}

private async newStream(peer: Peer): Promise<Stream> {
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);

if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);

try {
return await connection.newStream(this.multicodec);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rewrite to

let result;
try {
  result = await ...;
}
return result;

the reason is due to huge difference in code execution because of return or return await

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for other places

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate on the difference in code execution? imo both of them are the same?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if try { return promise } catch ... - it won't catch as promise is not awaited
if try { return await promise } catch ... - will catch and the difference is only in one word

I think it is better to be on a clear side of things and write:

let result;
try {
  result = await ...;
}
return result;

as the absence of await changes so much (and I faced some nasty bugs in the past because of that)

} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
}
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
);
}
}

private prepareNewStream(peer: Peer): void {
const streamPromise = this.newStream(peer).catch(() => {
// No error thrown as this call is not triggered by the user
private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make it a util or use some small npm module for it (I prefer to have some convenient module) - we can do a follow up

setTimeout(resolve, CONNECTION_TIMEOUT)
);

const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()}`
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
);
});

this.streamPool.set(peer.id.toString(), streamPromise);
}

private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const peer = evt.detail.peer;
const { peer } = evt.detail;

if (peer.protocols.includes(this.multicodec)) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareNewStream(peer);
const isConnected = this.isConnectedTo(peer.id);

if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
}
};

private isConnectedTo(peerId: PeerId): boolean {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
}
}
Loading
Loading