diff --git a/js/package.json b/js/package.json index 7e897da13..fdae8caf9 100644 --- a/js/package.json +++ b/js/package.json @@ -1,6 +1,6 @@ { "name": "langsmith", - "version": "0.2.1", + "version": "0.2.2", "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", "packageManager": "yarn@1.22.19", "files": [ diff --git a/js/src/client.ts b/js/src/client.ts index 9922e4c4f..fd4caec5d 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -363,7 +363,7 @@ const handle429 = async (response?: Response) => { return false; }; -export class Queue { +export class AutoBatchQueue { items: { action: "create" | "update"; payload: RunCreate | RunUpdate; @@ -461,7 +461,7 @@ export class Client { private autoBatchTracing = true; - private autoBatchQueue = new Queue(); + private autoBatchQueue = new AutoBatchQueue(); private autoBatchTimeout: ReturnType | undefined; @@ -755,7 +755,7 @@ export class Client { } } - private async _getBatchSizeLimitBytes() { + private async _getBatchSizeLimitBytes(): Promise { const serverInfo = await this._ensureServerInfo(); return ( this.batchSizeBytesLimit ?? @@ -764,21 +764,18 @@ export class Client { ); } - private async drainAutoBatchQueue() { - const batchSizeLimit = await this._getBatchSizeLimitBytes(); + private drainAutoBatchQueue(batchSizeLimit: number) { while (this.autoBatchQueue.items.length > 0) { - for (let i = 0; i < this.traceBatchConcurrency; i++) { - const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); - if (!batch.length) { - done(); - break; - } - await this.processBatch(batch, done); + const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); + if (!batch.length) { + done(); + break; } + void this._processBatch(batch, done).catch(console.error); } } - private async processBatch(batch: AutoBatchQueueItem[], done: () => void) { + private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) { if (!batch.length) { done(); return; @@ -803,10 +800,7 @@ export class Client { } } - private async processRunOperation( - item: AutoBatchQueueItem, - immediatelyTriggerBatch?: boolean - ) { + private async processRunOperation(item: AutoBatchQueueItem) { const oldTimeout = this.autoBatchTimeout; clearTimeout(this.autoBatchTimeout); this.autoBatchTimeout = undefined; @@ -815,19 +809,14 @@ export class Client { } const itemPromise = this.autoBatchQueue.push(item); const sizeLimitBytes = await this._getBatchSizeLimitBytes(); - if ( - immediatelyTriggerBatch || - this.autoBatchQueue.sizeBytes > sizeLimitBytes - ) { - await this.drainAutoBatchQueue().catch(console.error); + if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) { + this.drainAutoBatchQueue(sizeLimitBytes); } if (this.autoBatchQueue.items.length > 0) { this.autoBatchTimeout = setTimeout( () => { this.autoBatchTimeout = undefined; - // This error would happen in the background and is uncatchable - // from the outside. So just log instead. - void this.drainAutoBatchQueue().catch(console.error); + this.drainAutoBatchQueue(sizeLimitBytes); }, oldTimeout ? this.autoBatchAggregationDelayMs @@ -1232,9 +1221,11 @@ export class Client { data.parent_run_id === undefined && this.blockOnRootRunFinalization ) { - // Trigger a batch as soon as a root trace ends and block to ensure trace finishes + // Trigger batches as soon as a root trace ends and wait to ensure trace finishes // in serverless environments. - await this.processRunOperation({ action: "update", item: data }, true); + await this.processRunOperation({ action: "update", item: data }).catch( + console.error + ); return; } else { void this.processRunOperation({ action: "update", item: data }).catch( diff --git a/js/src/index.ts b/js/src/index.ts index 78f45d4d6..bde1f2522 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js"; export { overrideFetchImplementation } from "./singletons/fetch.js"; // Update using yarn bump-version -export const __version__ = "0.2.1"; +export const __version__ = "0.2.2"; diff --git a/js/src/utils/fast-safe-stringify/index.ts b/js/src/utils/fast-safe-stringify/index.ts index 7ae29d887..6b89b51db 100644 --- a/js/src/utils/fast-safe-stringify/index.ts +++ b/js/src/utils/fast-safe-stringify/index.ts @@ -15,33 +15,45 @@ function defaultOptions() { // Regular stringify export function stringify(obj, replacer?, spacer?, options?) { - if (typeof options === "undefined") { - options = defaultOptions(); - } - - decirc(obj, "", 0, [], undefined, 0, options); - var res; try { - if (replacerStack.length === 0) { - res = JSON.stringify(obj, replacer, spacer); - } else { - res = JSON.stringify(obj, replaceGetterValues(replacer), spacer); + return JSON.stringify(obj, replacer, spacer); + } catch (e: any) { + // Fall back to more complex stringify if circular reference + if (!e.message?.includes("Converting circular structure to JSON")) { + console.warn("[WARNING]: LangSmith received unserializable value."); + return "[Unserializable]"; } - } catch (_) { - return JSON.stringify( - "[unable to serialize, circular reference is too complex to analyze]" + console.warn( + "[WARNING]: LangSmith received circular JSON. This will decrease tracer performance." ); - } finally { - while (arr.length !== 0) { - var part = arr.pop(); - if (part.length === 4) { - Object.defineProperty(part[0], part[1], part[3]); + if (typeof options === "undefined") { + options = defaultOptions(); + } + + decirc(obj, "", 0, [], undefined, 0, options); + var res; + try { + if (replacerStack.length === 0) { + res = JSON.stringify(obj, replacer, spacer); } else { - part[0][part[1]] = part[2]; + res = JSON.stringify(obj, replaceGetterValues(replacer), spacer); + } + } catch (_) { + return JSON.stringify( + "[unable to serialize, circular reference is too complex to analyze]" + ); + } finally { + while (arr.length !== 0) { + var part = arr.pop(); + if (part.length === 4) { + Object.defineProperty(part[0], part[1], part[3]); + } else { + part[0][part[1]] = part[2]; + } } } + return res; } - return res; } function setReplace(replace, val, k, parent) {