From a683697f18f300d17c0d84051266030adef2ae87 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 17:00:34 -0700 Subject: [PATCH 1/6] Further reduce blocking behavior --- js/src/client.ts | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/js/src/client.ts b/js/src/client.ts index 9922e4c4f..b1747638e 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -363,7 +363,7 @@ const handle429 = async (response?: Response) => { return false; }; -export class Queue { +export class AutoBatchQueue { items: { action: "create" | "update"; payload: RunCreate | RunUpdate; @@ -461,7 +461,7 @@ export class Client { private autoBatchTracing = true; - private autoBatchQueue = new Queue(); + private autoBatchQueue = new AutoBatchQueue(); private autoBatchTimeout: ReturnType | undefined; @@ -755,7 +755,7 @@ export class Client { } } - private async _getBatchSizeLimitBytes() { + private async _getBatchSizeLimitBytes(): Promise { const serverInfo = await this._ensureServerInfo(); return ( this.batchSizeBytesLimit ?? @@ -764,21 +764,24 @@ export class Client { ); } - private async drainAutoBatchQueue() { - const batchSizeLimit = await this._getBatchSizeLimitBytes(); - while (this.autoBatchQueue.items.length > 0) { - for (let i = 0; i < this.traceBatchConcurrency; i++) { - const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); - if (!batch.length) { - done(); - break; + private drainAutoBatchQueue(batchSizeLimit: number) { + try { + while (this.autoBatchQueue.items.length > 0) { + for (let i = 0; i < this.traceBatchConcurrency; i++) { + const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); + if (!batch.length) { + done(); + break; + } + void this._processBatch(batch, done).catch(console.error); } - await this.processBatch(batch, done); } + } catch (e) { + console.error(e); } } - private async processBatch(batch: AutoBatchQueueItem[], done: () => void) { + private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) { if (!batch.length) { done(); return; @@ -819,15 +822,13 @@ export class Client { immediatelyTriggerBatch || this.autoBatchQueue.sizeBytes > sizeLimitBytes ) { - await this.drainAutoBatchQueue().catch(console.error); + this.drainAutoBatchQueue(sizeLimitBytes); } if (this.autoBatchQueue.items.length > 0) { this.autoBatchTimeout = setTimeout( () => { this.autoBatchTimeout = undefined; - // This error would happen in the background and is uncatchable - // from the outside. So just log instead. - void this.drainAutoBatchQueue().catch(console.error); + this.drainAutoBatchQueue(sizeLimitBytes); }, oldTimeout ? this.autoBatchAggregationDelayMs @@ -1232,7 +1233,7 @@ export class Client { data.parent_run_id === undefined && this.blockOnRootRunFinalization ) { - // Trigger a batch as soon as a root trace ends and block to ensure trace finishes + // Trigger batches as soon as a root trace ends and wait to ensure trace finishes // in serverless environments. await this.processRunOperation({ action: "update", item: data }, true); return; From b190ea9121ef358b197e11ecfee633a002a9de8c Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 17:03:42 -0700 Subject: [PATCH 2/6] Further simplify --- js/src/client.ts | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/js/src/client.ts b/js/src/client.ts index b1747638e..f19c36bd3 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -765,19 +765,13 @@ export class Client { } private drainAutoBatchQueue(batchSizeLimit: number) { - try { - while (this.autoBatchQueue.items.length > 0) { - for (let i = 0; i < this.traceBatchConcurrency; i++) { - const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); - if (!batch.length) { - done(); - break; - } - void this._processBatch(batch, done).catch(console.error); - } + while (this.autoBatchQueue.items.length > 0) { + const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); + if (!batch.length) { + done(); + break; } - } catch (e) { - console.error(e); + void this._processBatch(batch, done).catch(console.error); } } From a6fbcee192625eda172348f4f1026661f3f646f4 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 17:16:35 -0700 Subject: [PATCH 3/6] Remove delay --- js/src/client.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/js/src/client.ts b/js/src/client.ts index f19c36bd3..fd4caec5d 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -800,10 +800,7 @@ export class Client { } } - private async processRunOperation( - item: AutoBatchQueueItem, - immediatelyTriggerBatch?: boolean - ) { + private async processRunOperation(item: AutoBatchQueueItem) { const oldTimeout = this.autoBatchTimeout; clearTimeout(this.autoBatchTimeout); this.autoBatchTimeout = undefined; @@ -812,10 +809,7 @@ export class Client { } const itemPromise = this.autoBatchQueue.push(item); const sizeLimitBytes = await this._getBatchSizeLimitBytes(); - if ( - immediatelyTriggerBatch || - this.autoBatchQueue.sizeBytes > sizeLimitBytes - ) { + if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) { this.drainAutoBatchQueue(sizeLimitBytes); } if (this.autoBatchQueue.items.length > 0) { @@ -1229,7 +1223,9 @@ export class Client { ) { // Trigger batches as soon as a root trace ends and wait to ensure trace finishes // in serverless environments. - await this.processRunOperation({ action: "update", item: data }, true); + await this.processRunOperation({ action: "update", item: data }).catch( + console.error + ); return; } else { void this.processRunOperation({ action: "update", item: data }).catch( From 9b4da026e38b3f224cd93def95e197f0d1a7447e Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 18:31:11 -0700 Subject: [PATCH 4/6] Attempt naive serialization first, then fallback to circular detection --- js/src/utils/fast-safe-stringify/index.ts | 50 +++++++++++++---------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/js/src/utils/fast-safe-stringify/index.ts b/js/src/utils/fast-safe-stringify/index.ts index 7ae29d887..f04b204cd 100644 --- a/js/src/utils/fast-safe-stringify/index.ts +++ b/js/src/utils/fast-safe-stringify/index.ts @@ -15,33 +15,41 @@ function defaultOptions() { // Regular stringify export function stringify(obj, replacer?, spacer?, options?) { - if (typeof options === "undefined") { - options = defaultOptions(); - } - - decirc(obj, "", 0, [], undefined, 0, options); - var res; try { - if (replacerStack.length === 0) { - res = JSON.stringify(obj, replacer, spacer); - } else { - res = JSON.stringify(obj, replaceGetterValues(replacer), spacer); + return JSON.stringify(obj, replacer, spacer); + } catch (e: any) { + // Fall back to more complex stringify if circular reference + if (!e.message?.includes("Converting circular structure to JSON")) { + return "[Unserializable]"; } - } catch (_) { - return JSON.stringify( - "[unable to serialize, circular reference is too complex to analyze]" - ); - } finally { - while (arr.length !== 0) { - var part = arr.pop(); - if (part.length === 4) { - Object.defineProperty(part[0], part[1], part[3]); + if (typeof options === "undefined") { + options = defaultOptions(); + } + + decirc(obj, "", 0, [], undefined, 0, options); + var res; + try { + if (replacerStack.length === 0) { + res = JSON.stringify(obj, replacer, spacer); } else { - part[0][part[1]] = part[2]; + res = JSON.stringify(obj, replaceGetterValues(replacer), spacer); + } + } catch (_) { + return JSON.stringify( + "[unable to serialize, circular reference is too complex to analyze]" + ); + } finally { + while (arr.length !== 0) { + var part = arr.pop(); + if (part.length === 4) { + Object.defineProperty(part[0], part[1], part[3]); + } else { + part[0][part[1]] = part[2]; + } } } + return res; } - return res; } function setReplace(replace, val, k, parent) { From 49de7abb303d253acb7994003587872717bc114a Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 18:37:13 -0700 Subject: [PATCH 5/6] Add warning logs --- js/src/utils/fast-safe-stringify/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/js/src/utils/fast-safe-stringify/index.ts b/js/src/utils/fast-safe-stringify/index.ts index f04b204cd..6b89b51db 100644 --- a/js/src/utils/fast-safe-stringify/index.ts +++ b/js/src/utils/fast-safe-stringify/index.ts @@ -20,8 +20,12 @@ export function stringify(obj, replacer?, spacer?, options?) { } catch (e: any) { // Fall back to more complex stringify if circular reference if (!e.message?.includes("Converting circular structure to JSON")) { + console.warn("[WARNING]: LangSmith received unserializable value."); return "[Unserializable]"; } + console.warn( + "[WARNING]: LangSmith received circular JSON. This will decrease tracer performance." + ); if (typeof options === "undefined") { options = defaultOptions(); } From 7a11fb79cb65089c1d3a61b6b80e658098590565 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 25 Oct 2024 18:39:48 -0700 Subject: [PATCH 6/6] Bump to 0.2.2 --- js/package.json | 2 +- js/src/index.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/js/package.json b/js/package.json index 7e897da13..fdae8caf9 100644 --- a/js/package.json +++ b/js/package.json @@ -1,6 +1,6 @@ { "name": "langsmith", - "version": "0.2.1", + "version": "0.2.2", "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", "packageManager": "yarn@1.22.19", "files": [ diff --git a/js/src/index.ts b/js/src/index.ts index 78f45d4d6..bde1f2522 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js"; export { overrideFetchImplementation } from "./singletons/fetch.js"; // Update using yarn bump-version -export const __version__ = "0.2.1"; +export const __version__ = "0.2.2";