feat(NODE-6338): implement client bulk write error handling (#4262)
durran authored Oct 10, 2024
1 parent 27618ae commit 8def42d
Showing 25 changed files with 919 additions and 236 deletions.
4 changes: 4 additions & 0 deletions src/cmap/wire_protocol/responses.ts
@@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse {
get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}

get writeConcernError() {
return this.get('writeConcernError', BSONType.object, false);
}
}
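For orientation, the new getter is what lets the result-merging code surface write concern failures from each batch. Below is a minimal, hedged sketch of that use, written as if inside the driver's source tree; the collectWriteConcernError helper is illustrative and not part of this commit.

import type { ClientBulkWriteCursorResponse } from './cmap/wire_protocol/responses';
import type { MongoClientBulkWriteError } from './error';

// Illustrative helper: copy a batch's write concern error, if any, onto the
// aggregate error that will be thrown after all batches have been attempted.
function collectWriteConcernError(
  response: ClientBulkWriteCursorResponse,
  error: MongoClientBulkWriteError
): void {
  const writeConcernError = response.writeConcernError;
  if (writeConcernError) {
    error.writeConcernErrors.push(writeConcernError);
  }
}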
10 changes: 2 additions & 8 deletions src/cursor/client_bulk_write_cursor.ts
@@ -1,7 +1,6 @@
import { type Document } from 'bson';

import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
@@ -48,16 +47,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
* We need a way to get the top-level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
get response(): ClientBulkWriteCursorResponse | null {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
return null;
}

/**
* Get the last set of operations the cursor executed.
*/
get operations(): Document[] {
return this.commandBuilder.lastOperations;
}
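Since the getter above now returns null instead of throwing, downstream code checks for a missing response (for example, when no cursor response came back) rather than catching MongoClientBulkWriteCursorError. A hedged sketch, written as if inside the driver's source tree; summarizeBatch is an illustrative helper, not part of this commit.

import type { ClientBulkWriteCursor } from './cursor/client_bulk_write_cursor';

// Illustrative consumer of the now-nullable response getter.
function summarizeBatch(cursor: ClientBulkWriteCursor): { deletedCount: number } | null {
  const response = cursor.response;
  if (response == null) {
    // No cursor response came back from the server, so there is nothing to merge.
    return null;
  }
  return { deletedCount: response.deletedCount };
}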
46 changes: 44 additions & 2 deletions src/error.ts
@@ -1,4 +1,8 @@
import type { Document } from './bson';
import {
type ClientBulkWriteError,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import type { ServerType } from './sdam/common';
import type { TopologyVersion } from './sdam/server_description';
import type { TopologyDescription } from './sdam/topology_description';
@@ -616,6 +620,44 @@ export class MongoGCPError extends MongoOIDCError {
}
}

/**
* An error indicating that a failure occurred while executing the client bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteError extends MongoServerError {
/**
* Write concern errors that occurred while executing the bulk write. This list may have
* multiple items if more than one server command was required to execute the bulk write.
*/
writeConcernErrors: Document[];
/**
* Errors that occurred during the execution of individual write operations. This map will
* contain at most one entry if the bulk write was ordered.
*/
writeErrors: Map<number, ClientBulkWriteError>;
/**
* The results of any successful operations that were performed before the error was
* encountered.
*/
partialResult?: ClientBulkWriteResult;

/**
* Initialize the client bulk write error.
* @param message - The error message.
*/
constructor(message: ErrorDescription) {
super(message);
this.writeConcernErrors = [];
this.writeErrors = new Map();
}

override get name(): string {
return 'MongoClientBulkWriteError';
}
}

/**
* An error indicating that a failure occurred while processing client bulk write results.
*
@@ -1047,8 +1089,8 @@ export class MongoInvalidArgumentError extends MongoAPIError {
*
* @public
**/
constructor(message: string) {
super(message);
constructor(message: string, options?: { cause?: Error }) {
super(message, options);
}

override get name(): string {
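From an application's point of view, the new MongoClientBulkWriteError is what a failed MongoClient.bulkWrite call rejects with. A hedged usage sketch against the public API; the connection string, namespace, and documents are placeholders, and the model shape follows the client bulk write models referenced by this PR.

import { MongoClient, MongoClientBulkWriteError } from 'mongodb';

async function run(): Promise<void> {
  const client = new MongoClient('mongodb://localhost:27017');
  try {
    await client.bulkWrite([
      { namespace: 'test.coll', name: 'insertOne', document: { _id: 1 } },
      { namespace: 'test.coll', name: 'insertOne', document: { _id: 1 } } // duplicate key
    ]);
  } catch (error) {
    if (error instanceof MongoClientBulkWriteError) {
      console.log(error.writeErrors.size);   // individual failures, keyed by model index
      console.log(error.writeConcernErrors); // one entry per server command that reported one
      console.log(error.partialResult);      // results of operations that succeeded first
    }
  } finally {
    await client.close();
  }
}

run().catch(console.error);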
2 changes: 2 additions & 0 deletions src/index.ts
@@ -46,6 +47,7 @@ export {
MongoBatchReExecutionError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
@@ -477,6 +478,7 @@ export type {
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteError,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
5 changes: 5 additions & 0 deletions src/mongo_client.ts
@@ -493,6 +493,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
if (this.autoEncrypter) {
throw new MongoInvalidArgumentError(
'MongoClient bulkWrite does not currently support automatic encryption.'
);
}
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

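The guard above rejects client-level bulkWrite before any command is sent when the client was constructed with auto encryption. A hedged sketch of the observable behavior; it assumes the optional mongodb-client-encryption package is installed, and the key material and namespaces are placeholders.

import { randomBytes } from 'node:crypto';
import { MongoClient, MongoInvalidArgumentError } from 'mongodb';

async function demonstrateGuard(): Promise<void> {
  const client = new MongoClient('mongodb://localhost:27017', {
    autoEncryption: {
      keyVaultNamespace: 'encryption.__keyVault',
      kmsProviders: { local: { key: randomBytes(96) } } // placeholder local master key
    }
  });
  try {
    await client.bulkWrite([{ namespace: 'test.coll', name: 'insertOne', document: { x: 1 } }]);
  } catch (error) {
    // The new check throws MongoInvalidArgumentError instead of attempting the write.
    console.log(error instanceof MongoInvalidArgumentError); // true
  } finally {
    await client.close();
  }
}

demonstrateGuard().catch(console.error);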
46 changes: 36 additions & 10 deletions src/operations/client_bulk_write/client_bulk_write.ts
@@ -27,6 +27,14 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
this.ns = new MongoDBNamespace('admin', '$cmd');
}

override resetBatch(): boolean {
return this.commandBuilder.resetBatch();
}

override get canRetryWrite(): boolean {
return this.commandBuilder.isBatchRetryable;
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
@@ -41,14 +49,20 @@

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
// Checkout a connection to build the command.
const connection = await server.pool.checkOut();
// Pin the connection to the session so it gets used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
let connection;
if (!session.pinnedConnection) {
// Checkout a connection to build the command.
connection = await server.pool.checkOut();
// Pin the connection to the session so it gets used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
} else {
connection = session.pinnedConnection;
}
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize
connection.hello?.maxWriteBatchSize,
connection.hello?.maxBsonObjectSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
@@ -59,16 +73,26 @@
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
if (!server.description.maxWriteBatchSize || !server.description.maxMessageSizeBytes) {
if (
!server.description.maxWriteBatchSize ||
!server.description.maxMessageSizeBytes ||
!server.description.maxBsonObjectSize
) {
throw new MongoClientBulkWriteExecutionError(
'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the servers hello response.'
'In order to execute a client bulk write, the server must report maxWriteBatchSize, maxMessageSizeBytes, and maxBsonObjectSize in its hello response.'
);
}
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize
server.description.maxWriteBatchSize,
server.description.maxBsonObjectSize
);
}

// Check after the batch is built if we cannot retry it and override the option.
if (!this.canRetryWrite) {
this.options.willRetryWrite = false;
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
}
}
@@ -77,5 +101,7 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
defineAspects(ClientBulkWriteOperation, [
Aspect.WRITE_OPERATION,
Aspect.SKIP_COLLATION,
Aspect.CURSOR_CREATING
Aspect.CURSOR_CREATING,
Aspect.RETRYABLE,
Aspect.COMMAND_BATCHING
]);
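The retry interplay is the subtle part of this file: the operation is now marked RETRYABLE, but after each batch is built the canRetryWrite check can switch willRetryWrite off when the batch contains a multi write. A small standalone sketch of that decision, mirroring (not reusing) the driver's logic; the batchIsRetryable helper is illustrative, and the model shape follows this PR's client bulk write models.

import type { AnyClientBulkWriteModel } from 'mongodb';

// Mirrors canRetryWrite: a batch stays retryable only while it contains no
// updateMany or deleteMany models.
function batchIsRetryable(batch: AnyClientBulkWriteModel[]): boolean {
  return !batch.some(model => model.name === 'updateMany' || model.name === 'deleteMany');
}

const options = { willRetryWrite: true };
const batch: AnyClientBulkWriteModel[] = [
  { namespace: 'test.coll', name: 'updateMany', filter: {}, update: { $set: { flag: true } } }
];

// Same override the operation applies after building the batch.
if (!batchIsRetryable(batch)) {
  options.willRetryWrite = false;
}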
89 changes: 84 additions & 5 deletions src/operations/client_bulk_write/command_builder.ts
@@ -1,8 +1,9 @@
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoAPIError, MongoInvalidArgumentError } from '../../error';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { DEFAULT_PK_FACTORY } from '../../utils';
import { DEFAULT_PK_FACTORY, hasAtomicOperators } from '../../utils';
import { type CollationOptions } from '../command';
import { type Hint } from '../operation';
import type {
@@ -38,8 +39,14 @@ export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
/** The current index in the models array that is being processed. */
currentModelIndex: number;
/** The model index that the builder was on when it finished the previous batch. Used for resets when retrying. */
previousModelIndex: number;
/** The last array of operations that were created. Used by the results merger for indexing results. */
lastOperations: Document[];
/** True if the batch currently being built contains no multi-updates and is therefore retryable. */
isBatchRetryable: boolean;

/**
* Create the command builder.
@@ -54,7 +61,9 @@
this.options = options;
this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
this.currentModelIndex = 0;
this.previousModelIndex = 0;
this.lastOperations = [];
this.isBatchRetryable = true;
}

/**
@@ -76,27 +85,57 @@
return this.currentModelIndex < this.models.length;
}

/**
* When a command needs to be retried, set the current model
* index back to its previous value.
*/
resetBatch(): boolean {
this.currentModelIndex = this.previousModelIndex;
return true;
}
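To make the bookkeeping concrete, here is a standalone sketch (not driver code) of how currentModelIndex and previousModelIndex interact across a retried batch.

// Standalone illustration of the two indexes: a batch remembers where it
// started so a retry can replay exactly the same slice of models.
class BatchPointer {
  currentModelIndex = 0;
  previousModelIndex = 0;

  buildBatch(batchSize: number, totalModels: number): void {
    this.previousModelIndex = this.currentModelIndex; // mark where this batch starts
    this.currentModelIndex = Math.min(this.currentModelIndex + batchSize, totalModels);
  }

  resetBatch(): boolean {
    this.currentModelIndex = this.previousModelIndex; // rewind so the retry rebuilds the batch
    return true;
  }
}

const pointer = new BatchPointer();
pointer.buildBatch(1000, 2500); // first attempt consumes models 0..999
pointer.resetBatch();           // retryable error: rewind to model 0
pointer.buildBatch(1000, 2500); // the retry re-sends the same models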

/**
* Build a single batch of a client bulk write command.
* @param maxMessageSizeBytes - The max message size in bytes.
* @param maxWriteBatchSize - The max write batch size.
* @returns The client bulk write command.
*/
buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand {
buildBatch(
maxMessageSizeBytes: number,
maxWriteBatchSize: number,
maxBsonObjectSize: number
): ClientBulkWriteCommand {
// We start by assuming the batch has no multi-updates, so it is retryable
// until we find them.
this.isBatchRetryable = true;
let commandLength = 0;
let currentNamespaceIndex = 0;
const command: ClientBulkWriteCommand = this.baseCommand();
const namespaces = new Map<string, number>();
// In the case of retries we need to mark where we started this batch.
this.previousModelIndex = this.currentModelIndex;

while (this.currentModelIndex < this.models.length) {
const model = this.models[this.currentModelIndex];
const ns = model.namespace;
const nsIndex = namespaces.get(ns);

// Multi updates are not retryable.
if (model.name === 'deleteMany' || model.name === 'updateMany') {
this.isBatchRetryable = false;
}

if (nsIndex != null) {
// Build the operation and serialize it to get the bytes buffer.
const operation = buildOperation(model, nsIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let operationBuffer;
try {
operationBuffer = BSON.serialize(operation);
} catch (cause) {
throw new MongoInvalidArgumentError(`Could not serialize operation to BSON`, { cause });
}

validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

// Check if the operation buffer can fit in the command. If it can,
// then add the operation to the document sequence and increment the
@@ -119,9 +158,18 @@
// construct our nsInfo and ops documents and buffers.
namespaces.set(ns, currentNamespaceIndex);
const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let nsInfoBuffer;
let operationBuffer;
try {
nsInfoBuffer = BSON.serialize(nsInfo);
operationBuffer = BSON.serialize(operation);
} catch (cause) {
throw new MongoInvalidArgumentError(`Could not serialize ns info or operation to BSON`, { cause });
}

validateBufferSize('nsInfo', nsInfoBuffer, maxBsonObjectSize);
validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

// Check if the operation and nsInfo buffers can fit in the command. If they
// can, then add the operation and nsInfo to their respective document
@@ -179,6 +227,14 @@
}
}

function validateBufferSize(name: string, buffer: Uint8Array, maxBsonObjectSize: number) {
if (buffer.length > maxBsonObjectSize) {
throw new MongoInvalidArgumentError(
`Client bulk write operation ${name} of length ${buffer.length} exceeds the max bson object size of ${maxBsonObjectSize}`
);
}
}
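For scale, the same check in isolation: serialize a single ops document and compare its byte length to the server-reported limit. The 16 MiB value below is only a placeholder for maxBsonObjectSize, which the driver reads from the hello response.

import { serialize } from 'bson';

const maxBsonObjectSize = 16 * 1024 * 1024; // placeholder; the real value comes from hello

// Serialize one operation document the way the builder does and validate its size.
const operation = { insert: 0, document: { _id: 1, payload: 'x'.repeat(64) } };
const operationBuffer = serialize(operation);

if (operationBuffer.length > maxBsonObjectSize) {
  throw new Error(
    `Client bulk write operation ops of length ${operationBuffer.length} exceeds the max bson object size of ${maxBsonObjectSize}`
  );
}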

/** @internal */
interface ClientInsertOperation {
insert: number;
@@ -293,6 +349,18 @@ export const buildUpdateManyOperation = (
return createUpdateOperation(model, index, true);
};

/**
* Validate the update document.
* @param update - The update document.
*/
function validateUpdate(update: Document) {
if (!hasAtomicOperators(update)) {
throw new MongoAPIError(
'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.'
);
}
}
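As a quick reference for the rule being enforced here, a simplified stand-in for the hasAtomicOperators check; the real helper lives in src/utils and is more general.

// Simplified stand-in for hasAtomicOperators: a plain update document passes
// only if it is non-empty and its first key is an atomic modifier.
function looksAtomic(update: Record<string, unknown>): boolean {
  const keys = Object.keys(update);
  return keys.length > 0 && keys[0].startsWith('$');
}

looksAtomic({ $set: { name: 'a' }, $inc: { n: 1 } }); // true  - accepted by update models
looksAtomic({ name: 'a' });                           // false - validateUpdate would throw MongoAPIError
looksAtomic({});                                      // false - empty updates are rejected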

/**
* Creates an update operation based on the parameters.
*/
@@ -301,6 +369,11 @@ function createUpdateOperation(
index: number,
multi: boolean
): ClientUpdateOperation {
// Update documents provided in UpdateOne and UpdateMany write models are
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
validateUpdate(model.update);
const document: ClientUpdateOperation = {
update: index,
multi: multi,
@@ -343,6 +416,12 @@ export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
index: number
): ClientReplaceOneOperation => {
if (hasAtomicOperators(model.replacement)) {
throw new MongoAPIError(
'Client bulk write replace models must not contain atomic modifiers (start with $) and must not be empty.'
);
}

const document: ClientReplaceOneOperation = {
update: index,
multi: false,
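The replace-side rule is the inverse of the update rule above: replacements must be plain documents. A hedged example of models that would pass or fail the new check; the namespace is a placeholder and the model shape follows this PR's ClientReplaceOneModel.

import type { AnyClientBulkWriteModel } from 'mongodb';

// Accepted: a plain replacement document with no $-prefixed keys.
const validReplace: AnyClientBulkWriteModel = {
  namespace: 'test.coll',
  name: 'replaceOne',
  filter: { _id: 1 },
  replacement: { name: 'replaced' }
};

// Rejected: buildReplaceOneOperation throws MongoAPIError because the
// replacement starts with an atomic modifier.
const invalidReplace: AnyClientBulkWriteModel = {
  namespace: 'test.coll',
  name: 'replaceOne',
  filter: { _id: 1 },
  replacement: { $set: { name: 'nope' } }
};

console.log(validReplace, invalidReplace);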
6 changes: 6 additions & 0 deletions src/operations/client_bulk_write/common.ts
@@ -181,6 +181,12 @@ export interface ClientBulkWriteResult {
deleteResults?: Map<number, ClientDeleteResult>;
}

/** @public */
export interface ClientBulkWriteError {
code: number;
message: string;
}

/** @public */
export interface ClientInsertOneResult {
/**
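As a small consumer-side illustration of the new public ClientBulkWriteError interface, here is a formatter over the writeErrors map carried by MongoClientBulkWriteError; the function name is illustrative.

import type { ClientBulkWriteError, MongoClientBulkWriteError } from 'mongodb';

// Turn the per-operation failures into readable lines; the map key is the
// index of the failed model in the input array.
function formatWriteErrors(error: MongoClientBulkWriteError): string[] {
  const lines: string[] = [];
  for (const [index, writeError] of error.writeErrors) {
    const { code, message }: ClientBulkWriteError = writeError;
    lines.push(`model ${index}: code ${code} - ${message}`);
  }
  return lines;
}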