diff --git a/packages/connect-node/src/transport.spec.ts b/packages/connect-node/src/transport.spec.ts new file mode 100644 index 000000000..3b2a77aea --- /dev/null +++ b/packages/connect-node/src/transport.spec.ts @@ -0,0 +1,113 @@ +// Copyright 2021-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* eslint-disable @typescript-eslint/no-invalid-void-type */ +import { Int32Value, StringValue, MethodKind } from "@bufbuild/protobuf"; +import { useNodeServer } from "./use-node-server-helper.spec.js"; +import * as http2 from "node:http2"; +import { connectNodeAdapter } from "./connect-node-adapter.js"; +import { createPromiseClient } from "@connectrpc/connect"; +import type { Transport } from "@connectrpc/connect"; +import { createTransport as createGrpcTransport } from "@connectrpc/connect/protocol-grpc"; +import { createTransport as createGrpcWebTransport } from "@connectrpc/connect/protocol-grpc-web"; +import { validateNodeTransportOptions } from "./node-transport-options.js"; + +const TestService = { + typeName: "TestService", + methods: { + server: { + name: "Server", + I: Int32Value, + O: StringValue, + kind: MethodKind.ServerStreaming, + }, + }, +} as const; + +describe("Calls should fail with code internal on RST_STREAM no_error before trailers are received", function () { + let firstMessage: ReturnType>; + let rstStream: ReturnType>; + beforeEach(function () { + firstMessage = createCompleter(); + rstStream = createCompleter(); + }); + const adaptor = connectNodeAdapter({ + routes({ rpc }) { + rpc(TestService, TestService.methods.server, async function* () { + yield { value: "foo" }; + // Notify to send rst stream after a message. + firstMessage.resolve(); + // Wait for rst stream to be sent before returning. + // If we return early it will create a race. + await rstStream.promise; + }); + }, + }); + const server = useNodeServer(() => + http2.createServer((request, response) => { + adaptor(request, response); + firstMessage.promise + .then(() => { + response.stream.close(0, () => rstStream.resolve()); + }) + .catch(fail); + }), + ); + async function testRstStream(transport: Transport) { + const client = createPromiseClient(TestService, transport); + const it = client.server({ value: 1 })[Symbol.asyncIterator](); + const first = await it.next(); + expect(first.done).toBeFalse(); + expect(first.value).toEqual(new StringValue({ value: "foo" })); + await expectAsync(it.next()).toBeRejected(); + } + it("for gRPC Transport", async function () { + await testRstStream( + createGrpcTransport({ + ...validateNodeTransportOptions({ + httpVersion: "2", + baseUrl: server.getUrl(), + }), + baseUrl: server.getUrl(), + httpClient: server.getClient(), + }), + ); + }); + it("for gRPC Transport", async function () { + await testRstStream( + createGrpcWebTransport({ + ...validateNodeTransportOptions({ + httpVersion: "2", + baseUrl: server.getUrl(), + }), + baseUrl: server.getUrl(), + httpClient: server.getClient(), + }), + ); + }); +}); + +function createCompleter() { + let resolve: (_: T | PromiseLike) => void; + let reject: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve: resolve!, + reject: reject!, + }; +} diff --git a/packages/connect-node/src/use-node-server-helper.spec.ts b/packages/connect-node/src/use-node-server-helper.spec.ts index 3330a2ad8..8e9e2c3f3 100644 --- a/packages/connect-node/src/use-node-server-helper.spec.ts +++ b/packages/connect-node/src/use-node-server-helper.spec.ts @@ -99,7 +99,7 @@ export function useNodeServer( client = createNodeHttpClient({ httpVersion: "2", sessionProvider: (authority) => { - if (authority !== this.getUrl()) { + if (new URL(this.getUrl()).origin != new URL(authority).origin) { throw new Error( "client from useNodeServer() can only be used for requests against the server URL", ); diff --git a/packages/connect-web-bench/README.md b/packages/connect-web-bench/README.md index ecc96388d..311c4113e 100644 --- a/packages/connect-web-bench/README.md +++ b/packages/connect-web-bench/README.md @@ -15,10 +15,10 @@ usually do. We repeat this for an increasing number of RPCs. | code generator | RPCs | bundle size | minified | compressed | | -------------- | ---: | ----------: | --------: | ---------: | -| Connect-ES | 1 | 152,703 b | 66,494 b | 16,400 b | -| Connect-ES | 4 | 168,145 b | 72,434 b | 16,865 b | -| Connect-ES | 8 | 193,458 b | 82,159 b | 17,484 b | -| Connect-ES | 16 | 227,097 b | 96,422 b | 18,224 b | +| Connect-ES | 1 | 152,706 b | 66,483 b | 16,386 b | +| Connect-ES | 4 | 168,148 b | 72,422 b | 16,890 b | +| Connect-ES | 8 | 193,461 b | 82,145 b | 17,461 b | +| Connect-ES | 16 | 227,100 b | 96,411 b | 18,214 b | | gRPC-Web | 1 | 876,563 b | 548,495 b | 52,300 b | | gRPC-Web | 4 | 928,964 b | 580,477 b | 54,673 b | | gRPC-Web | 8 | 1,004,833 b | 628,223 b | 57,118 b | diff --git a/packages/connect-web-bench/chart.svg b/packages/connect-web-bench/chart.svg index 4e6f0cdd4..390216865 100644 --- a/packages/connect-web-bench/chart.svg +++ b/packages/connect-web-bench/chart.svg @@ -42,13 +42,13 @@ 0 KiB - + Connect-ES -Connect-ES 16.02 KiB for 1 RPCs -Connect-ES 16.47 KiB for 4 RPCs -Connect-ES 17.07 KiB for 8 RPCs -Connect-ES 17.8 KiB for 16 RPCs +Connect-ES 16 KiB for 1 RPCs +Connect-ES 16.49 KiB for 4 RPCs +Connect-ES 17.05 KiB for 8 RPCs +Connect-ES 17.79 KiB for 16 RPCs diff --git a/packages/connect/src/protocol-connect/validate-response.ts b/packages/connect/src/protocol-connect/validate-response.ts index 8e878d3f1..c556c66be 100644 --- a/packages/connect/src/protocol-connect/validate-response.ts +++ b/packages/connect/src/protocol-connect/validate-response.ts @@ -17,7 +17,11 @@ import { Code } from "../code.js"; import { codeFromHttpStatus } from "./http-status.js"; import { ConnectError } from "../connect-error.js"; import { parseContentType } from "./content-type.js"; -import { headerStreamEncoding, headerUnaryEncoding } from "./headers.js"; +import { + headerContentType, + headerStreamEncoding, + headerUnaryEncoding, +} from "./headers.js"; import type { Compression } from "../protocol/compression.js"; /** @@ -38,7 +42,7 @@ export function validateResponse( ): | { isUnaryError: false; unaryError?: undefined } | { isUnaryError: true; unaryError: ConnectError } { - const mimeType = headers.get("Content-Type"); + const mimeType = headers.get(headerContentType); const parsedType = parseContentType(mimeType); if (status !== 200) { const errorFromStatus = new ConnectError( diff --git a/packages/connect/src/protocol-grpc/validate-response.spec.ts b/packages/connect/src/protocol-grpc/validate-response.spec.ts index d46ca0497..8bde24133 100644 --- a/packages/connect/src/protocol-grpc/validate-response.spec.ts +++ b/packages/connect/src/protocol-grpc/validate-response.spec.ts @@ -38,17 +38,28 @@ describe("gRPC validateResponse()", function () { } it("should honor grpc-status field", function () { - const e = v(200, { "grpc-status": "8" }); + const e = v(200, { + "grpc-status": "8", + "content-type": "application/grpc+proto", + }); expect(e?.message).toBe("[resource_exhausted]"); }); it("should honor grpc-message field", function () { - const e = v(200, { "grpc-status": "8", "grpc-message": "out of space" }); + const e = v(200, { + "grpc-status": "8", + "grpc-message": "out of space", + "content-type": "application/grpc+proto", + }); expect(e?.message).toBe("[resource_exhausted] out of space"); }); it("should include headers as error metadata with grpc-status", function () { - const e = v(200, { "grpc-status": "8", Foo: "Bar" }); + const e = v(200, { + "grpc-status": "8", + Foo: "Bar", + "content-type": "application/grpc+proto", + }); expect(e?.metadata.get("Foo")).toBe("Bar"); }); @@ -80,7 +91,10 @@ describe("gRPC validateResponse()", function () { it("should return foundStatus for grpc-status OK", function () { const { foundStatus } = validateResponse( 200, - new Headers({ "grpc-status": "0" }), + new Headers({ + "grpc-status": "0", + "content-type": "application/grpc+proto", + }), ); expect(foundStatus).toBeTrue(); }); diff --git a/packages/connect/src/protocol-grpc/validate-response.ts b/packages/connect/src/protocol-grpc/validate-response.ts index 2245f43c1..cc574dd89 100644 --- a/packages/connect/src/protocol-grpc/validate-response.ts +++ b/packages/connect/src/protocol-grpc/validate-response.ts @@ -16,8 +16,13 @@ import { codeFromHttpStatus } from "./http-status.js"; import { ConnectError } from "../connect-error.js"; import { findTrailerError } from "./trailer-status.js"; import { Code } from "../code.js"; -import { headerEncoding, headerGrpcStatus } from "./headers.js"; +import { + headerContentType, + headerEncoding, + headerGrpcStatus, +} from "./headers.js"; import type { Compression } from "../protocol/compression.js"; +import { parseContentType } from "./content-type.js"; /** * Validates response status and header for the gRPC protocol. @@ -41,6 +46,14 @@ export function validateResponse( headers, ); } + const mimeType = headers.get(headerContentType); + const parsedType = parseContentType(mimeType); + if (parsedType == undefined) { + throw new ConnectError( + `unsupported content type ${mimeType}`, + Code.Unknown, + ); + } return { foundStatus: headers.has(headerGrpcStatus), headerError: findTrailerError(headers), diff --git a/packages/connect/src/protocol-grpc/validate-trailer.ts b/packages/connect/src/protocol-grpc/validate-trailer.ts index 59fc78af2..d045ac3a8 100644 --- a/packages/connect/src/protocol-grpc/validate-trailer.ts +++ b/packages/connect/src/protocol-grpc/validate-trailer.ts @@ -12,11 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { Code } from "../code.js"; +import { ConnectError } from "../connect-error.js"; +import { headerGrpcStatus } from "./headers.js"; import { findTrailerError } from "./trailer-status.js"; /** * Validates a trailer for the gRPC and the gRPC-web protocol. - * Throws a ConnectError if the trailer contains an error status. + * + * If the trailer contains an error status, a ConnectError is + * thrown. It will include trailer and header in the error's + * "metadata" property. + * + * Throws a ConnectError with code "internal" if neither the trailer + * nor the header contain the Grpc-Status field. * * @private Internal code, does not follow semantic versioning. */ @@ -28,4 +37,7 @@ export function validateTrailer(trailer: Headers, header: Headers): void { }); throw err; } + if (!header.has(headerGrpcStatus) && !trailer.has(headerGrpcStatus)) { + throw new ConnectError("protocol error: missing status", Code.Internal); + } }