diff --git a/js/src/client.ts b/js/src/client.ts index a61e9a84e..9cd7dde17 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -72,7 +72,7 @@ export interface ClientConfig { hideInputs?: boolean | ((inputs: KVMap) => KVMap); hideOutputs?: boolean | ((outputs: KVMap) => KVMap); autoBatchTracing?: boolean; - pendingAutoBatchedRunLimit?: number; + batchSizeBytesLimit?: number; blockOnRootRunFinalization?: boolean; fetchOptions?: RequestInit; } @@ -238,6 +238,7 @@ interface CreateRunParams { revision_id?: string; trace_id?: string; dotted_order?: string; + attachments?: Record; } interface UpdateRunParams extends RunUpdate { @@ -276,28 +277,31 @@ type AutoBatchQueueItem = { item: RunCreate | RunUpdate; }; -async function mergeRuntimeEnvIntoRunCreates(runs: RunCreate[]) { - const runtimeEnv = await getRuntimeEnvironment(); +type MultipartPart = { + name: string; + payload: Blob; +}; + +export function mergeRuntimeEnvIntoRunCreate(run: RunCreate) { + const runtimeEnv = getRuntimeEnvironment(); const envVars = getLangChainEnvVarsMetadata(); - return runs.map((run) => { - const extra = run.extra ?? {}; - const metadata = extra.metadata; - run.extra = { - ...extra, - runtime: { - ...runtimeEnv, - ...extra?.runtime, - }, - metadata: { - ...envVars, - ...(envVars.revision_id || run.revision_id - ? { revision_id: run.revision_id ?? envVars.revision_id } - : {}), - ...metadata, - }, - }; - return run; - }); + const extra = run.extra ?? {}; + const metadata = extra.metadata; + run.extra = { + ...extra, + runtime: { + ...runtimeEnv, + ...extra?.runtime, + }, + metadata: { + ...envVars, + ...(envVars.revision_id || run.revision_id + ? { revision_id: run.revision_id ?? envVars.revision_id } + : {}), + ...metadata, + }, + }; + return run; } const getTracingSamplingRate = () => { @@ -357,45 +361,79 @@ const handle429 = async (response?: Response) => { return false; }; -export class Queue { - items: [T, () => void, Promise][] = []; +export class Queue { + items: { + action: "create" | "update"; + payload: RunCreate | RunUpdate; + itemPromiseResolve: () => void; + itemPromise: Promise; + size: number; + }[] = []; + + sizeBytes = 0; - get size() { - return this.items.length; + peek() { + return this.items[0]; } - push(item: T): Promise { + push(item: AutoBatchQueueItem): Promise { let itemPromiseResolve; const itemPromise = new Promise((resolve) => { // Setting itemPromiseResolve is synchronous with promise creation: // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise itemPromiseResolve = resolve; }); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.items.push([item, itemPromiseResolve!, itemPromise]); + const size = stringifyForTracing(item.item).length; + this.items.push({ + action: item.action, + payload: item.item, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + itemPromiseResolve: itemPromiseResolve!, + itemPromise, + size, + }); + this.sizeBytes += size; return itemPromise; } - pop(upToN: number): [T[], () => void] { - if (upToN < 1) { - throw new Error("Number of items to pop off may not be less than 1."); + pop(upToSizeBytes: number): [AutoBatchQueueItem[], () => void] { + if (upToSizeBytes < 1) { + throw new Error("Number of bytes to pop off may not be less than 1."); } const popped: typeof this.items = []; - while (popped.length < upToN && this.items.length) { + let poppedSizeBytes = 0; + // Pop items until we reach or exceed the size limit + while ( + poppedSizeBytes + (this.peek()?.size ?? 0) < upToSizeBytes && + this.items.length > 0 + ) { const item = this.items.shift(); if (item) { popped.push(item); - } else { - break; + poppedSizeBytes += item.size; + this.sizeBytes -= item.size; } } - return [popped.map((it) => it[0]), () => popped.forEach((it) => it[1]())]; + // If there is an item on the queue we were unable to pop, + // just return it as a single batch. + if (popped.length === 0 && this.items.length > 0) { + const item = this.items.shift()!; + popped.push(item); + poppedSizeBytes += item.size; + this.sizeBytes -= item.size; + } + return [ + popped.map((it) => ({ action: it.action, item: it.payload })), + () => popped.forEach((it) => it.itemPromiseResolve()), + ]; } } // 20 MB export const DEFAULT_BATCH_SIZE_LIMIT_BYTES = 20_971_520; +const SERVER_INFO_REQUEST_TIMEOUT = 1000; + export class Client { private apiKey?: string; @@ -421,11 +459,7 @@ export class Client { private autoBatchTracing = true; - private batchEndpointSupported?: boolean; - - private autoBatchQueue = new Queue(); - - private pendingAutoBatchedRunLimit = 100; + private autoBatchQueue = new Queue(); private autoBatchTimeout: ReturnType | undefined; @@ -433,7 +467,7 @@ export class Client { private autoBatchAggregationDelayMs = 50; - private serverInfo: RecordStringAny | undefined; + private batchSizeBytesLimit?: number; private fetchOptions: RequestInit; @@ -441,6 +475,11 @@ export class Client { private blockOnRootRunFinalization = true; + private _serverInfo: RecordStringAny | undefined; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private _getServerInfoPromise?: Promise>; + constructor(config: ClientConfig = {}) { const defaultConfig = Client.getDefaultClientConfig(); @@ -469,8 +508,7 @@ export class Client { this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing; this.blockOnRootRunFinalization = config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization; - this.pendingAutoBatchedRunLimit = - config.pendingAutoBatchedRunLimit ?? this.pendingAutoBatchedRunLimit; + this.batchSizeBytesLimit = config.batchSizeBytesLimit; this.fetchOptions = config.fetchOptions || {}; } @@ -596,6 +634,7 @@ export class Client { const response = await this._getResponse(path, queryParams); return response.json() as T; } + private async *_getPaginated( path: string, queryParams: URLSearchParams = new URLSearchParams(), @@ -630,6 +669,7 @@ export class Client { offset += items.length; } } + private async *_getCursorPaginatedList( path: string, body: RecordStringAny | null = null, @@ -703,24 +743,39 @@ export class Client { } } + private async _getBatchSizeLimitBytes() { + const serverInfo = await this._ensureServerInfo(); + return ( + this.batchSizeBytesLimit ?? + serverInfo.batch_ingest_config?.size_limit_bytes ?? + DEFAULT_BATCH_SIZE_LIMIT_BYTES + ); + } + private async drainAutoBatchQueue() { - while (this.autoBatchQueue.size >= 0) { + while (this.autoBatchQueue.items.length >= 0) { const [batch, done] = this.autoBatchQueue.pop( - this.pendingAutoBatchedRunLimit + await this._getBatchSizeLimitBytes() ); if (!batch.length) { done(); return; } try { - await this.batchIngestRuns({ + const ingestParams = { runCreates: batch .filter((item) => item.action === "create") .map((item) => item.item) as RunCreate[], runUpdates: batch .filter((item) => item.action === "update") .map((item) => item.item) as RunUpdate[], - }); + }; + const serverInfo = await this._ensureServerInfo(); + if (serverInfo?.batch_ingest_config?.use_multipart_endpoint) { + await this.multipartIngestRuns(ingestParams); + } else { + await this.batchIngestRuns(ingestParams); + } } finally { done(); } @@ -734,14 +789,18 @@ export class Client { const oldTimeout = this.autoBatchTimeout; clearTimeout(this.autoBatchTimeout); this.autoBatchTimeout = undefined; + if (item.action === "create") { + item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate); + } const itemPromise = this.autoBatchQueue.push(item); + const sizeLimitBytes = await this._getBatchSizeLimitBytes(); if ( immediatelyTriggerBatch || - this.autoBatchQueue.size > this.pendingAutoBatchedRunLimit + this.autoBatchQueue.sizeBytes > sizeLimitBytes ) { await this.drainAutoBatchQueue().catch(console.error); } - if (this.autoBatchQueue.size > 0) { + if (this.autoBatchQueue.items.length > 0) { this.autoBatchTimeout = setTimeout( () => { this.autoBatchTimeout = undefined; @@ -761,20 +820,34 @@ export class Client { const response = await _getFetchImplementation()(`${this.apiUrl}/info`, { method: "GET", headers: { Accept: "application/json" }, - signal: AbortSignal.timeout(this.timeout_ms), + signal: AbortSignal.timeout(SERVER_INFO_REQUEST_TIMEOUT), ...this.fetchOptions, }); await raiseForStatus(response, "get server info"); return response.json(); } - protected async batchEndpointIsSupported() { - try { - this.serverInfo = await this._getServerInfo(); - } catch (e) { - return false; + protected async _ensureServerInfo() { + if (this._getServerInfoPromise === undefined) { + this._getServerInfoPromise = (async () => { + if (this._serverInfo === undefined) { + try { + this._serverInfo = await this._getServerInfo(); + } catch (e) { + console.warn( + `[WARNING]: LangSmith failed to fetch info on supported operations. Falling back to single calls and default limits.` + ); + } + } + return this._serverInfo ?? {}; + })(); } - return true; + return this._getServerInfoPromise.then((serverInfo) => { + if (this._serverInfo === undefined) { + this._getServerInfoPromise = undefined; + } + return serverInfo; + }); } protected async _getSettings() { @@ -809,9 +882,7 @@ export class Client { }).catch(console.error); return; } - const mergedRunCreateParams = await mergeRuntimeEnvIntoRunCreates([ - runCreate, - ]); + const mergedRunCreateParam = mergeRuntimeEnvIntoRunCreate(runCreate); const response = await this.caller.call( _getFetchImplementation(), @@ -819,7 +890,7 @@ export class Client { { method: "POST", headers, - body: stringifyForTracing(mergedRunCreateParams[0]), + body: stringifyForTracing(mergedRunCreateParam), signal: AbortSignal.timeout(this.timeout_ms), ...this.fetchOptions, } @@ -882,13 +953,8 @@ export class Client { if (!rawBatch.post.length && !rawBatch.patch.length) { return; } - preparedCreateParams = await mergeRuntimeEnvIntoRunCreates( - preparedCreateParams - ); - if (this.batchEndpointSupported === undefined) { - this.batchEndpointSupported = await this.batchEndpointIsSupported(); - } - if (!this.batchEndpointSupported) { + const serverInfo = await this._ensureServerInfo(); + if (serverInfo.version === undefined) { this.autoBatchTracing = false; for (const preparedCreateParam of rawBatch.post) { await this.createRun(preparedCreateParam as CreateRunParams); @@ -903,30 +969,15 @@ export class Client { } return; } - const sizeLimitBytes = - this.serverInfo?.batch_ingest_config?.size_limit_bytes ?? - DEFAULT_BATCH_SIZE_LIMIT_BYTES; const batchChunks = { post: [] as (typeof rawBatch)["post"], patch: [] as (typeof rawBatch)["patch"], }; - let currentBatchSizeBytes = 0; for (const k of ["post", "patch"]) { const key = k as keyof typeof rawBatch; const batchItems = rawBatch[key].reverse(); let batchItem = batchItems.pop(); while (batchItem !== undefined) { - const stringifiedBatchItem = stringifyForTracing(batchItem); - if ( - currentBatchSizeBytes > 0 && - currentBatchSizeBytes + stringifiedBatchItem.length > sizeLimitBytes - ) { - await this._postBatchIngestRuns(stringifyForTracing(batchChunks)); - currentBatchSizeBytes = 0; - batchChunks.post = []; - batchChunks.patch = []; - } - currentBatchSizeBytes += stringifiedBatchItem.length; batchChunks[key].push(batchItem); batchItem = batchItems.pop(); } @@ -956,6 +1007,186 @@ export class Client { await raiseForStatus(response, "batch create run", true); } + /** + * Batch ingest/upsert multiple runs in the Langsmith system. + * @param runs + */ + public async multipartIngestRuns({ + runCreates, + runUpdates, + }: { + runCreates?: RunCreate[]; + runUpdates?: RunUpdate[]; + }) { + if (runCreates === undefined && runUpdates === undefined) { + return; + } + // transform and convert to dicts + const allAttachments: Record< + string, + Record + > = {}; + let preparedCreateParams = []; + for (const create of runCreates ?? []) { + const preparedCreate = this.prepareRunCreateOrUpdateInputs(create); + if ( + preparedCreate.id !== undefined && + preparedCreate.attachments !== undefined + ) { + allAttachments[preparedCreate.id] = preparedCreate.attachments; + } + delete preparedCreate.attachments; + preparedCreateParams.push(preparedCreate); + } + + let preparedUpdateParams = []; + for (const update of runUpdates ?? []) { + preparedUpdateParams.push(this.prepareRunCreateOrUpdateInputs(update)); + } + + // require trace_id and dotted_order + const invalidRunCreate = preparedCreateParams.find((runCreate) => { + return ( + runCreate.trace_id === undefined || runCreate.dotted_order === undefined + ); + }); + if (invalidRunCreate !== undefined) { + throw new Error( + `Multipart ingest requires "trace_id" and "dotted_order" to be set when creating a run` + ); + } + const invalidRunUpdate = preparedUpdateParams.find((runUpdate) => { + return ( + runUpdate.trace_id === undefined || runUpdate.dotted_order === undefined + ); + }); + if (invalidRunUpdate !== undefined) { + throw new Error( + `Multipart ingest requires "trace_id" and "dotted_order" to be set when updating a run` + ); + } + // combine post and patch dicts where possible + if (preparedCreateParams.length > 0 && preparedUpdateParams.length > 0) { + const createById = preparedCreateParams.reduce( + (params: Record, run) => { + if (!run.id) { + return params; + } + params[run.id] = run; + return params; + }, + {} + ); + const standaloneUpdates = []; + for (const updateParam of preparedUpdateParams) { + if (updateParam.id !== undefined && createById[updateParam.id]) { + createById[updateParam.id] = { + ...createById[updateParam.id], + ...updateParam, + }; + } else { + standaloneUpdates.push(updateParam); + } + } + preparedCreateParams = Object.values(createById); + preparedUpdateParams = standaloneUpdates; + } + if ( + preparedCreateParams.length === 0 && + preparedUpdateParams.length === 0 + ) { + return; + } + // send the runs in multipart requests + const accumulatedContext: string[] = []; + const accumulatedParts: MultipartPart[] = []; + for (const [method, payloads] of [ + ["post", preparedCreateParams] as const, + ["patch", preparedUpdateParams] as const, + ]) { + for (const originalPayload of payloads) { + // collect fields to be sent as separate parts + const { inputs, outputs, events, ...payload } = originalPayload; + const fields = { inputs, outputs, events }; + // encode the main run payload + const stringifiedPayload = stringifyForTracing(payload); + accumulatedParts.push({ + name: `${method}.${payload.id}`, + payload: new Blob([stringifiedPayload], { + type: `application/json; length=${stringifiedPayload.length}`, // encoding=gzip + }), + }); + // encode the fields we collected + for (const [key, value] of Object.entries(fields)) { + if (value === undefined) { + continue; + } + const stringifiedValue = stringifyForTracing(value); + accumulatedParts.push({ + name: `${method}.${payload.id}.${key}`, + payload: new Blob([stringifiedValue], { + type: `application/json; length=${stringifiedValue.length}`, + }), + }); + } + // encode the attachments + if (payload.id !== undefined) { + const attachments = allAttachments[payload.id]; + if (attachments) { + delete allAttachments[payload.id]; + for (const [name, [contentType, content]] of Object.entries( + attachments + )) { + accumulatedParts.push({ + name: `attachment.${payload.id}.${name}`, + payload: new Blob([content], { + type: `${contentType}; length=${content.length}`, + }), + }); + } + } + } + // compute context + accumulatedContext.push(`trace=${payload.trace_id},id=${payload.id}`); + } + } + await this._sendMultipartRequest( + accumulatedParts, + accumulatedContext.join("; ") + ); + } + + private async _sendMultipartRequest(parts: MultipartPart[], context: string) { + try { + const formData = new FormData(); + for (const part of parts) { + formData.append(part.name, part.payload); + } + await this.batchIngestCaller.call( + _getFetchImplementation(), + `${this.apiUrl}/runs/multipart`, + { + method: "POST", + headers: { + ...this.headers, + }, + body: formData, + signal: AbortSignal.timeout(this.timeout_ms), + ...this.fetchOptions, + } + ); + } catch (e) { + let errorMessage = "Failed to multipart ingest runs"; + // eslint-disable-next-line no-instanceof/no-instanceof + if (e instanceof Error) { + errorMessage += `: ${e.stack || e.message}`; + } else { + errorMessage += `: ${String(e)}`; + } + console.warn(`${errorMessage.trim()}\n\nContext: ${context}`); + } + } + public async updateRun(runId: string, run: RunUpdate): Promise { assertUuid(runId); if (run.inputs) { @@ -3922,7 +4153,7 @@ export class Client { */ public awaitPendingTraceBatches() { return Promise.all( - this.autoBatchQueue.items.map(([, , promise]) => promise) + this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise) ); } } diff --git a/js/src/run_trees.ts b/js/src/run_trees.ts index c8e8091ab..97a33a19c 100644 --- a/js/src/run_trees.ts +++ b/js/src/run_trees.ts @@ -376,7 +376,7 @@ export class RunTree implements BaseRun { async postRun(excludeChildRuns = true): Promise { try { - const runtimeEnv = await getRuntimeEnvironment(); + const runtimeEnv = getRuntimeEnvironment(); const runCreate = await this._convertToCreate(this, runtimeEnv, true); await this.client.createRun(runCreate); diff --git a/js/src/schemas.ts b/js/src/schemas.ts index 7dc9562d8..1e899bc2c 100644 --- a/js/src/schemas.ts +++ b/js/src/schemas.ts @@ -126,6 +126,12 @@ export interface BaseRun { * - 20230915T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155650Zc8d9f4c5-6c5a-4b2d-9b1c-3d9d7a7c5c7c */ dotted_order?: string; + + /** + * Attachments associated with the run. + * Each entry is a tuple of [mime_type, bytes] + */ + attachments?: Record; } type S3URL = { diff --git a/js/src/tests/batch_client.int.test.ts b/js/src/tests/batch_client.int.test.ts index 4705fae3c..a91cfc6b0 100644 --- a/js/src/tests/batch_client.int.test.ts +++ b/js/src/tests/batch_client.int.test.ts @@ -1,6 +1,10 @@ +import { v4 as uuidv4 } from "uuid"; +import * as fs from "node:fs"; +import * as path from "node:path"; +import { fileURLToPath } from "node:url"; + import { Client } from "../client.js"; import { RunTree, convertToDottedOrderFormat } from "../run_trees.js"; -import { v4 as uuidv4 } from "uuid"; import { deleteProject, waitUntilProjectFound, @@ -58,7 +62,7 @@ test.concurrent( const langchainClient = new Client({ autoBatchTracing: true, callerOptions: { maxRetries: 2 }, - pendingAutoBatchedRunLimit: 2, + batchSizeBytesLimit: 1, timeout_ms: 30_000, }); const projectName = @@ -185,3 +189,55 @@ test.concurrent( }, 180_000 ); + +test.concurrent( + "Test persist run with attachment", + async () => { + const langchainClient = new Client({ + autoBatchTracing: true, + callerOptions: { maxRetries: 2 }, + timeout_ms: 30_000, + }); + const projectName = "__test_create_attachment" + uuidv4().substring(0, 4); + await deleteProject(langchainClient, projectName); + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + const pathname = path.join( + path.dirname(fileURLToPath(import.meta.url)), + "test_data", + "parrot-icon.png" + ); + await langchainClient.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + attachments: { + testimage: ["image/png", fs.readFileSync(pathname)], + }, + }); + + await langchainClient.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + }); + + await Promise.all([ + waitUntilRunFound(langchainClient, runId, true), + waitUntilProjectFound(langchainClient, projectName), + ]); + + const storedRun = await langchainClient.readRun(runId); + expect(storedRun.id).toEqual(runId); + await langchainClient.deleteProject({ projectName }); + }, + 180_000 +); diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts index c9f66a486..1fea3f778 100644 --- a/js/src/tests/batch_client.test.ts +++ b/js/src/tests/batch_client.test.ts @@ -1,706 +1,904 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable prefer-const */ import { jest } from "@jest/globals"; import { v4 as uuidv4 } from "uuid"; -import { Client } from "../client.js"; +import { Client, mergeRuntimeEnvIntoRunCreate } from "../client.js"; import { convertToDottedOrderFormat } from "../run_trees.js"; import { _getFetchImplementation } from "../singletons/fetch.js"; +import { RunCreate } from "../schemas.js"; + +const parseMockRequestBody = async (body: string | FormData) => { + if (typeof body === "string") { + return JSON.parse(body); + } + // Typing is missing + const entries: any[] = Array.from((body as any).entries()); + const reconstructedBody: any = { + post: [], + patch: [], + }; + for (const [key, value] of entries) { + let [method, id, type] = key.split("."); + const text = await value.text(); + let parsedValue; + try { + parsedValue = JSON.parse(text); + } catch (e) { + parsedValue = text; + } + // if (method === "attachment") { + // for (const item of reconstructedBody.post) { + // if (item.id === id) { + // if (item.attachments === undefined) { + // item.attachments = []; + // } + + // item[type] = parsedValue; + // } + // } + // return; + // } + if (!(method in reconstructedBody)) { + throw new Error(`${method} must be "post" or "patch"`); + } + if (!type) { + reconstructedBody[method as keyof typeof reconstructedBody].push( + parsedValue + ); + } else { + for (const item of reconstructedBody[method]) { + if (item.id === id) { + item[type] = parsedValue; + } + } + } + } + return reconstructedBody; +}; + +// prettier-ignore +const ENDPOINT_TYPES = [ + "batch", + "multipart", +]; + +describe.each(ENDPOINT_TYPES)( + "Batch client tracing with %s endpoint", + (endpointType) => { + const extraBatchIngestConfig = + endpointType === "batch" + ? {} + : { + use_multipart_endpoint: true, + }; + const expectedTraceURL = + endpointType === "batch" + ? "https://api.smith.langchain.com/runs/batch" + : "https://api.smith.langchain.com/runs/multipart"; + it("should create a batched run with the given input", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); -describe("Batch client tracing", () => { - it("should create a batched run with the given input", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + await new Promise((resolve) => setTimeout(resolve, 300)); - await new Promise((resolve) => setTimeout(resolve, 300)); + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ + expect(callSpy).toHaveBeenCalledWith( + _getFetchImplementation(), + expectedTraceURL, expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world", - }, - trace_id: runId, - dotted_order: dottedOrder, - }), - ], - patch: [], + body: expect.any(endpointType === "batch" ? String : FormData), + }) + ); }); - expect(callSpy).toHaveBeenCalledWith( - _getFetchImplementation(), - "https://api.smith.langchain.com/runs/batch", - expect.objectContaining({ body: expect.any(String) }) - ); - }); - - it("should not throw an error if fetch fails for batch requests", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - jest.spyOn((client as any).caller, "call").mockImplementation(() => { - throw new Error("Totally expected mock error"); - }); - jest - .spyOn((client as any).batchIngestCaller, "call") - .mockImplementation(() => { + it("should not throw an error if fetch fails for batch requests", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + jest.spyOn((client as any).caller, "call").mockImplementation(() => { throw new Error("Totally expected mock error"); }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, + jest + .spyOn((client as any).batchIngestCaller, "call") + .mockImplementation(() => { + throw new Error("Totally expected mock error"); + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); }); - await new Promise((resolve) => setTimeout(resolve, 300)); - }); + it("Create + update batching should merge into a single call", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - it("Create + update batching should merge into a single call", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + const endTime = Math.floor(new Date().getTime() / 1000); - const endTime = Math.floor(new Date().getTime() / 1000); + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); - await client.updateRun(runId, { - outputs: { output: ["Hi"] }, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - }); + await new Promise((resolve) => setTimeout(resolve, 100)); - await new Promise((resolve) => setTimeout(resolve, 100)); + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + outputs: { + output: ["Hi"], + }, + end_time: endTime, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ + expect(callSpy).toHaveBeenCalledWith( + _getFetchImplementation(), + expectedTraceURL, expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world", - }, - outputs: { - output: ["Hi"], - }, - end_time: endTime, - trace_id: runId, - dotted_order: dottedOrder, - }), - ], - patch: [], + body: expect.any(endpointType === "batch" ? String : FormData), + }) + ); }); - expect(callSpy).toHaveBeenCalledWith( - _getFetchImplementation(), - "https://api.smith.langchain.com/runs/batch", - expect.objectContaining({ body: expect.any(String) }) - ); - }); - - it("should immediately trigger a batch on root run end", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + it("server info fetch should retry even if initial call fails", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + let serverInfoFailedOnce = false; + jest.spyOn(client as any, "_getServerInfo").mockImplementationOnce(() => { + serverInfoFailedOnce = true; + throw new Error("[MOCK] Connection error."); + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - // Wait for first batch to send - await new Promise((resolve) => setTimeout(resolve, 300)); + const endTime = Math.floor(new Date().getTime() / 1000); - const endTime = Math.floor(new Date().getTime() / 1000); + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); - // A root run finishing triggers the second batch - await client.updateRun(runId, { - outputs: { output: ["Hi"] }, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - }); + await new Promise((resolve) => setTimeout(resolve, 100)); - const runId2 = uuidv4(); - const dottedOrder2 = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId2 - ); - - // Will send in a third batch, even though it's triggered around the same time as the update - await client.createRun({ - id: runId2, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world 2" }, - trace_id: runId2, - dotted_order: dottedOrder2, - }); + expect(serverInfoFailedOnce).toBe(true); - await new Promise((resolve) => setTimeout(resolve, 300)); + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + outputs: { + output: ["Hi"], + }, + end_time: endTime, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - const calledRequestParam2: any = callSpy.mock.calls[1][2]; - const calledRequestParam3: any = callSpy.mock.calls[2][2]; - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ + expect(callSpy).toHaveBeenCalledWith( + _getFetchImplementation(), + expectedTraceURL, expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world", - }, - trace_id: runId, - dotted_order: dottedOrder, - }), - ], - patch: [], + body: expect.any(endpointType === "batch" ? String : FormData), + }) + ); }); - expect(JSON.parse(calledRequestParam2?.body)).toEqual({ - post: [], - patch: [ - expect.objectContaining({ - id: runId, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - outputs: { - output: ["Hi"], - }, - }), - ], - }); - expect(JSON.parse(calledRequestParam3?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runId2, - run_type: "llm", - inputs: { - text: "hello world 2", - }, - trace_id: runId2, - dotted_order: dottedOrder2, - }), - ], - patch: [], - }); - }); + it("should immediately trigger a batch on root run end", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - it("should not trigger a batch on root run end and instead batch call with previous batch if blockOnRootRunFinalization is false", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - blockOnRootRunFinalization: false, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + // Wait for first batch to send + await new Promise((resolve) => setTimeout(resolve, 300)); - expect((client as any).autoBatchQueue.size).toBe(1); - // Wait for first batch to send - await new Promise((resolve) => setTimeout(resolve, 300)); - expect((client as any).autoBatchQueue.size).toBe(0); + const endTime = Math.floor(new Date().getTime() / 1000); - const endTime = Math.floor(new Date().getTime() / 1000); + // A root run finishing triggers the second batch + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); - // Start the the second batch - await client.updateRun(runId, { - outputs: { output: ["Hi"] }, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - }); + const runId2 = uuidv4(); + const dottedOrder2 = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId2 + ); + + // Will send in a third batch, even though it's triggered around the same time as the update + await client.createRun({ + id: runId2, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world 2" }, + trace_id: runId2, + dotted_order: dottedOrder2, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + const calledRequestParam3: any = callSpy.mock.calls[2][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - const runId2 = uuidv4(); - const dottedOrder2 = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId2 - ); - - // Should aggregate on the second batch - await client.createRun({ - id: runId2, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world 2" }, - trace_id: runId2, - dotted_order: dottedOrder2, + expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({ + post: [], + patch: [ + expect.objectContaining({ + id: runId, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + outputs: { + output: ["Hi"], + }, + }), + ], + }); + expect(await parseMockRequestBody(calledRequestParam3?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId2, + run_type: "llm", + inputs: { + text: "hello world 2", + }, + trace_id: runId2, + dotted_order: dottedOrder2, + }), + ], + patch: [], + }); }); - // 2 runs in the queue - expect((client as any).autoBatchQueue.size).toBe(2); - await client.awaitPendingTraceBatches(); - expect((client as any).autoBatchQueue.size).toBe(0); + it("should not trigger a batch on root run end and instead batch call with previous batch if blockOnRootRunFinalization is false", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + blockOnRootRunFinalization: false, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - expect(callSpy.mock.calls.length).toEqual(2); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - const calledRequestParam2: any = callSpy.mock.calls[1][2]; - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world", - }, - trace_id: runId, - dotted_order: dottedOrder, - }), - ], - patch: [], - }); + expect((client as any).autoBatchQueue.items.length).toBe(1); + // Wait for first batch to send + await new Promise((resolve) => setTimeout(resolve, 300)); + expect((client as any).autoBatchQueue.items.length).toBe(0); - expect(JSON.parse(calledRequestParam2?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runId2, - run_type: "llm", - inputs: { - text: "hello world 2", - }, - trace_id: runId2, - dotted_order: dottedOrder2, - }), - ], - patch: [ - expect.objectContaining({ - id: runId, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - outputs: { - output: ["Hi"], - }, - }), - ], - }); - }); + const endTime = Math.floor(new Date().getTime() / 1000); - it("should send traces above the batch size and see even batches", async () => { - const client = new Client({ - apiKey: "test-api-key", - pendingAutoBatchedRunLimit: 10, - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - - const runIds = await Promise.all( - [...Array(15)].map(async (_, i) => { - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run " + i, - run_type: "llm", - inputs: { text: "hello world " + i }, - trace_id: runId, - dotted_order: dottedOrder, - }); - return runId; - }) - ); + // Start the the second batch + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); - await new Promise((resolve) => setTimeout(resolve, 10)); + const runId2 = uuidv4(); + const dottedOrder2 = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId2 + ); + + // Should aggregate on the second batch + await client.createRun({ + id: runId2, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world 2" }, + trace_id: runId2, + dotted_order: dottedOrder2, + }); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - const calledRequestParam2: any = callSpy.mock.calls[1][2]; + // 2 runs in the queue + expect((client as any).autoBatchQueue.items.length).toBe(2); + await client.awaitPendingTraceBatches(); + expect((client as any).autoBatchQueue.items.length).toBe(0); + + expect(callSpy.mock.calls.length).toEqual(2); + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - // Queue should drain as soon as size limit is reached, - // sending both batches - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: runIds.slice(0, 10).map((runId, i) => - expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world " + i, - }, - trace_id: runId, - }) - ), - patch: [], + expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId2, + run_type: "llm", + inputs: { + text: "hello world 2", + }, + trace_id: runId2, + dotted_order: dottedOrder2, + }), + ], + patch: [ + expect.objectContaining({ + id: runId, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + outputs: { + output: ["Hi"], + }, + }), + ], + }); }); - expect(JSON.parse(calledRequestParam2?.body)).toEqual({ - post: runIds.slice(10).map((runId, i) => - expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - text: "hello world " + (i + 10), - }, - trace_id: runId, + it("should send traces above the batch size and see even batches", async () => { + const client = new Client({ + apiKey: "test-api-key", + batchSizeBytesLimit: 10000, + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runIds = await Promise.all( + [...Array(15)].map(async (_, i) => { + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + const params = mergeRuntimeEnvIntoRunCreate({ + id: runId, + project_name: projectName, + name: "test_run " + i, + run_type: "llm", + inputs: { text: "hello world " + i }, + trace_id: runId, + dotted_order: dottedOrder, + } as RunCreate); + // Allow some extra space for other request properties + const mockRunSize = 950; + const padCount = mockRunSize - JSON.stringify(params).length; + params.inputs.text = params.inputs.text + "x".repeat(padCount); + await client.createRun(params); + return runId; }) - ), - patch: [], - }); - }); + ); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + + // Queue should drain as soon as size limit is reached, + // sending both batches + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: runIds.slice(0, 10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: expect.stringContaining("hello world " + i), + }, + trace_id: runId, + }) + ), + patch: [], + }); - it("should send traces above the batch size limit in bytes and see even batches", async () => { - const client = new Client({ - apiKey: "test-api-key", - pendingAutoBatchedRunLimit: 10, - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest.spyOn(client as any, "_getServerInfo").mockResolvedValue({ - batch_ingest_config: { - size_limit_bytes: 1, - }, + expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({ + post: runIds.slice(10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: expect.stringContaining("hello world " + (i + 10)), + }, + trace_id: runId, + }) + ), + patch: [], + }); }); - const projectName = "__test_batch"; - - const runIds = await Promise.all( - [...Array(4)].map(async (_, i) => { - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run " + i, - run_type: "llm", - inputs: { text: "hello world " + i }, - trace_id: runId, - dotted_order: dottedOrder, - }); - return runId; - }) - ); - - await new Promise((resolve) => setTimeout(resolve, 300)); - - expect(callSpy.mock.calls.length).toEqual(4); - - const calledRequestParam: any = callSpy.mock.calls[0][2]; - const calledRequestParam2: any = callSpy.mock.calls[1][2]; - const calledRequestParam3: any = callSpy.mock.calls[2][2]; - const calledRequestParam4: any = callSpy.mock.calls[3][2]; - // Queue should drain as soon as byte size limit of 1 is reached, - // sending each call individually - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runIds[0], - run_type: "llm", - inputs: { - text: "hello world 0", + it("a very low batch size limit should be equivalent to single calls", async () => { + const client = new Client({ + apiKey: "test-api-key", + batchSizeBytesLimit: 1, + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { + ...extraBatchIngestConfig, }, - trace_id: runIds[0], - }), - ], - patch: [], - }); + }; + }); + const projectName = "__test_batch"; + + const runIds = await Promise.all( + [...Array(4)].map(async (_, i) => { + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run " + i, + run_type: "llm", + inputs: { text: "hello world " + i }, + trace_id: runId, + dotted_order: dottedOrder, + }); + return runId; + }) + ); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(callSpy.mock.calls.length).toEqual(4); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + const calledRequestParam3: any = callSpy.mock.calls[2][2]; + const calledRequestParam4: any = callSpy.mock.calls[3][2]; + + // Queue should drain as soon as byte size limit of 1 is reached, + // sending each call individually + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runIds[0], + run_type: "llm", + inputs: { + text: "hello world 0", + }, + trace_id: runIds[0], + }), + ], + patch: [], + }); - expect(JSON.parse(calledRequestParam2?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runIds[1], - run_type: "llm", - inputs: { - text: "hello world 1", - }, - trace_id: runIds[1], - }), - ], - patch: [], - }); + expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runIds[1], + run_type: "llm", + inputs: { + text: "hello world 1", + }, + trace_id: runIds[1], + }), + ], + patch: [], + }); - expect(JSON.parse(calledRequestParam3?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runIds[2], - run_type: "llm", - inputs: { - text: "hello world 2", - }, - trace_id: runIds[2], - }), - ], - patch: [], - }); + expect(await parseMockRequestBody(calledRequestParam3?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runIds[2], + run_type: "llm", + inputs: { + text: "hello world 2", + }, + trace_id: runIds[2], + }), + ], + patch: [], + }); - expect(JSON.parse(calledRequestParam4?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runIds[3], - run_type: "llm", - inputs: { - text: "hello world 3", - }, - trace_id: runIds[3], - }), - ], - patch: [], + expect(await parseMockRequestBody(calledRequestParam4?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runIds[3], + run_type: "llm", + inputs: { + text: "hello world 3", + }, + trace_id: runIds[3], + }), + ], + patch: [], + }); }); - }); - it("If batching is unsupported, fall back to old endpoint", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).caller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(false); - const projectName = "__test_batch"; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + it("If batching is unsupported, fall back to old endpoint", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return {}; + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - await new Promise((resolve) => setTimeout(resolve, 300)); - - const calledRequestParam: any = callSpy.mock.calls[0][2]; - expect(JSON.parse(calledRequestParam?.body)).toMatchObject({ - id: runId, - session_name: projectName, - extra: expect.anything(), - start_time: expect.any(Number), - name: "test_run", - run_type: "llm", - inputs: { text: "hello world" }, - trace_id: runId, - dotted_order: dottedOrder, - }); + await new Promise((resolve) => setTimeout(resolve, 300)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect( + await parseMockRequestBody(calledRequestParam?.body) + ).toMatchObject({ + id: runId, + session_name: projectName, + extra: expect.anything(), + start_time: expect.any(Number), + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); - expect(callSpy).toHaveBeenCalledWith( - _getFetchImplementation(), - "https://api.smith.langchain.com/runs", - expect.objectContaining({ body: expect.any(String) }) - ); - }); - - it("Should handle circular values", async () => { - const client = new Client({ - apiKey: "test-api-key", - autoBatchTracing: true, - }); - const callSpy = jest - .spyOn((client as any).batchIngestCaller, "call") - .mockResolvedValue({ - ok: true, - text: () => "", - }); - jest - .spyOn(client as any, "batchEndpointIsSupported") - .mockResolvedValue(true); - const projectName = "__test_batch"; - const a: Record = {}; - const b: Record = {}; - a.b = b; - b.a = a; - - const runId = uuidv4(); - const dottedOrder = convertToDottedOrderFormat( - new Date().getTime() / 1000, - runId - ); - await client.createRun({ - id: runId, - project_name: projectName, - name: "test_run", - run_type: "llm", - inputs: a, - trace_id: runId, - dotted_order: dottedOrder, + expect(callSpy).toHaveBeenCalledWith( + _getFetchImplementation(), + "https://api.smith.langchain.com/runs", + expect.objectContaining({ + body: expect.any(String), + }) + ); }); - const endTime = Math.floor(new Date().getTime() / 1000); + it("Should handle circular values", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + const a: Record = {}; + const b: Record = {}; + a.b = b; + b.a = a; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: a, + trace_id: runId, + dotted_order: dottedOrder, + }); - await client.updateRun(runId, { - outputs: b, - dotted_order: dottedOrder, - trace_id: runId, - end_time: endTime, - }); + const endTime = Math.floor(new Date().getTime() / 1000); - await new Promise((resolve) => setTimeout(resolve, 100)); + await client.updateRun(runId, { + outputs: b, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); - const calledRequestParam: any = callSpy.mock.calls[0][2]; - expect(JSON.parse(calledRequestParam?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runId, - run_type: "llm", - inputs: { - b: { - a: { - result: "[Circular]", + await new Promise((resolve) => setTimeout(resolve, 100)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + b: { + a: { + result: "[Circular]", + }, }, }, - }, - outputs: { - a: { - result: "[Circular]", + outputs: { + a: + // Stringification happens at a different level + endpointType === "batch" + ? { + result: "[Circular]", + } + : { + b: { + result: "[Circular]", + }, + }, }, - }, - end_time: endTime, - trace_id: runId, - dotted_order: dottedOrder, - }), - ], - patch: [], - }); + end_time: endTime, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); - expect(callSpy).toHaveBeenCalledWith( - _getFetchImplementation(), - "https://api.smith.langchain.com/runs/batch", - expect.objectContaining({ body: expect.any(String) }) - ); - }); -}); + expect(callSpy).toHaveBeenCalledWith( + _getFetchImplementation(), + expectedTraceURL, + expect.objectContaining({ + body: expect.any(endpointType === "batch" ? String : FormData), + }) + ); + }); + } +); diff --git a/js/src/tests/test_data/parrot-icon.png b/js/src/tests/test_data/parrot-icon.png new file mode 100644 index 000000000..7fd3de1dc Binary files /dev/null and b/js/src/tests/test_data/parrot-icon.png differ diff --git a/js/src/utils/env.ts b/js/src/utils/env.ts index 535ef2772..e02eae3a8 100644 --- a/js/src/utils/env.ts +++ b/js/src/utils/env.ts @@ -69,7 +69,7 @@ export type RuntimeEnvironment = { let runtimeEnvironment: RuntimeEnvironment | undefined; -export async function getRuntimeEnvironment(): Promise { +export function getRuntimeEnvironment(): RuntimeEnvironment { if (runtimeEnvironment === undefined) { const env = getEnv(); const releaseEnv = getShas();