From 26dc214561ffe5cb968fc771a074b3f42331b341 Mon Sep 17 00:00:00 2001 From: Nathan Brown Date: Tue, 18 Jul 2023 21:25:09 -0700 Subject: [PATCH] Correctly handle split frame headers for WebSocket handleMessage --- CHANGELOG.md | 1 + src/amqp-websocket-client.ts | 4 ++-- test-browser/websocket.ts | 8 +++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 001b451..535541c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Pass the correct array buffer to dataview when reading framesize (related to [#55](https://github.com/cloudamqp/amqp-client.js/issues/55)) - Raise `AMQPError` when `channelMax` is reached (related to [#43](https://github.com/cloudamqp/amqp-client.js/issues/43)) - Add `Channel#onerror` callback (related to [#40](https://github.com/cloudamqp/amqp-client.js/issues/40)) +- Correctly handle frame headers split across reads in the WebSocket client. (related to [#55](https://github.com/cloudamqp/amqp-client.js/issues/55)) ### Changed diff --git a/src/amqp-websocket-client.ts b/src/amqp-websocket-client.ts index f2c1424..d8431d9 100644 --- a/src/amqp-websocket-client.ts +++ b/src/amqp-websocket-client.ts @@ -97,8 +97,8 @@ export class AMQPWebSocketClient extends AMQPBaseClient { if (this.frameSize === 0) { // first 7 bytes of a frame was split over two reads, this reads the second part if (this.framePos !== 0) { - const len = buf.byteLength - bufPos - this.frameBuffer.set(new Uint8Array(buf, bufPos), this.framePos) + const len = 7 - this.framePos + this.frameBuffer.set(new Uint8Array(buf, bufPos, len), this.framePos) this.frameSize = new DataView(this.frameBuffer.buffer).getInt32(bufPos + 3) + 8 this.framePos += len bufPos += len diff --git a/test-browser/websocket.ts b/test-browser/websocket.ts index c60e38e..faa52b9 100644 --- a/test-browser/websocket.ts +++ b/test-browser/websocket.ts @@ -538,8 +538,7 @@ test("can't set too small frameMax", () => { expect(() => getNewClient({ frameMax: 16 })).toThrow() }) -// TODO: throws unhandled exception, stopping the rest of the test -test.skip("can handle frames split over socket reads", async () => { +test("can handle frames split over socket reads", async () => { const amqp = getNewClient({ frameMax: 4*1024 }) const conn = await amqp.connect() const ch = await conn.channel() @@ -553,7 +552,7 @@ test.skip("can handle frames split over socket reads", async () => { const consumer = await q.subscribe({ noAck: true }, () => { if (++i === msgs) consumer.cancel() }) await consumer.wait(20_000) expect(i).toEqual(msgs) -}, 40_000) +}, 60_000) test("have to connect socket before opening channels", async () => { const amqp = getNewClient() @@ -635,8 +634,7 @@ test("will split body over multiple frames", async () => { assert.fail("no msg") }) -// TODO: fails intermittently, throws unhandled exception, stopping the rest of the test -test.skip("can republish in consume block without race condition", async () => { +test("can republish in consume block without race condition", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel()