Skip to content

Commit

Permalink
fix(startServer): Properly close web stream body when connection is c…
Browse files Browse the repository at this point in the history
…losed
  • Loading branch information
vinsonchuong committed Nov 22, 2023
1 parent 3a6ef2a commit ce80fd3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 14 deletions.
6 changes: 5 additions & 1 deletion http/start-server/build-responder.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ export default function (computeResponse) {
) {
nodeResponse.end(response.body)
} else if (response.body instanceof ReadableStream) {
Readable.fromWeb(response.body).pipe(nodeResponse)
const stream = Readable.fromWeb(response.body)
stream.pipe(nodeResponse)
nodeResponse.once('close', () => {
stream.destroy()
})
} else {
response.body.pipe(nodeResponse)
}
Expand Down
57 changes: 45 additions & 12 deletions http/start-server/index.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {Buffer} from 'node:buffer'
import {createHash} from 'node:crypto'
import {Readable} from 'node:stream'
import http from 'node:http'
import {setTimeout} from 'node:timers/promises'
import test from 'ava'
import WebSocket from 'ws'
import makeCert from 'make-cert'
Expand Down Expand Up @@ -141,8 +143,39 @@ test('supporting a web stream body', async (t) => {
)
})

test('closing a web stream when the requester closes the connection', async (t) => {
await new Promise(async (resolve) => {
const server = await startServer({port: 10_005}, () => ({
status: 200,
headers: {
'Content-Type': 'text/event-stream',
},
body: new ReadableStream({
async pull(controller) {
await setTimeout(100)
controller.enqueue('Echo')
},
cancel() {
resolve()
},
}),
}))
t.teardown(async () => {
stopServer(server)
})

const request = http.request('http://localhost:10005', async (response) => {
await setTimeout(200)
response.destroy()
})
request.end()
})

t.pass()
})

test('supporting a stream body', async (t) => {
const server = await startServer({port: 10_005}, () => ({
const server = await startServer({port: 10_006}, () => ({
status: 200,
headers: {
'content-type': 'text/plain',
Expand All @@ -156,7 +189,7 @@ test('supporting a stream body', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10005',
url: 'http://localhost:10006',
headers: {},
}),
{
Expand All @@ -166,7 +199,7 @@ test('supporting a stream body', async (t) => {
})

test('supporting a buffer body', async (t) => {
const server = await startServer({port: 10_006}, () => ({
const server = await startServer({port: 10_007}, () => ({
status: 200,
headers: {
'content-type': 'text/plain',
Expand All @@ -180,7 +213,7 @@ test('supporting a buffer body', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10006',
url: 'http://localhost:10007',
headers: {},
}),
{
Expand All @@ -190,12 +223,12 @@ test('supporting a buffer body', async (t) => {
})

test('omitting unused fields', async (t) => {
const server = await startServer({port: 10_007}, () => ({status: 200}))
const server = await startServer({port: 10_008}, () => ({status: 200}))
t.teardown(async () => {
stopServer(server)
})

t.like(await sendRequest({method: 'GET', url: 'http://localhost:10007'}), {
t.like(await sendRequest({method: 'GET', url: 'http://localhost:10008'}), {
status: 200,
})
})
Expand All @@ -204,7 +237,7 @@ test('allowing upgrading to WebSocket', async (t) => {
const webSocketHashingConstant = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
const keyRegex = /^[+/\dA-Za-z]{22}==$/

const server = await startServer({port: 10_008}, (request) => {
const server = await startServer({port: 10_009}, (request) => {
if (
request.headers.connection === 'Upgrade' &&
request.headers.upgrade === 'websocket'
Expand Down Expand Up @@ -253,14 +286,14 @@ test('allowing upgrading to WebSocket', async (t) => {
stopServer(server)
})

t.like(await sendRequest({method: 'GET', url: 'http://localhost:10008'}), {
t.like(await sendRequest({method: 'GET', url: 'http://localhost:10009'}), {
status: 426,
})

t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10008',
url: 'http://localhost:10009',
headers: {
Connection: 'Upgrade',
Upgrade: 'unsupported',
Expand All @@ -274,7 +307,7 @@ test('allowing upgrading to WebSocket', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10008',
url: 'http://localhost:10009',
headers: {
Connection: 'Upgrade',
Upgrade: 'websocket',
Expand All @@ -288,7 +321,7 @@ test('allowing upgrading to WebSocket', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10008',
url: 'http://localhost:10009',
headers: {
Connection: 'Upgrade',
Upgrade: 'websocket',
Expand All @@ -300,7 +333,7 @@ test('allowing upgrading to WebSocket', async (t) => {
},
)

const ws = new WebSocket('ws://localhost:10008')
const ws = new WebSocket('ws://localhost:10009')
await new Promise((resolve) => {
ws.once('open', resolve)
})
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@
"prettier": true,
"space": 2,
"semicolon": false,
"envs": [
"browser"
],
"rules": {
"ava/use-t-well": "off",
"import/no-anonymous-default-export": "off",
"unicorn/prefer-event-target": "off"
"unicorn/prefer-event-target": "off",
"no-async-promise-executor": "off"
}
},
"packageManager": "[email protected]"
Expand Down

0 comments on commit ce80fd3

Please sign in to comment.