diff --git a/src/client.ts b/src/client.ts index 2b18c35..817d8f2 100644 --- a/src/client.ts +++ b/src/client.ts @@ -197,7 +197,7 @@ export class KubernetesRESTClient implements IKubernetesRESTClient { }; const request = client.request(requestHeaders); - let body = ""; + let lastResponse = ""; let buffer = ""; request.on("error", (err: any) => { @@ -227,7 +227,7 @@ export class KubernetesRESTClient implements IKubernetesRESTClient { } try { - const parsedBody = JSON.parse(body); + const parsedBody = JSON.parse(lastResponse); if (isStatus(parsedBody) && parsedBody.status === "Failure") { debug("watch: failed with status %O", parsedBody); @@ -249,28 +249,39 @@ export class KubernetesRESTClient implements IKubernetesRESTClient { debug("WATCH request on %o received %d bytes of data", absoluteURL, chunk.length); buffer += chunk; - body += chunk; + + let parts = buffer.split('\n'); + + // the last part is either empty or a part of the following incomplete line + buffer = parts.pop() ?? ""; + + if (buffer !== "") { + lastResponse = buffer + } else if (parts.length > 0) { + lastResponse = parts[parts.length - 1] + } // Line is not yet complete; wait for next chunk. - if (!buffer.endsWith("\n")) { + if (parts.length === 0) { return; } - try { - const obj: WatchEvent = JSON.parse(buffer); - buffer = ""; - - const resourceVersion = obj.object.metadata.resourceVersion - ? parseInt(obj.object.metadata.resourceVersion, 10) - : -1; - if (resourceVersion > lastVersion) { - debug(`watch: emitting ${obj.type} event for ${obj.object.metadata.name}`); - - lastVersion = resourceVersion; - onUpdate(obj).catch(onError); + for (const part of parts) { + try { + const obj: WatchEvent = JSON.parse(part); + + const resourceVersion = obj.object.metadata.resourceVersion + ? parseInt(obj.object.metadata.resourceVersion, 10) + : -1; + if (resourceVersion > lastVersion) { + debug(`watch: emitting ${obj.type} event for ${obj.object.metadata.name}`); + + lastVersion = resourceVersion; + onUpdate(obj).catch(onError); + } + } catch (err) { + onError(err); } - } catch (err) { - onError(err); } }); });