Skip to content

Commit

Permalink
Merge pull request #19 from webrpc/sse-timeout-bug
Browse files Browse the repository at this point in the history
Fix SSE timeout bug
  • Loading branch information
VojtechVitek authored Oct 2, 2024
2 parents 2f1daf9 + 342a5f5 commit ca8157b
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 180 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

- name: Install webrpc-gen
run: |
curl -o ./webrpc-gen -fLJO https://github.com/webrpc/webrpc/releases/download/v0.14.0/webrpc-gen.linux-amd64
curl -o ./webrpc-gen -fLJO https://github.com/webrpc/webrpc/releases/download/v0.19.3/webrpc-gen.linux-amd64
chmod +x ./webrpc-gen
echo $PWD >> $GITHUB_PATH
Expand Down
6 changes: 3 additions & 3 deletions _examples/node-ts/server/server.gen.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable */
// node-ts v1.0.0 2167bb6fb0a186ff099cd8a309c6ffbbd7d1b9c0
// node-ts v1.0.0 bd572b349e330d81cc30b1ff3cf69d7ab59f1619
// --
// Code generated by webrpc-gen@v0.14.0 with ../../ generator. DO NOT EDIT.
// Code generated by webrpc-gen@v0.19.3 with ../../ generator. DO NOT EDIT.
//
// webrpc-gen -schema=service.ridl -target=../../ -server -out=./server/server.gen.ts

Expand All @@ -12,7 +12,7 @@ export const WebRPCVersion = "v1"
export const WebRPCSchemaVersion = "v1.0.0"

// Schema hash generated from your RIDL schema
export const WebRPCSchemaHash = "2167bb6fb0a186ff099cd8a309c6ffbbd7d1b9c0"
export const WebRPCSchemaHash = "bd572b349e330d81cc30b1ff3cf69d7ab59f1619"

//
// Types
Expand Down
6 changes: 3 additions & 3 deletions _examples/node-ts/webapp/client.gen.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable */
// node-ts v1.0.0 2167bb6fb0a186ff099cd8a309c6ffbbd7d1b9c0
// node-ts v1.0.0 bd572b349e330d81cc30b1ff3cf69d7ab59f1619
// --
// Code generated by webrpc-gen@v0.14.0 with ../../ generator. DO NOT EDIT.
// Code generated by webrpc-gen@v0.19.3 with ../../ generator. DO NOT EDIT.
//
// webrpc-gen -schema=service.ridl -target=../../ -client -out=./webapp/client.gen.ts

Expand All @@ -12,7 +12,7 @@ export const WebRPCVersion = "v1"
export const WebRPCSchemaVersion = "v1.0.0"

// Schema hash generated from your RIDL schema
export const WebRPCSchemaHash = "2167bb6fb0a186ff099cd8a309c6ffbbd7d1b9c0"
export const WebRPCSchemaHash = "bd572b349e330d81cc30b1ff3cf69d7ab59f1619"

//
// Types
Expand Down
167 changes: 79 additions & 88 deletions _examples/sse/webapp/client.gen.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable */
// webrpc-sse-chat v1.0.0 f3307fcc3621aa9099b8fa02f8bb22dee4263705
// webrpc-sse-chat v1.0.0 e64bf9372e2faa45da9d449e8ea108dc0597a98d
// --
// Code generated by webrpc-gen@v0.14.0 with ../../ generator. DO NOT EDIT.
// Code generated by webrpc-gen@v0.19.3 with ../../ generator. DO NOT EDIT.
//
// webrpc-gen -schema=service.ridl -target=../../ -client -out=./webapp/client.gen.ts

Expand All @@ -12,7 +12,7 @@ export const WebRPCVersion = "v1"
export const WebRPCSchemaVersion = "v1.0.0"

// Schema hash generated from your RIDL schema
export const WebRPCSchemaHash = "f3307fcc3621aa9099b8fa02f8bb22dee4263705"
export const WebRPCSchemaHash = "e64bf9372e2faa45da9d449e8ea108dc0597a98d"

//
// Types
Expand Down Expand Up @@ -139,103 +139,94 @@ const sseResponse = async (
let buffer = "";
let lastReadTime = Date.now();
const timeout = (10 + 1) * 1000;
let intervalId: any;
let timeoutError = false
const intervalId = setInterval(() => {
if (Date.now() - lastReadTime > timeout) {
timeoutError = true
clearInterval(intervalId)
reader.releaseLock()
}
}, timeout);

try {
intervalId = setInterval(() => {
if (Date.now() - lastReadTime > timeout) {
throw WebrpcStreamLostError.new({cause: "Stream timed out"});
while (true) {
let value;
let done;
try {
({value, done} = await reader.read());
if (timeoutError) throw new Error("Timeout, no data or heartbeat received")
lastReadTime = Date.now();
buffer += decoder.decode(value, {stream: true});
} catch (error) {
let message = "";
if (error instanceof Error) {
message = error.message;
}
}, timeout);

while (true) {
let value;
let done;
try {
({value, done} = await reader.read());
lastReadTime = Date.now();
buffer += decoder.decode(value, {stream: true});
} catch (error) {
let message = "";
if (error instanceof Error) {
message = error.message;
}

if (error instanceof DOMException && error.name === "AbortError") {
onError(
WebrpcRequestFailedError.new({
message: "AbortError",
cause: `AbortError: ${message}`,
}),
() => {
throw new Error("Abort signal cannot be used to reconnect");
}
);
} else {
onError(
WebrpcStreamLostError.new({
cause: `reader.read(): ${message}`,
}),
retryFetch
);
}
return;
if (error instanceof DOMException && error.name === "AbortError") {
onError(
WebrpcRequestFailedError.new({
message: "AbortError",
cause: `AbortError: ${message}`,
}),
() => {
throw new Error("Abort signal cannot be used to reconnect");
}
);
} else {
onError(
WebrpcStreamLostError.new({
cause: `reader.read(): ${message}`,
}),
retryFetch
);
}
return;
}

let lines = buffer.split("\n");
for (let i = 0; i < lines.length - 1; i++) {
if (lines[i].length == 0) {
continue;
}
let data: any;
try {
data = JSON.parse(lines[i]);
if (data.hasOwnProperty("webrpcError")) {
const error = data.webrpcError;
const code: number =
typeof error.code === "number" ? error.code : 0;
onError(
(webrpcErrorByCode[code] || WebrpcError).new(error),
retryFetch
);
return;
}
} catch (error) {
if (
error instanceof Error &&
error.message === "Abort signal cannot be used to reconnect"
) {
throw error;
}
let lines = buffer.split("\n");
for (let i = 0; i < lines.length - 1; i++) {
if (lines[i].length == 0) {
continue;
}
let data: any;
try {
data = JSON.parse(lines[i]);
if (data.hasOwnProperty("webrpcError")) {
const error = data.webrpcError;
const code: number =
typeof error.code === "number" ? error.code : 0;
onError(
WebrpcBadResponseError.new({
status: res.status,
// @ts-ignore
cause: `JSON.parse(): ${error.message}`,
}),
(webrpcErrorByCode[code] || WebrpcError).new(error),
retryFetch
);
return;
}
onMessage(data);
}

if (!done) {
buffer = lines[lines.length - 1];
continue;
} catch (error) {
if (
error instanceof Error &&
error.message === "Abort signal cannot be used to reconnect"
) {
throw error;
}
onError(
WebrpcBadResponseError.new({
status: res.status,
// @ts-ignore
cause: `JSON.parse(): ${error.message}`,
}),
retryFetch
);
}

onClose && onClose();
return;
onMessage(data);
}
} catch (error) {
// @ts-ignore
if (error instanceof WebrpcStreamLostError) {
onError(error, retryFetch);
} else {
throw error;

if (!done) {
buffer = lines[lines.length - 1];
continue;
}
} finally {
clearInterval(intervalId);

onClose && onClose();
return;
}
};

Expand Down
Loading

0 comments on commit ca8157b

Please sign in to comment.