Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(js): Compress payloads above a certain size limit #1113

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
hideOutputs?: boolean | ((outputs: KVMap) => KVMap);
autoBatchTracing?: boolean;
batchSizeBytesLimit?: number;
tracePayloadByteCompressionLimit?: number;
blockOnRootRunFinalization?: boolean;
traceBatchConcurrency?: number;
fetchOptions?: RequestInit;
Expand Down Expand Up @@ -364,6 +365,46 @@
return false;
};

const _compressPayload = async (
payload: string | Uint8Array,
contentType: string
) => {
const compressedPayloadStream = new Blob([payload])
.stream()
.pipeThrough(new CompressionStream("gzip"));
const reader = compressedPayloadStream.getReader();
const chunks = [];
let totalLength = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
totalLength += value.length;
}
return new Blob(chunks, {
type: `${contentType}; length=${totalLength}; encoding=gzip`,
});
};

const _preparePayload = async (
payload: any,

Check warning on line 391 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
contentType: string,
compressionThreshold: number
) => {
let finalPayload = payload;
// eslint-disable-next-line no-instanceof/no-instanceof
if (!(payload instanceof Uint8Array)) {
finalPayload = stringifyForTracing(payload);
}
if (finalPayload.length < compressionThreshold) {
return new Blob([finalPayload], {
type: `${contentType}; length=${finalPayload.length}`,
});
}
return _compressPayload(finalPayload, contentType);
};

export class AutoBatchQueue {
items: {
action: "create" | "update";
Expand Down Expand Up @@ -420,7 +461,7 @@
// If there is an item on the queue we were unable to pop,
// just return it as a single batch.
if (popped.length === 0 && this.items.length > 0) {
const item = this.items.shift()!;

Check warning on line 464 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Forbidden non-null assertion
popped.push(item);
poppedSizeBytes += item.size;
this.sizeBytes -= item.size;
Expand All @@ -435,6 +476,8 @@
// 20 MB
export const DEFAULT_BATCH_SIZE_LIMIT_BYTES = 20_971_520;

const DEFAULT_MAX_UNCOMPRESSED_PAYLOAD_LIMIT = 10 * 1024;

const SERVER_INFO_REQUEST_TIMEOUT = 1000;

export class Client {
Expand Down Expand Up @@ -468,6 +511,9 @@

private autoBatchAggregationDelayMs = 250;

private tracePayloadByteCompressionLimit =
DEFAULT_MAX_UNCOMPRESSED_PAYLOAD_LIMIT;

private batchSizeBytesLimit?: number;

private fetchOptions: RequestInit;
Expand Down Expand Up @@ -520,6 +566,9 @@
this.blockOnRootRunFinalization =
config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization;
this.batchSizeBytesLimit = config.batchSizeBytesLimit;
this.tracePayloadByteCompressionLimit =
config.tracePayloadByteCompressionLimit ??
this.tracePayloadByteCompressionLimit;
this.fetchOptions = config.fetchOptions || {};
}

Expand Down Expand Up @@ -836,7 +885,7 @@
if (this._serverInfo === undefined) {
try {
this._serverInfo = await this._getServerInfo();
} catch (e) {

Check warning on line 888 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
console.warn(
`[WARNING]: LangSmith failed to fetch info on supported operations. Falling back to single calls and default limits.`
);
Expand Down Expand Up @@ -1038,6 +1087,7 @@
delete preparedCreate.attachments;
preparedCreateParams.push(preparedCreate);
}

let preparedUpdateParams = [];
for (const update of runUpdates ?? []) {
preparedUpdateParams.push(this.prepareRunCreateOrUpdateInputs(update));
Expand Down Expand Up @@ -1109,24 +1159,26 @@
originalPayload;
const fields = { inputs, outputs, events };
// encode the main run payload
const stringifiedPayload = stringifyForTracing(payload);
accumulatedParts.push({
name: `${method}.${payload.id}`,
payload: new Blob([stringifiedPayload], {
type: `application/json; length=${stringifiedPayload.length}`, // encoding=gzip
}),
payload: await _preparePayload(
payload,
"application/json",
this.tracePayloadByteCompressionLimit
),
});
// encode the fields we collected
for (const [key, value] of Object.entries(fields)) {
if (value === undefined) {
continue;
}
const stringifiedValue = stringifyForTracing(value);
accumulatedParts.push({
name: `${method}.${payload.id}.${key}`,
payload: new Blob([stringifiedValue], {
type: `application/json; length=${stringifiedValue.length}`,
}),
payload: await _preparePayload(
value,
"application/json",
this.tracePayloadByteCompressionLimit
),
});
}
// encode the attachments
Expand All @@ -1147,9 +1199,11 @@
}
accumulatedParts.push({
name: `attachment.${payload.id}.${name}`,
payload: new Blob([content], {
type: `${contentType}; length=${content.byteLength}`,
}),
payload: await _preparePayload(
content,
contentType,
this.tracePayloadByteCompressionLimit
),
});
}
}
Expand All @@ -1170,8 +1224,7 @@
for (const part of parts) {
formData.append(part.name, part.payload);
}
// Log the form data
await this.batchIngestCaller.call(
const res = await this.batchIngestCaller.call(
_getFetchImplementation(),
`${this.apiUrl}/runs/multipart`,
{
Expand All @@ -1184,15 +1237,10 @@
...this.fetchOptions,
}
);
} catch (e) {
let errorMessage = "Failed to multipart ingest runs";
// eslint-disable-next-line no-instanceof/no-instanceof
if (e instanceof Error) {
errorMessage += `: ${e.stack || e.message}`;
} else {
errorMessage += `: ${String(e)}`;
}
console.warn(`${errorMessage.trim()}\n\nContext: ${context}`);
await raiseForStatus(res, "ingest multipart runs", true);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
console.warn(`${e.message.trim()}\n\nContext: ${context}`);
}
}

Expand Down Expand Up @@ -1552,7 +1600,7 @@
treeFilter?: string;
isRoot?: boolean;
dataSourceType?: string;
}): Promise<any> {

Check warning on line 1603 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
let projectIds_ = projectIds || [];
if (projectNames) {
projectIds_ = [
Expand Down Expand Up @@ -1840,7 +1888,7 @@
`Failed to list shared examples: ${response.status} ${response.statusText}`
);
}
return result.map((example: any) => ({

Check warning on line 1891 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
...example,
_hostUrl: this.getHostUrl(),
}));
Expand Down Expand Up @@ -1977,7 +2025,7 @@
}
// projectId querying
return true;
} catch (e) {

Check warning on line 2028 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
return false;
}
}
Expand Down Expand Up @@ -3317,7 +3365,7 @@
async _logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3368 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<[results: EvaluationResult[], feedbacks: Feedback[]]> {
const evalResults: Array<EvaluationResult> =
this._selectEvalResults(evaluatorResponse);
Expand Down Expand Up @@ -3356,7 +3404,7 @@
public async logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3407 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<EvaluationResult[]> {
const [results] = await this._logEvaluationFeedback(
evaluatorResponse,
Expand Down Expand Up @@ -3806,7 +3854,7 @@

public async createCommit(
promptIdentifier: string,
object: any,

Check warning on line 3857 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
options?: {
parentCommitHash?: string;
}
Expand Down Expand Up @@ -3857,7 +3905,7 @@
isPublic?: boolean;
isArchived?: boolean;
}
): Promise<Record<string, any>> {

Check warning on line 3908 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
if (!(await this.promptExists(promptIdentifier))) {
throw new Error("Prompt does not exist, you must create it first.");
}
Expand Down
53 changes: 53 additions & 0 deletions js/src/tests/batch_client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,59 @@ test.concurrent(
180_000
);

test("Test persist run with all items compressed", async () => {
const langchainClient = new Client({
autoBatchTracing: true,
callerOptions: { maxRetries: 2 },
timeout_ms: 30_000,
tracePayloadByteCompressionLimit: 1,
});
const projectName = "__test_compression" + uuidv4().substring(0, 4);
await deleteProject(langchainClient, projectName);

const runId = uuidv4();
const dottedOrder = convertToDottedOrderFormat(
new Date().getTime() / 1000,
runId
);
const pathname = path.join(
path.dirname(fileURLToPath(import.meta.url)),
"test_data",
"parrot-icon.png"
);
await langchainClient.createRun({
id: runId,
project_name: projectName,
name: "test_run",
run_type: "llm",
inputs: { text: "hello world" },
trace_id: runId,
dotted_order: dottedOrder,
attachments: {
testimage: ["image/png", new Uint8Array(fs.readFileSync(pathname))],
},
});

await new Promise((resolve) => setTimeout(resolve, 1000));

await langchainClient.updateRun(runId, {
outputs: { output: ["Hi"] },
dotted_order: dottedOrder,
trace_id: runId,
end_time: Math.floor(new Date().getTime() / 1000),
});

await Promise.all([
waitUntilRunFound(langchainClient, runId, true),
waitUntilProjectFound(langchainClient, projectName),
]);

const storedRun = await langchainClient.readRun(runId);
expect(storedRun.id).toEqual(runId);
expect(storedRun.status).toEqual("success");
// await langchainClient.deleteProject({ projectName });
}, 180_000);

test.skip("very large runs", async () => {
const langchainClient = new Client({
autoBatchTracing: true,
Expand Down
114 changes: 108 additions & 6 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* eslint-disable prefer-const */
import { jest } from "@jest/globals";
import { v4 as uuidv4 } from "uuid";
import * as zlib from "node:zlib";
import { Client, mergeRuntimeEnvIntoRunCreate } from "../client.js";
import { convertToDottedOrderFormat } from "../run_trees.js";
import { _getFetchImplementation } from "../singletons/fetch.js";
Expand All @@ -24,7 +25,17 @@ const parseMockRequestBody = async (body: string | FormData) => {
try {
parsedValue = JSON.parse(text);
} catch (e) {
parsedValue = text;
try {
// Try decompression
const decompressed = zlib
.gunzipSync(Buffer.from(await value.arrayBuffer()))
.toString();
parsedValue = JSON.parse(decompressed);
} catch (e) {
console.log(e);
// Give up
parsedValue = text;
}
}
// if (method === "attachment") {
// for (const item of reconstructedBody.post) {
Expand Down Expand Up @@ -609,12 +620,20 @@ describe.each(ENDPOINT_TYPES)(

await new Promise((resolve) => setTimeout(resolve, 10));

const calledRequestParam: any = callSpy.mock.calls[0][2];
const calledRequestParam2: any = callSpy.mock.calls[1][2];
const calledRequestBody = await parseMockRequestBody(
(callSpy.mock.calls[0][2] as any).body
);
const calledRequestBody2: any = await parseMockRequestBody(
(callSpy.mock.calls[1][2] as any).body
);

// Queue should drain as soon as size limit is reached,
// sending both batches
expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({
expect(
calledRequestBody.post.length === 10
? calledRequestBody
: calledRequestBody2
).toEqual({
post: runIds.slice(0, 10).map((runId, i) =>
expect.objectContaining({
id: runId,
Expand All @@ -628,7 +647,11 @@ describe.each(ENDPOINT_TYPES)(
patch: [],
});

expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({
expect(
calledRequestBody.post.length === 5
? calledRequestBody
: calledRequestBody2
).toEqual({
post: runIds.slice(10).map((runId, i) =>
expect.objectContaining({
id: runId,
Expand Down Expand Up @@ -784,7 +807,7 @@ describe.each(ENDPOINT_TYPES)(
dotted_order: dottedOrder,
});

await new Promise((resolve) => setTimeout(resolve, 300));
await client.awaitPendingTraceBatches();

const calledRequestParam: any = callSpy.mock.calls[0][2];
expect(
Expand Down Expand Up @@ -903,3 +926,82 @@ describe.each(ENDPOINT_TYPES)(
});
}
);

// Fields at or above `tracePayloadByteCompressionLimit` must be gzipped in
// the multipart body while smaller fields are sent uncompressed; both must
// decode back to the original run payloads.
it("should compress fields above the compression limit", async () => {
  const client = new Client({
    apiKey: "test-api-key",
    tracePayloadByteCompressionLimit: 1000,
    autoBatchTracing: true,
  });
  // Intercept the multipart ingest call so no real request is made.
  const callSpy = jest
    .spyOn((client as any).batchIngestCaller, "call")
    .mockResolvedValue({
      ok: true,
      text: () => "",
    });
  jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => {
    return {
      version: "foo",
      batch_ingest_config: { use_multipart_endpoint: true },
    };
  });

  const projectName = "__test_compression";

  // Small run: inputs stay below the 1000-byte threshold (uncompressed).
  const runId = uuidv4();
  const dottedOrder = convertToDottedOrderFormat(
    new Date().getTime() / 1000,
    runId
  );

  await client.createRun({
    id: runId,
    project_name: projectName,
    name: "test_run",
    run_type: "llm",
    inputs: { text: "hello world!" },
    trace_id: runId,
    dotted_order: dottedOrder,
  });

  // Large run: padded inputs exceed the threshold (compressed).
  const runId2 = uuidv4();
  // Fix: the dotted order must be derived from this run's own id (`runId2`),
  // not the first run's id, to stay consistent with id/trace_id below.
  const dottedOrder2 = convertToDottedOrderFormat(
    new Date().getTime() / 1000,
    runId2
  );

  await client.createRun({
    id: runId2,
    project_name: projectName,
    name: "test_run2",
    run_type: "llm",
    inputs: { text: `hello world!${"x".repeat(1000)}` },
    trace_id: runId2,
    dotted_order: dottedOrder2,
  });

  await client.awaitPendingTraceBatches();

  const calledRequestParam: any = callSpy.mock.calls[0][2];
  expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({
    post: [
      expect.objectContaining({
        id: runId,
        run_type: "llm",
        inputs: {
          text: "hello world!",
        },
        trace_id: runId,
      }),
      expect.objectContaining({
        id: runId2,
        run_type: "llm",
        inputs: {
          text: `hello world!${"x".repeat(1000)}`,
        },
        trace_id: runId2,
      }),
    ],
    patch: [],
  });
});
Loading