Node V2: proper server streaming client abort handling #1216
Hey, I'm trying to convert a server event approach to RPC. I already have it running, but I'm unsure whether my implementation completely disposes of all connected generators when the client closes the connection. Here is an abstract example of my current implementation:
export async function* myStream() {
  let aborted = false;
  const abortPromise = new Promise<void>((resolve) =>
    setTimeout(() => {
      aborted = true;
      resolve();
    }, 1000),
  );
  while (true) {
    console.log("Start Streaming");
    yield await abortPromise;
    console.log("Stopped Streaming");
    if (aborted) break;
  }
  console.log("I reached the end of the generator.");
}
If I run this in a
Now the implementation in a connect-node context:
export async function* myStream(
  req: AccountRequest,
  context: HandlerContext,
) {
  console.log(`Connected client with id ${req.id}`);
  const abortPromise = new Promise<void>((resolve) =>
    context.signal.addEventListener("abort", () => {
      resolve();
    }),
  );
  while (true) {
    yield await Promise.race([
      abortPromise,
      getStreamPromise(), // Always returns a pending promise that resolves with the next server event.
    ]);
    if (context.signal.aborted) break;
  }
  console.log(
    `Closed client connection with id ${req.id}`,
  );
}
In this case the last console.log is never called, so I guess the while loop keeps running forever. My guess is that before the "abort" event is sent, connect stops "listening" for
Is this approach even the right direction for handling a stream that should run as long as a client is connected?
Replies: 2 comments
Hey! Irrespective of connection duration, you can safely ignore the abort signal for this specific use case:
export async function* myStream(
  req: AccountRequest,
  context: HandlerContext,
) {
  console.log(`Connected client with id ${req.id}`);
  try {
    console.log("Setup")
    while (true) {
      yield await getStreamPromise(); // The "yield" will throw or return.
    }
  } finally {
    console.log("Cleaning up");
  }
}
export async function* myStream(
  req: AccountRequest,
  context: HandlerContext,
) {
  console.log(`Connected client with id ${req.id}`);
  try {
    console.log("Setup")
    while (true) {
      // The "yield" will still throw or return, but
      // for that to happen the fetch call has to return first.
      //
      // To ensure fetch returns as soon as client disconnects, we pass
      // the signal to fetch.
      yield await fetch("/path/to/api", { signal: context.signal });
    }
  } finally {
    console.log("Cleaning up");
  }
}
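To see why try/finally is enough here, the following is a minimal standalone sketch with no connect-node involved. It assumes the server ends the handler's generator by calling `return()` on it when the client goes away, which is what the comment "The yield will throw or return" suggests. `nextEvent`, `events`, and `consume` are made-up names for illustration only.

```typescript
// Hypothetical stand-in for a server event source: resolves with `n`
// after a short delay.
function nextEvent(n: number): Promise<number> {
  return new Promise((resolve) => setTimeout(() => resolve(n), 10));
}

// Records what the generator did, so the order can be inspected.
const log: string[] = [];

async function* events(): AsyncGenerator<number> {
  log.push("setup");
  try {
    let n = 0;
    while (true) {
      yield await nextEvent(n++);
    }
  } finally {
    // Runs when the consumer calls return() or throw() on the generator
    // while it is paused at the yield above.
    log.push("cleanup");
  }
}

// Simulates a consumer that reads two values and then stops early.
async function consume(): Promise<string[]> {
  const it = events();
  await it.next();
  await it.next();
  await it.return(undefined);
  return log;
}
```

After `consume()` settles, `log` holds `"setup"` followed by `"cleanup"`. Any statement placed after the `while` loop would not run in this scenario, because `return()` finishes the generator at the paused `yield`; only `finally` blocks get a chance to execute.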
Thank you @srikrsna-buf. Here is my current solution, following the input (#1217), in case someone else needs something similar:
Helper function:
export function abortPromise(signal: AbortSignal) {
  return new Promise<never>((_, reject) => {
    signal.addEventListener("abort", () => {
      reject(new Error("The stream has been destroyed"));
    });
  });
}
Stream generator:
export async function* myStream(
  req: AccountRequest,
  context: HandlerContext,
) {
  console.log(`Connected client with id ${req.id}.`);
  const abort = abortPromise(context.signal);
  try {
    while (true) {
      yield Promise.race([abort, getStreamPromise()]);
    }
  } catch (error) {
    console.error((error as Error)?.message);
  }
  console.log(
    `Closed client connection with id ${req.id}.`,
  );
}
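One possible variation on the helper above, sketched under assumptions rather than taken from connect-node: it rejects right away if the signal is already aborted, and it removes its listener once the raced promise settles. `raceWithAbort` and `demo` are hypothetical names, and the never-settling promise only stands in for `getStreamPromise()`.

```typescript
// Hypothetical helper: settles with `work`, or rejects as soon as
// `signal` aborts, whichever happens first.
function raceWithAbort<T>(signal: AbortSignal, work: Promise<T>): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error("The stream has been destroyed"));
    // An already-aborted signal never fires "abort" again, so check first.
    if (signal.aborted) {
      onAbort();
      return;
    }
    signal.addEventListener("abort", onAbort, { once: true });
    // Forward the result of `work`, then drop the listener.
    work.then(resolve, reject).finally(() => {
      signal.removeEventListener("abort", onAbort);
    });
  });
}

// Small demonstration with a promise that never settles.
async function demo(): Promise<string> {
  const controller = new AbortController();
  const never = new Promise<string>(() => {}); // stand-in for getStreamPromise()
  setTimeout(() => controller.abort(), 10);
  try {
    return await raceWithAbort(controller.signal, never);
  } catch (error) {
    return (error as Error).message;
  }
}
```

In the stream generator this would be used as `yield await raceWithAbort(context.signal, getStreamPromise())` inside the loop. Whether the extra listener bookkeeping is worth it depends on how long the signal lives; for a signal scoped to one request, the original helper is probably sufficient.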