diff --git a/packages/connect-conformance/package.json b/packages/connect-conformance/package.json index cd39b6e41..8ef29c829 100644 --- a/packages/connect-conformance/package.json +++ b/packages/connect-conformance/package.json @@ -14,7 +14,7 @@ "connectconformance": "bin/connectconformance.cjs" }, "scripts": { - "generate": "buf generate buf.build/connectrpc/conformance:v1.0.2", + "generate": "buf generate buf.build/connectrpc/conformance:v1.0.3", "postgenerate": "license-header src/gen", "prebuild": "rm -rf ./dist/*", "build": "npm run build:cjs && npm run build:esm", diff --git a/packages/connect-conformance/src/callback-client.ts b/packages/connect-conformance/src/callback-client.ts index 92e09cdb0..25c3066ee 100644 --- a/packages/connect-conformance/src/callback-client.ts +++ b/packages/connect-conformance/src/callback-client.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { createCallbackClient, ConnectError } from "@connectrpc/connect"; +import { createCallbackClient, ConnectError, Code } from "@connectrpc/connect"; import type { CallbackClient, Transport } from "@connectrpc/connect"; import { ClientCompatRequest, @@ -20,18 +20,18 @@ import { } from "./gen/connectrpc/conformance/v1/client_compat_pb.js"; import { UnaryRequest, - Header as ConformanceHeader, ServerStreamRequest, - ConformancePayload, UnimplementedRequest, IdempotentUnaryRequest, } from "./gen/connectrpc/conformance/v1/service_pb.js"; import { convertToProtoError, convertToProtoHeaders, - appendProtoHeaders, wait, getCancelTiming, + getRequestHeaders, + getSingleRequestMessage, + setClientErrorResult, } from "./protocol.js"; import { ConformanceService } from "./gen/connectrpc/conformance/v1/service_connect.js"; @@ -59,189 +59,138 @@ export function invokeWithCallbackClient( async function unary( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, idempotent: boolean = false, ) { - if (req.requestMessages.length !== 1) { - throw new Error("Unary method requires exactly one request message"); - } - const msg = req.requestMessages[0]; - const uReq = idempotent ? new IdempotentUnaryRequest() : new UnaryRequest(); - if (!msg.unpackTo(uReq)) { - throw new Error("Could not unpack request message to unary request"); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; - - let call = client.unary; - if (idempotent) { - call = client.idempotentUnary; - } - - await wait(req.requestDelayMs); + await wait(compatRequest.requestDelayMs); + const result = new ClientResponseResult(); return new Promise((resolve) => { - call( - uReq, - (err, uRes) => { + const call = idempotent ? client.idempotentUnary : client.unary; + let clientCancelled = false; + const clientCancelFn = call( + getSingleRequestMessage( + compatRequest, + idempotent ? IdempotentUnaryRequest : UnaryRequest, + ), + (err, response) => { + // Callback clients swallow client triggered cancellations and never + // call the callback. This will trigger the global error handler and + // fail the process. + if (clientCancelled) { + throw new Error("Aborted requests should not trigger the callback"); + } if (err !== undefined) { - error = ConnectError.from(err); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, err); } else { - payloads.push(uRes.payload!); + result.payloads.push(response.payload!); } - resolve( - new ClientResponseResult({ - payloads: payloads, - responseHeaders: resHeaders, - responseTrailers: resTrailers, - error: convertToProtoError(error), - }), - ); + resolve(result); }, { - headers: reqHeader, + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }, ); + const { afterCloseSendMs } = getCancelTiming(compatRequest); + if (afterCloseSendMs >= 0) { + setTimeout(() => { + clientCancelled = true; + clientCancelFn(); + // Callback clients swallow client triggered cancellations and never + // call the callback. We report a fake error to the test runner to let + // it know that the call was cancelled. + result.error = convertToProtoError( + new ConnectError("client cancelled", Code.Canceled), + ); + resolve(result); + }, afterCloseSendMs); + } }); } async function serverStream( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { - if (req.requestMessages.length !== 1) { - throw new Error("ServerStream method requires exactly one request message"); - } - const msg = req.requestMessages[0]; - const uReq = new ServerStreamRequest(); - if (!msg.unpackTo(uReq)) { - throw new Error( - "Could not unpack request message to server stream request", - ); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; - const cancelTiming = getCancelTiming(req); - let count = 0; - - await wait(req.requestDelayMs); + const cancelTiming = getCancelTiming(compatRequest); + await wait(compatRequest.requestDelayMs); + const result = new ClientResponseResult(); return new Promise((resolve) => { - const cancelFn = client.serverStream( - uReq, - (uResp) => { - if (cancelTiming.afterNumResponses === 0) { - cancelFn(); - } - payloads.push(uResp.payload!); - count++; - if (count === cancelTiming.afterNumResponses) { - cancelFn(); + let clientCancelled = false; + const clientCancelFn = client.serverStream( + getSingleRequestMessage(compatRequest, ServerStreamRequest), + (response) => { + result.payloads.push(response.payload!); + if (result.payloads.length === cancelTiming.afterNumResponses) { + clientCancelled = true; + clientCancelFn(); } }, (err) => { + // Callback clients call the closeCallback without an error for client + // triggered cancellation. We report a fake error to the test runner to let + // it know that the call was cancelled. + if (clientCancelled) { + if (err !== undefined) { + throw new Error( + "Aborted requests should not trigger the closeCallback with an error", + ); + } + result.error = convertToProtoError( + new ConnectError("client cancelled", Code.Canceled), + ); + } if (err !== undefined) { - error = ConnectError.from(err); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, err); } - resolve( - new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - payloads: payloads, - error: convertToProtoError(error), - }), - ); + resolve(result); }, { - headers: reqHeader, + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }, ); + if (cancelTiming.afterCloseSendMs >= 0) { + setTimeout(() => { + clientCancelled = true; + clientCancelFn(); + }, cancelTiming.afterCloseSendMs); + } }); } async function unimplemented( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { - const msg = req.requestMessages[0]; - const unReq = new UnimplementedRequest(); - if (!msg.unpackTo(unReq)) { - throw new Error("Could not unpack request message to unary request"); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - + const result = new ClientResponseResult(); return new Promise((resolve) => { client.unimplemented( - unReq, + getSingleRequestMessage(compatRequest, UnimplementedRequest), // eslint-disable-next-line @typescript-eslint/no-unused-vars (err, _) => { if (err !== undefined) { - error = ConnectError.from(err); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, err); } - resolve( - new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - error: convertToProtoError(error), - }), - ); + resolve(result); }, { - headers: reqHeader, + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }, ); diff --git a/packages/connect-conformance/src/conformance.ts b/packages/connect-conformance/src/conformance.ts index 4df4ce74a..6854c7300 100644 --- a/packages/connect-conformance/src/conformance.ts +++ b/packages/connect-conformance/src/conformance.ts @@ -29,20 +29,18 @@ import { execFileSync } from "node:child_process"; import { fetch } from "undici"; import { scripts } from "../package.json"; -// Extract conformance runner version from the `generate` script -const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [ - "?", -]; - -const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}`; - export async function run() { - const { archive, bin } = getArtifactNameForEnv(); - const tempDir = getTempDir(); + // Extract conformance runner version from the `generate` script + const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [ + "?", + ]; + const { archive, bin } = getArtifactNameForEnv(version); + const tempDir = getTempDir(version); const binPath = joinPath(tempDir, bin); if (!existsSync(binPath)) { + const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}/${archive}`; const archivePath = joinPath(tempDir, archive); - await download(`${downloadUrl}/${archive}`, archivePath); + await download(downloadUrl, archivePath); await extractBin(archivePath, binPath); } execFileSync(binPath, process.argv.slice(2), { @@ -101,15 +99,21 @@ async function extractBin(archivePath: string, binPath: string) { ); } -function getTempDir() { - const tempDir = joinPath(process.env["TEMP"] ?? os.tmpdir(), "conformance"); +function getTempDir(version: string) { + const tempDir = joinPath( + process.env["TEMP"] ?? os.tmpdir(), + `conformance-${version}`, + ); if (!existsSync(tempDir)) { mkdirSync(tempDir, { recursive: true }); } return tempDir; } -function getArtifactNameForEnv(): { archive: string; bin: string } { +function getArtifactNameForEnv(version: string): { + archive: string; + bin: string; +} { let build = ""; let ext = ".tar.gz"; let bin = "connectconformance"; diff --git a/packages/connect-conformance/src/gen/connectrpc/conformance/v1/client_compat_pb.ts b/packages/connect-conformance/src/gen/connectrpc/conformance/v1/client_compat_pb.ts index 52a020e15..cbf06a717 100644 --- a/packages/connect-conformance/src/gen/connectrpc/conformance/v1/client_compat_pb.ts +++ b/packages/connect-conformance/src/gen/connectrpc/conformance/v1/client_compat_pb.ts @@ -131,7 +131,8 @@ export class ClientCompatRequest extends Message { method?: string; /** - * The stream type of `method` (i.e. Unary, Client-Streaming, Server-Streaming, Full Duplex Bidi, or Half Duplex Bidi). + * The stream type of `method` (i.e. unary, client stream, server stream, full-duplex bidi + * stream, or half-duplex bidi stream). * When writing test cases, this is a required field. * * @generated from field: connectrpc.conformance.v1.StreamType stream_type = 13; @@ -161,9 +162,9 @@ export class ClientCompatRequest extends Message { * The actual request messages that will sent to the server. * The type URL for all entries should be equal to the request type of the * method. - * There must be exactly one for unary and server-stream methods but - * can be zero or more for client- and bidi-stream methods. - * For client- and bidi-stream methods, all entries will have the + * There must be exactly one for unary and server stream methods but + * can be zero or more for client and bidi stream methods. + * For client and bidi stream methods, all entries will have the * same type URL. * * @generated from field: repeated google.protobuf.Any request_messages = 16; @@ -180,7 +181,7 @@ export class ClientCompatRequest extends Message { /** * Wait this many milliseconds before sending a request message. - * For client- or bidi-streaming requests, this delay should be + * For client or bidi stream methods, this delay should be * applied before each request sent. * * @generated from field: uint32 request_delay_ms = 18; @@ -274,8 +275,9 @@ export class ClientCompatRequest_Cancel extends Message { /** * Servers should echo back payloads that they received as part of the request. * This field should contain all the payloads the server echoed back. Note that - * There will be zero-to-one for unary and client-stream methods and - * zero-to-many for server- and bidi-stream methods. + * There will be zero-to-one for unary and client stream methods and + * zero-to-many for server and bidi stream methods. * * @generated from field: repeated connectrpc.conformance.v1.ConformancePayload payloads = 2; */ diff --git a/packages/connect-conformance/src/gen/connectrpc/conformance/v1/config_pb.ts b/packages/connect-conformance/src/gen/connectrpc/conformance/v1/config_pb.ts index fdc518fcb..8373d0234 100644 --- a/packages/connect-conformance/src/gen/connectrpc/conformance/v1/config_pb.ts +++ b/packages/connect-conformance/src/gen/connectrpc/conformance/v1/config_pb.ts @@ -107,7 +107,10 @@ export enum Codec { JSON = 2, /** - * @generated from enum value: CODEC_TEXT = 3; + * not used; will be ignored + * + * @generated from enum value: CODEC_TEXT = 3 [deprecated = true]; + * @deprecated */ TEXT = 3, } diff --git a/packages/connect-conformance/src/promise-client.ts b/packages/connect-conformance/src/promise-client.ts index d437b8378..81f5a280d 100644 --- a/packages/connect-conformance/src/promise-client.ts +++ b/packages/connect-conformance/src/promise-client.ts @@ -12,28 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { createPromiseClient, ConnectError } from "@connectrpc/connect"; import type { PromiseClient, Transport } from "@connectrpc/connect"; +import { createPromiseClient } from "@connectrpc/connect"; import { ClientCompatRequest, ClientResponseResult, } from "./gen/connectrpc/conformance/v1/client_compat_pb.js"; import { - UnaryRequest, - Header as ConformanceHeader, + BidiStreamRequest, ClientStreamRequest, + IdempotentUnaryRequest, ServerStreamRequest, - ConformancePayload, - BidiStreamRequest, + UnaryRequest, UnimplementedRequest, - IdempotentUnaryRequest, } from "./gen/connectrpc/conformance/v1/service_pb.js"; import { - convertToProtoError, convertToProtoHeaders, - appendProtoHeaders, - wait, getCancelTiming, + getRequestHeaders, + getRequestMessages, + getSingleRequestMessage, + setClientErrorResult, + wait, } from "./protocol.js"; import { ConformanceService } from "./gen/connectrpc/conformance/v1/service_connect.js"; import { createWritableIterable } from "@connectrpc/connect/protocol"; @@ -43,172 +43,115 @@ type ConformanceClient = PromiseClient; export function invokeWithPromiseClient( transport: Transport, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { const client = createPromiseClient(ConformanceService, transport); - switch (req.method) { + switch (compatRequest.method) { case ConformanceService.methods.unary.name: - return unary(client, req); + return unary(client, compatRequest); case ConformanceService.methods.idempotentUnary.name: - return unary(client, req, true); + return unary(client, compatRequest, true); case ConformanceService.methods.serverStream.name: - return serverStream(client, req); + return serverStream(client, compatRequest); case ConformanceService.methods.clientStream.name: - return clientStream(client, req); + return clientStream(client, compatRequest); case ConformanceService.methods.bidiStream.name: - return bidiStream(client, req); + return bidiStream(client, compatRequest); case ConformanceService.methods.unimplemented.name: - return unimplemented(client, req); + return unimplemented(client, compatRequest); default: - throw new Error(`Unknown method: ${req.method}`); + throw new Error(`Unknown method: ${compatRequest.method}`); } } async function unary( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, idempotent: boolean = false, ) { - if (req.requestMessages.length !== 1) { - throw new Error("Unary method requires exactly one request message"); - } - const msg = req.requestMessages[0]; - const uReq = idempotent ? new IdempotentUnaryRequest() : new UnaryRequest(); - if (!msg.unpackTo(uReq)) { - throw new Error("Could not unpack request message to unary request"); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; + await wait(compatRequest.requestDelayMs); + const result = new ClientResponseResult(); try { - let call = client.unary; - if (idempotent) { - call = client.idempotentUnary; + const controller = new AbortController(); + const { afterCloseSendMs } = getCancelTiming(compatRequest); + if (afterCloseSendMs >= 0) { + void wait(afterCloseSendMs).then(() => controller.abort()); } - await wait(req.requestDelayMs); - const uRes = await call(uReq, { - headers: reqHeader, + const request = getSingleRequestMessage( + compatRequest, + idempotent ? IdempotentUnaryRequest : UnaryRequest, + ); + const call = idempotent ? client.idempotentUnary : client.unary; + const response = await call(request, { + headers: getRequestHeaders(compatRequest), + signal: controller.signal, onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }); - payloads.push(uRes.payload!); + result.payloads.push(response.payload!); } catch (e) { - error = ConnectError.from(e); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, e); } - return new ClientResponseResult({ - payloads: payloads, - responseHeaders: resHeaders, - responseTrailers: resTrailers, - error: convertToProtoError(error), - }); + return result; } async function serverStream( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { - if (req.requestMessages.length !== 1) { - throw new Error("ServerStream method requires exactly one request message"); - } - const msg = req.requestMessages[0]; - const uReq = new ServerStreamRequest(); - if (!msg.unpackTo(uReq)) { - throw new Error( - "Could not unpack request message to server stream request", - ); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; - const cancelTiming = getCancelTiming(req); + const cancelTiming = getCancelTiming(compatRequest); const controller = new AbortController(); + await wait(compatRequest.requestDelayMs); + const result = new ClientResponseResult(); + const request = getSingleRequestMessage(compatRequest, ServerStreamRequest); try { - await wait(req.requestDelayMs); - const res = client.serverStream(uReq, { - headers: reqHeader, + const res = client.serverStream(request, { + headers: getRequestHeaders(compatRequest), signal: controller.signal, onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }); - if (cancelTiming.afterNumResponses == 0) { + if (cancelTiming.afterCloseSendMs >= 0) { + await wait(cancelTiming.afterCloseSendMs); controller.abort(); } - let count = 0; for await (const msg of res) { - payloads.push(msg.payload!); - count++; - if (count === cancelTiming.afterNumResponses) { + result.payloads.push(msg.payload!); + if (result.payloads.length === cancelTiming.afterNumResponses) { controller.abort(); } } } catch (e) { - error = ConnectError.from(e); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, e); } - return new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - payloads: payloads, - error: convertToProtoError(error), - }); + return result; } async function clientStream( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { - const reqHeaders = new Headers(); - appendProtoHeaders(reqHeaders, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; - const cancelTiming = getCancelTiming(req); + const cancelTiming = getCancelTiming(compatRequest); const controller = new AbortController(); + const result = new ClientResponseResult(); try { - const csRes = await client.clientStream( + const response = await client.clientStream( (async function* () { - for (const msg of req.requestMessages) { - const csReq = new ClientStreamRequest(); - if (!msg.unpackTo(csReq)) { - throw new Error( - "Could not unpack request message to client stream request", - ); - } - await wait(req.requestDelayMs); - yield csReq; + for (const msg of getRequestMessages( + compatRequest, + ClientStreamRequest, + )) { + await wait(compatRequest.requestDelayMs); + yield msg; } if (cancelTiming.beforeCloseSend !== undefined) { controller.abort(); @@ -220,87 +163,63 @@ async function clientStream( })(), { signal: controller.signal, - headers: reqHeaders, + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }, ); - payloads.push(csRes.payload!); + result.payloads.push(response.payload!); } catch (e) { - error = ConnectError.from(e); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, e); } - return new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - payloads: payloads, - error: convertToProtoError(error), - }); + return result; } -async function bidiStream(client: ConformanceClient, req: ClientCompatRequest) { - const reqHeaders = new Headers(); - appendProtoHeaders(reqHeaders, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; - const payloads: ConformancePayload[] = []; - const cancelTiming = getCancelTiming(req); +async function bidiStream( + client: ConformanceClient, + compatRequest: ClientCompatRequest, +) { + const cancelTiming = getCancelTiming(compatRequest); const controller = new AbortController(); - let recvCount = 0; + const result = new ClientResponseResult(); try { - const reqIt = createWritableIterable(); - const sRes = client.bidiStream(reqIt, { + const request = createWritableIterable(); + const responses = client.bidiStream(request, { signal: controller.signal, - headers: reqHeaders, + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }); - const resIt = sRes[Symbol.asyncIterator](); - for (const msg of req.requestMessages) { - const bdReq = new BidiStreamRequest(); - if (!msg.unpackTo(bdReq)) { - throw new Error( - "Could not unpack request message to client stream request", - ); - } - await wait(req.requestDelayMs); - await reqIt.write(bdReq); - if (req.streamType === StreamType.FULL_DUPLEX_BIDI_STREAM) { + const responseIterator = responses[Symbol.asyncIterator](); + for (const msg of getRequestMessages(compatRequest, BidiStreamRequest)) { + await wait(compatRequest.requestDelayMs); + await request.write(msg); + if (compatRequest.streamType === StreamType.FULL_DUPLEX_BIDI_STREAM) { if (cancelTiming.afterNumResponses === 0) { controller.abort(); } - const next = await resIt.next(); + const next = await responseIterator.next(); if (next.done === true) { continue; } - recvCount++; - if (cancelTiming.afterNumResponses === recvCount) { + result.payloads.push(next.value.payload!); + if (result.payloads.length === cancelTiming.afterNumResponses) { controller.abort(); } - payloads.push(next.value.payload!); } } if (cancelTiming.beforeCloseSend !== undefined) { controller.abort(); } - reqIt.close(); + request.close(); if (cancelTiming.afterCloseSendMs >= 0) { setTimeout(() => { controller.abort(); @@ -311,75 +230,39 @@ async function bidiStream(client: ConformanceClient, req: ClientCompatRequest) { } // Drain the response iterator for (;;) { - const next = await resIt.next(); + const next = await responseIterator.next(); if (next.done === true) { break; } - recvCount++; - if (cancelTiming.afterNumResponses === recvCount) { + result.payloads.push(next.value.payload!); + if (result.payloads.length === cancelTiming.afterNumResponses) { controller.abort(); } - payloads.push(next.value.payload!); } } catch (e) { - error = ConnectError.from(e); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, e); } - return new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - payloads: payloads, - error: convertToProtoError(error), - }); + return result; } async function unimplemented( client: ConformanceClient, - req: ClientCompatRequest, + compatRequest: ClientCompatRequest, ) { - const msg = req.requestMessages[0]; - const unReq = new UnimplementedRequest(); - if (!msg.unpackTo(unReq)) { - throw new Error("Could not unpack request message to unary request"); - } - const reqHeader = new Headers(); - appendProtoHeaders(reqHeader, req.requestHeaders); - let error: ConnectError | undefined = undefined; - let resHeaders: ConformanceHeader[] = []; - let resTrailers: ConformanceHeader[] = []; + const request = getSingleRequestMessage(compatRequest, UnimplementedRequest); + const result = new ClientResponseResult(); try { - await client.unimplemented(unReq, { - headers: reqHeader, + await client.unimplemented(request, { + headers: getRequestHeaders(compatRequest), onHeader(headers) { - resHeaders = convertToProtoHeaders(headers); + result.responseHeaders = convertToProtoHeaders(headers); }, onTrailer(trailers) { - resTrailers = convertToProtoHeaders(trailers); + result.responseTrailers = convertToProtoHeaders(trailers); }, }); } catch (e) { - error = ConnectError.from(e); - // We can't distinguish between headers and trailers here, so we just - // add the metadata to both. - // - // But if the headers are already set, we don't need to overwrite them. - resHeaders = - resHeaders.length === 0 - ? convertToProtoHeaders(error.metadata) - : resHeaders; - resTrailers = convertToProtoHeaders(error.metadata); + setClientErrorResult(result, e); } - return new ClientResponseResult({ - responseHeaders: resHeaders, - responseTrailers: resTrailers, - error: convertToProtoError(error), - }); + return result; } diff --git a/packages/connect-conformance/src/protocol.ts b/packages/connect-conformance/src/protocol.ts index 92e479943..64cec6eac 100644 --- a/packages/connect-conformance/src/protocol.ts +++ b/packages/connect-conformance/src/protocol.ts @@ -12,39 +12,120 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { ConnectError, Code } from "@connectrpc/connect"; -import { createRegistry, Any, Message } from "@bufbuild/protobuf"; +import { Code, ConnectError } from "@connectrpc/connect"; import { + Any, + createRegistry, + Message, + type MessageType, +} from "@bufbuild/protobuf"; +import { + ConformancePayload_RequestInfo, Error as ConformanceError, Header as ConformanceHeader, - ConformancePayload_RequestInfo, } from "./gen/connectrpc/conformance/v1/service_pb.js"; import { Code as ConformanceCode } from "./gen/connectrpc/conformance/v1/config_pb.js"; -import { ClientCompatRequest } from "./gen/connectrpc/conformance/v1/client_compat_pb.js"; +import { + ClientCompatRequest, + ClientResponseResult, +} from "./gen/connectrpc/conformance/v1/client_compat_pb.js"; -const detailsRegitry = createRegistry(ConformancePayload_RequestInfo); +const detailsRegistry = createRegistry(ConformancePayload_RequestInfo); -export function getCancelTiming(req: ClientCompatRequest) { +export function getCancelTiming(compatRequest: ClientCompatRequest) { const def = { beforeCloseSend: undefined, afterCloseSendMs: -1, afterNumResponses: -1, }; - switch (req.cancel?.cancelTiming.case) { + switch (compatRequest.cancel?.cancelTiming.case) { case "beforeCloseSend": return { ...def, beforeCloseSend: {} }; case "afterCloseSendMs": return { ...def, - afterCloseSendMs: req.cancel.cancelTiming.value, + afterCloseSendMs: compatRequest.cancel.cancelTiming.value, }; case "afterNumResponses": - return { ...def, afterNumResponses: req.cancel.cancelTiming.value }; + return { + ...def, + afterNumResponses: compatRequest.cancel.cancelTiming.value, + }; case undefined: return def; } } +/** + * Get the headers for a conformance client request. + */ +export function getRequestHeaders( + compatRequest: ClientCompatRequest, +): HeadersInit { + const headers = new Headers(); + appendProtoHeaders(headers, compatRequest.requestHeaders); + return headers; +} + +/** + * Get a single request message for a conformance client call. + */ +export function getSingleRequestMessage>( + compatRequest: ClientCompatRequest, + type: MessageType, +): T { + if (compatRequest.requestMessages.length !== 1) { + throw new Error( + `Expected exactly one request_message in ClientCompatRequest, found ${compatRequest.requestMessages.length}`, + ); + } + const any = compatRequest.requestMessages[0]; + const target = new type(); + if (!any.unpackTo(target)) { + throw new Error( + `Could not unpack request_message from ClientCompatRequest into ${type.typeName}`, + ); + } + return target; +} + +/** + * Get a request messages for a conformance client call. + */ +export function* getRequestMessages>( + compatRequest: ClientCompatRequest, + type: MessageType, +): Iterable { + for (const any of compatRequest.requestMessages) { + const target = new type(); + if (!any.unpackTo(target)) { + throw new Error( + `Could not unpack request_message from ClientCompatRequest into ${type.typeName}`, + ); + } + yield target; + } +} + +/** + * Record an error from a failed conformance client call in the result message. + */ +export function setClientErrorResult( + result: ClientResponseResult, + error: unknown, +): void { + const connectError = ConnectError.from(error); + result.error = convertToProtoError(connectError); + // We can't distinguish between headers and trailers here, so we just + // add the metadata to both. + // + // But if the headers are already set, we don't need to overwrite them. + if (result.responseHeaders.length === 0) { + result.responseHeaders = convertToProtoHeaders(connectError.metadata); + } + result.responseTrailers = convertToProtoHeaders(connectError.metadata); +} + export function connectErrorFromProto(err: ConformanceError) { // The ConnectError constructor accepts messages for details. // The conformance error details are the raw google.protobuf.Any messages. @@ -54,7 +135,7 @@ export function connectErrorFromProto(err: ConformanceError) { err.code as unknown as Code, undefined, err.details.map((d) => { - const m = d.unpack(detailsRegitry); + const m = d.unpack(detailsRegistry); if (m === undefined) { throw new Error(`Cannot unpack ${d.typeUrl}`); } diff --git a/packages/connect-web-bench/README.md b/packages/connect-web-bench/README.md index 2b5ddebfb..e340f0cd1 100644 --- a/packages/connect-web-bench/README.md +++ b/packages/connect-web-bench/README.md @@ -15,10 +15,10 @@ usually do. We repeat this for an increasing number of RPCs. | code generator | RPCs | bundle size | minified | compressed | | -------------- | ---: | ----------: | --------: | ---------: | -| Connect-ES | 1 | 152,643 b | 66,478 b | 16,380 b | -| Connect-ES | 4 | 168,085 b | 72,418 b | 16,852 b | -| Connect-ES | 8 | 193,398 b | 82,142 b | 17,475 b | -| Connect-ES | 16 | 227,037 b | 96,408 b | 18,237 b | +| Connect-ES | 1 | 152,759 b | 66,533 b | 16,438 b | +| Connect-ES | 4 | 168,201 b | 72,473 b | 16,896 b | +| Connect-ES | 8 | 193,514 b | 82,198 b | 17,477 b | +| Connect-ES | 16 | 227,153 b | 96,461 b | 18,226 b | | gRPC-Web | 1 | 876,563 b | 548,495 b | 52,300 b | | gRPC-Web | 4 | 928,964 b | 580,477 b | 54,673 b | | gRPC-Web | 8 | 1,004,833 b | 628,223 b | 57,118 b | diff --git a/packages/connect-web-bench/chart.svg b/packages/connect-web-bench/chart.svg index 287b9d8aa..7b6c13018 100644 --- a/packages/connect-web-bench/chart.svg +++ b/packages/connect-web-bench/chart.svg @@ -42,13 +42,13 @@ 0 KiB - + Connect-ES -Connect-ES 16 KiB for 1 RPCs -Connect-ES 16.46 KiB for 4 RPCs -Connect-ES 17.07 KiB for 8 RPCs -Connect-ES 17.81 KiB for 16 RPCs +Connect-ES 16.05 KiB for 1 RPCs +Connect-ES 16.5 KiB for 4 RPCs +Connect-ES 17.07 KiB for 8 RPCs +Connect-ES 17.8 KiB for 16 RPCs diff --git a/packages/connect-web/conformance/known-failing-callback-client.txt b/packages/connect-web/conformance/known-failing-callback-client.txt deleted file mode 100644 index 032ad2d5d..000000000 --- a/packages/connect-web/conformance/known-failing-callback-client.txt +++ /dev/null @@ -1,4 +0,0 @@ -# The callback client does not pass a cancelled error to the end-of-stream callback for cancellations. This is intentional behavior as it is the user's -# responsibility to handle this on the client side. -**/server-stream/cancel-after-zero-responses -**/server-stream/cancel-after-responses diff --git a/packages/connect-web/package.json b/packages/connect-web/package.json index 8d49a71be..c92e5f07b 100644 --- a/packages/connect-web/package.json +++ b/packages/connect-web/package.json @@ -14,13 +14,13 @@ "build:esm": "tsc --project tsconfig.build.json --outDir ./dist/esm --declaration --declarationDir ./dist/esm", "conformance:safari": "npm run conformance:safari:promise && npm run conformance:client:safari:callback", "conformance:safari:promise": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser safari", - "conformance:safari:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml --known-failing @conformance/known-failing-callback-client.txt -- tsx conformance/client.ts --browser safari --useCallbackClient", + "conformance:safari:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser safari --useCallbackClient", "conformance:chrome:promise": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser chrome", - "conformance:chrome:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml --known-failing @conformance/known-failing-callback-client.txt -- tsx conformance/client.ts --browser chrome --useCallbackClient", + "conformance:chrome:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser chrome --useCallbackClient", "conformance:firefox:promise": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser firefox", - "conformance:firefox:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml --known-failing @conformance/known-failing-callback-client.txt -- tsx conformance/client.ts --browser firefox --useCallbackClient", + "conformance:firefox:callback": "connectconformance --mode client --conf conformance/conformance-web.yaml -- tsx conformance/client.ts --browser firefox --useCallbackClient", "conformance:node:promise": "connectconformance --mode client --conf conformance/conformance-web-node.yaml -- tsx conformance/client.ts --browser node", - "conformance:node:callback": "connectconformance --mode client --conf conformance/conformance-web-node.yaml --known-failing @conformance/known-failing-callback-client.txt -- tsx conformance/client.ts --browser node --useCallbackClient", + "conformance:node:callback": "connectconformance --mode client --conf conformance/conformance-web-node.yaml -- tsx conformance/client.ts --browser node --useCallbackClient", "test": "jasmine --config=jasmine.json", "generate": "buf generate --template browserstack/buf.gen.yaml", "postgenerate": "license-header browserstack/gen", diff --git a/packages/connect-web/src/connect-transport.ts b/packages/connect-web/src/connect-transport.ts index 3e27ce6ec..2d097cac9 100644 --- a/packages/connect-web/src/connect-transport.ts +++ b/packages/connect-web/src/connect-transport.ts @@ -266,6 +266,7 @@ export function createConnectTransport( body: ReadableStream, trailerTarget: Headers, header: Headers, + signal: AbortSignal, ) { const reader = createEnvelopeReadableStream(body).getReader(); let endStreamReceived = false; @@ -298,6 +299,16 @@ export function createConnectTransport( } yield parse(data); } + // Node wil not throw an AbortError on `read` if the + // signal is aborted before `getReader` is called. + // As a work around we check at the end and throw. + // + // Ref: https://github.com/nodejs/undici/issues/1940 + if ("throwIfAborted" in signal) { + // We assume that implementations without `throwIfAborted` (old + // browsers) do honor aborted signals on `read`. + signal.throwIfAborted(); + } if (!endStreamReceived) { throw "missing EndStreamResponse"; } @@ -369,7 +380,12 @@ export function createConnectTransport( ...req, header: fRes.headers, trailer, - message: parseResponseBody(fRes.body, trailer, fRes.headers), + message: parseResponseBody( + fRes.body, + trailer, + fRes.headers, + req.signal, + ), }; return res; }, diff --git a/packages/connect-web/src/grpc-web-transport.ts b/packages/connect-web/src/grpc-web-transport.ts index 350961603..cfec08940 100644 --- a/packages/connect-web/src/grpc-web-transport.ts +++ b/packages/connect-web/src/grpc-web-transport.ts @@ -275,6 +275,7 @@ export function createGrpcWebTransport( trailerTarget: Headers, header: Headers, headerError: ConnectError | undefined, + signal: AbortSignal, ) { const reader = createEnvelopeReadableStream(body).getReader(); if (foundStatus) { @@ -314,6 +315,16 @@ export function createGrpcWebTransport( yield parse(data); continue; } + // Node wil not throw an AbortError on `read` if the + // signal is aborted before `getReader` is called. + // As a work around we check at the end and throw. + // + // Ref: https://github.com/nodejs/undici/issues/1940 + if ("throwIfAborted" in signal) { + // We assume that implementations without `throwIfAborted` (old + // browsers) do honor aborted signals on `read`. + signal.throwIfAborted(); + } if (!trailerReceived) { if (headerError) { throw headerError; @@ -388,6 +399,7 @@ export function createGrpcWebTransport( trailer, fRes.headers, headerError, + req.signal, ), }; return res;