Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(js): Limit queue batch concurrency, bump timeout, add maximum wait period for serverless envs #1124

Merged
merged 13 commits into from
Oct 25, 2024
10 changes: 5 additions & 5 deletions js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.1.67",
"version": "0.2.0",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
Expand Down Expand Up @@ -109,9 +109,9 @@
"@babel/preset-env": "^7.22.4",
"@faker-js/faker": "^8.4.1",
"@jest/globals": "^29.5.0",
"@langchain/core": "^0.3.1",
"@langchain/langgraph": "^0.2.3",
"@langchain/openai": "^0.3.0",
"@langchain/core": "^0.3.14",
"@langchain/langgraph": "^0.2.18",
"@langchain/openai": "^0.3.11",
"@tsconfig/recommended": "^1.0.2",
"@types/jest": "^29.5.1",
"@typescript-eslint/eslint-plugin": "^5.59.8",
Expand All @@ -126,7 +126,7 @@
"eslint-plugin-no-instanceof": "^1.0.1",
"eslint-plugin-prettier": "^4.2.1",
"jest": "^29.5.0",
"langchain": "^0.3.2",
"langchain": "^0.3.3",
"openai": "^4.67.3",
"prettier": "^2.8.8",
"ts-jest": "^29.1.0",
Expand Down
80 changes: 51 additions & 29 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
isLangChainMessage,
} from "./utils/messages.js";
import {
getEnvironmentVariable,
getLangChainEnvVarsMetadata,
getLangSmithEnvironmentVariable,
getRuntimeEnvironment,
Expand Down Expand Up @@ -74,6 +75,7 @@
autoBatchTracing?: boolean;
batchSizeBytesLimit?: number;
blockOnRootRunFinalization?: boolean;
traceBatchConcurrency?: number;
fetchOptions?: RequestInit;
}

Expand Down Expand Up @@ -417,7 +419,7 @@
// If there is an item on the queue we were unable to pop,
// just return it as a single batch.
if (popped.length === 0 && this.items.length > 0) {
const item = this.items.shift()!;

Check warning on line 422 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Forbidden non-null assertion
popped.push(item);
poppedSizeBytes += item.size;
this.sizeBytes -= item.size;
Expand Down Expand Up @@ -473,7 +475,10 @@

private settings: Promise<LangSmithSettings> | null;

private blockOnRootRunFinalization = true;
private blockOnRootRunFinalization =
getEnvironmentVariable("LANGSMITH_TRACING_BACKGROUND") === "false";

private traceBatchConcurrency = 5;

private _serverInfo: RecordStringAny | undefined;

Expand All @@ -493,9 +498,16 @@
if (this.webUrl?.endsWith("/")) {
this.webUrl = this.webUrl.slice(0, -1);
}
this.timeout_ms = config.timeout_ms ?? 12_000;
this.timeout_ms = config.timeout_ms ?? 90_000;
this.caller = new AsyncCaller(config.callerOptions ?? {});
this.traceBatchConcurrency =
config.traceBatchConcurrency ?? this.traceBatchConcurrency;
if (this.traceBatchConcurrency < 1) {
throw new Error("Trace batch concurrency must be positive.");
}
this.batchIngestCaller = new AsyncCaller({
maxRetries: 2,
maxConcurrency: this.traceBatchConcurrency,
...(config.callerOptions ?? {}),
onFailedResponseHook: handle429,
});
Expand Down Expand Up @@ -753,35 +765,44 @@
}

private async drainAutoBatchQueue() {
while (this.autoBatchQueue.items.length >= 0) {
const [batch, done] = this.autoBatchQueue.pop(
await this._getBatchSizeLimitBytes()
);
if (!batch.length) {
done();
return;
}
try {
const ingestParams = {
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
};
const serverInfo = await this._ensureServerInfo();
if (serverInfo?.batch_ingest_config?.use_multipart_endpoint) {
await this.multipartIngestRuns(ingestParams);
} else {
await this.batchIngestRuns(ingestParams);
const batchSizeLimit = await this._getBatchSizeLimitBytes();
while (this.autoBatchQueue.items.length > 0) {
for (let i = 0; i < this.traceBatchConcurrency; i++) {
const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
if (!batch.length) {
done();
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved
break;
}
} finally {
done();
await this.processBatch(batch, done);
}
}
}

private async processBatch(batch: AutoBatchQueueItem[], done: () => void) {
if (!batch.length) {
done();
return;
}
try {
const ingestParams = {
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
};
const serverInfo = await this._ensureServerInfo();
if (serverInfo?.batch_ingest_config?.use_multipart_endpoint) {
await this.multipartIngestRuns(ingestParams);
} else {
await this.batchIngestRuns(ingestParams);
}
} finally {
done();
}
}

private async processRunOperation(
item: AutoBatchQueueItem,
immediatelyTriggerBatch?: boolean
Expand Down Expand Up @@ -1541,7 +1562,7 @@
treeFilter?: string;
isRoot?: boolean;
dataSourceType?: string;
}): Promise<any> {

Check warning on line 1565 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
let projectIds_ = projectIds || [];
if (projectNames) {
projectIds_ = [
Expand Down Expand Up @@ -1829,7 +1850,7 @@
`Failed to list shared examples: ${response.status} ${response.statusText}`
);
}
return result.map((example: any) => ({

Check warning on line 1853 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
...example,
_hostUrl: this.getHostUrl(),
}));
Expand Down Expand Up @@ -2972,7 +2993,7 @@
}

const feedbackResult = await evaluator.evaluateRun(run_, referenceExample);
const [_, feedbacks] = await this._logEvaluationFeedback(

Check warning on line 2996 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'_' is assigned a value but never used
feedbackResult,
run_,
sourceInfo
Expand Down Expand Up @@ -3306,7 +3327,7 @@
async _logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3330 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<[results: EvaluationResult[], feedbacks: Feedback[]]> {
const evalResults: Array<EvaluationResult> =
this._selectEvalResults(evaluatorResponse);
Expand Down Expand Up @@ -3345,7 +3366,7 @@
public async logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3369 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<EvaluationResult[]> {
const [results] = await this._logEvaluationFeedback(
evaluatorResponse,
Expand Down Expand Up @@ -3613,7 +3634,7 @@
promptIdentifier: string,
like: boolean
): Promise<LikePromptResponse> {
const [owner, promptName, _] = parsePromptIdentifier(promptIdentifier);

Check warning on line 3637 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'_' is assigned a value but never used
const response = await this.caller.call(
_getFetchImplementation(),
`${this.apiUrl}/likes/${owner}/${promptName}`,
Expand Down Expand Up @@ -3718,7 +3739,7 @@
}

public async getPrompt(promptIdentifier: string): Promise<Prompt | null> {
const [owner, promptName, _] = parsePromptIdentifier(promptIdentifier);

Check warning on line 3742 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'_' is assigned a value but never used
const response = await this.caller.call(
_getFetchImplementation(),
`${this.apiUrl}/repos/${owner}/${promptName}`,
Expand Down Expand Up @@ -3762,7 +3783,7 @@
);
}

const [owner, promptName, _] = parsePromptIdentifier(promptIdentifier);

Check warning on line 3786 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'_' is assigned a value but never used
if (!(await this._currentTenantIsOwner(owner))) {
throw await this._ownerConflictError("create a prompt", owner);
}
Expand Down Expand Up @@ -3795,7 +3816,7 @@

public async createCommit(
promptIdentifier: string,
object: any,

Check warning on line 3819 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
options?: {
parentCommitHash?: string;
}
Expand Down Expand Up @@ -4152,8 +4173,9 @@
* @returns A promise that resolves once all currently pending traces have sent.
*/
public awaitPendingTraceBatches() {
return Promise.all(
this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise)
);
return Promise.all([
...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise),
this.batchIngestCaller.queue.onIdle(),
]);
}
}
2 changes: 1 addition & 1 deletion js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
export { overrideFetchImplementation } from "./singletons/fetch.js";

// Update using yarn bump-version
export const __version__ = "0.1.67";
export const __version__ = "0.2.0";
41 changes: 41 additions & 0 deletions js/src/tests/batch_client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
waitUntilProjectFound,
waitUntilRunFound,
} from "./utils.js";
import { traceable } from "../traceable.js";

test.concurrent(
"Test persist update run",
Expand Down Expand Up @@ -241,3 +242,43 @@ test.concurrent(
},
180_000
);

test.skip("very large runs", async () => {
const langchainClient = new Client({
autoBatchTracing: true,
timeout_ms: 120_000,
});

const projectName = "__test_large_runs" + uuidv4().substring(0, 4);
await deleteProject(langchainClient, projectName);

console.time("largeRunTimer");

const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(
traceable(
async () => {
return "x".repeat(9000000);
},
{
project_name: projectName,
client: langchainClient,
tracingEnabled: true,
}
)()
);
}

await Promise.all(promises);

console.timeLog("largeRunTimer");

await langchainClient.awaitPendingTraceBatches();

console.timeLog("largeRunTimer");

await Promise.all([waitUntilProjectFound(langchainClient, projectName)]);

await langchainClient.deleteProject({ projectName });
}, 180_000);
7 changes: 4 additions & 3 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ describe.each(ENDPOINT_TYPES)(
await new Promise((resolve) => setTimeout(resolve, 300));
});

it("Create + update batching should merge into a single call", async () => {
it.only("Create + update batching should merge into a single call", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
Expand Down Expand Up @@ -219,7 +219,7 @@ describe.each(ENDPOINT_TYPES)(
end_time: endTime,
});

await new Promise((resolve) => setTimeout(resolve, 100));
await client.awaitPendingTraceBatches();

const calledRequestParam: any = callSpy.mock.calls[0][2];
expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({
Expand Down Expand Up @@ -331,10 +331,11 @@ describe.each(ENDPOINT_TYPES)(
);
});

it("should immediately trigger a batch on root run end", async () => {
it("should immediately trigger a batch on root run end if blockOnRootRunFinalization is set", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
blockOnRootRunFinalization: true,
});
const callSpy = jest
.spyOn((client as any).batchIngestCaller, "call")
Expand Down
16 changes: 14 additions & 2 deletions js/src/tests/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-process-env */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { jest } from "@jest/globals";
import { Client } from "../client.js";
Expand All @@ -14,14 +15,24 @@ describe.each([[""], ["mocked"]])("Client uses %s fetch", (description) => {
globalFetchMock = jest.fn(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({}),
json: () =>
Promise.resolve({
batch_ingest_config: {
use_multipart_endpoint: true,
},
}),
text: () => Promise.resolve(""),
})
);
overriddenFetch = jest.fn(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({}),
json: () =>
Promise.resolve({
batch_ingest_config: {
use_multipart_endpoint: true,
},
}),
text: () => Promise.resolve(""),
})
);
Expand Down Expand Up @@ -78,6 +89,7 @@ describe.each([[""], ["mocked"]])("Client uses %s fetch", (description) => {
});

test("basic traceable implementation", async () => {
process.env.LANGSMITH_TRACING_BACKGROUND = "false";
const llm = traceable(
async function* llm(input: string) {
const response = input.repeat(2).split("");
Expand Down
13 changes: 9 additions & 4 deletions js/src/tests/traceable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ test("404s should only log, not throw an error", async () => {
overrideFetchImplementation(overriddenFetch);
const client = new Client({
apiUrl: "https://foobar.notreal",
autoBatchTracing: false,
});
const llm = traceable(
async function* llm(input: string) {
Expand Down Expand Up @@ -1111,8 +1112,12 @@ test("argsConfigPath", async () => {

test("traceable continues execution when client throws error", async () => {
const errorClient = {
createRun: jest.fn().mockRejectedValue(new Error("Client error") as never),
updateRun: jest.fn().mockRejectedValue(new Error("Client error") as never),
createRun: jest
.fn()
.mockRejectedValue(new Error("Expected test client error") as never),
updateRun: jest
.fn()
.mockRejectedValue(new Error("Expected test client error") as never),
};

const tracedFunction = traceable(
Expand Down Expand Up @@ -1214,7 +1219,7 @@ test("traceable with processInputs throwing error does not affect invocation", a
const { client, callSpy } = mockClient();

const processInputs = jest.fn((_inputs: Readonly<KVMap>) => {
throw new Error("processInputs error");
throw new Error("totally expected test processInputs error");
});

const func = traceable(
Expand Down Expand Up @@ -1250,7 +1255,7 @@ test("traceable with processOutputs throwing error does not affect invocation",
const { client, callSpy } = mockClient();

const processOutputs = jest.fn((_outputs: Readonly<KVMap>) => {
throw new Error("processOutputs error");
throw new Error("totally expected test processInputs error");
});

const func = traceable(
Expand Down
2 changes: 1 addition & 1 deletion js/src/utils/async_caller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class AsyncCaller {

protected maxRetries: AsyncCallerParams["maxRetries"];

private queue: typeof import("p-queue")["default"]["prototype"];
queue: typeof import("p-queue")["default"]["prototype"];

private onFailedResponseHook?: ResponseCallback;

Expand Down
Loading
Loading