Skip to content

Commit

Permalink
Rely on AsyncCaller
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Oct 24, 2024
1 parent fb234fe commit 1f0bcd1
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 31 deletions.
46 changes: 18 additions & 28 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,6 @@ export class Client {

private _serverInfo: RecordStringAny | undefined;

private _autoBatchQueueIsDraining = false;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private _getServerInfoPromise?: Promise<Record<string, any>>;

Expand All @@ -500,7 +498,13 @@ export class Client {
}
this.timeout_ms = config.timeout_ms ?? 90_000;
this.caller = new AsyncCaller(config.callerOptions ?? {});
this.batchConcurrency = config.batchConcurrency ?? this.batchConcurrency;
if (this.batchConcurrency < 1) {
throw new Error("Batch concurrency must be positive.");
}
this.batchIngestCaller = new AsyncCaller({
maxRetries: 2,
maxConcurrency: this.batchConcurrency,
...(config.callerOptions ?? {}),
onFailedResponseHook: handle429,
});
Expand All @@ -515,10 +519,6 @@ export class Client {
config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization;
this.batchSizeBytesLimit = config.batchSizeBytesLimit;
this.fetchOptions = config.fetchOptions || {};
this.batchConcurrency = config.batchConcurrency ?? this.batchConcurrency;
if (this.batchConcurrency < 1) {
throw new Error("Batch concurrency must be positive.");
}
}

public static getDefaultClientConfig(): {
Expand Down Expand Up @@ -762,27 +762,16 @@ export class Client {
}

private async drainAutoBatchQueue() {
if (this._autoBatchQueueIsDraining) {
// Exit if already draining
return;
}
this._autoBatchQueueIsDraining = true;
try {
const batchSizeLimit = await this._getBatchSizeLimitBytes();
while (this.autoBatchQueue.items.length > 0) {
const promises = [];
for (let i = 0; i < this.batchConcurrency; i++) {
const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
if (!batch.length) {
done();
break;
}
promises.push(this.processBatch(batch, done));
const batchSizeLimit = await this._getBatchSizeLimitBytes();
while (this.autoBatchQueue.items.length > 0) {
for (let i = 0; i < this.batchConcurrency; i++) {
const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
if (!batch.length) {
done();
break;
}
await Promise.all(promises);
await this.processBatch(batch, done);
}
} finally {
this._autoBatchQueueIsDraining = false;
}
}

Expand Down Expand Up @@ -4185,8 +4174,9 @@ export class Client {
* @returns A promise that resolves once all currently pending traces have sent.
*/
public awaitPendingTraceBatches() {
return Promise.all(
this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise)
);
return Promise.all([
...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise),
this.batchIngestCaller.queue.onIdle(),
]);
}
}
8 changes: 6 additions & 2 deletions js/src/tests/batch_client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ test.skip("very large runs", async () => {
const projectName = "__test_large_runs" + uuidv4().substring(0, 4);
await deleteProject(langchainClient, projectName);

console.time("foo");
console.time("largeRunTimer");

const promises = [];
for (let i = 0; i < 10; i++) {
Expand All @@ -281,7 +281,11 @@ test.skip("very large runs", async () => {

await Promise.all(promises);

console.timeLog("foo");
console.timeLog("largeRunTimer");

await langchainClient.awaitPendingTraceBatches();

console.timeLog("largeRunTimer");

await Promise.all([waitUntilProjectFound(langchainClient, projectName)]);

Expand Down
2 changes: 1 addition & 1 deletion js/src/utils/async_caller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class AsyncCaller {

protected maxRetries: AsyncCallerParams["maxRetries"];

private queue: typeof import("p-queue")["default"]["prototype"];
queue: typeof import("p-queue")["default"]["prototype"];

private onFailedResponseHook?: ResponseCallback;

Expand Down

0 comments on commit 1f0bcd1

Please sign in to comment.