Skip to content

Commit c5a9ae5

Browse files
nbbeekenbaileympearson
authored andcommitted
feat(NODE-6274): add CSOT support to bulkWrite (#4250)
Co-authored-by: Bailey Pearson <[email protected]>
1 parent 30c7df4 commit c5a9ae5

File tree

4 files changed

+159
-22
lines changed

4 files changed

+159
-22
lines changed

src/bulk/common.ts

+13-5
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ export function mergeBatchResults(
501501

502502
async function executeCommands(
503503
bulkOperation: BulkOperationBase,
504-
options: BulkWriteOptions
504+
options: BulkWriteOptions & { timeoutContext?: TimeoutContext | null }
505505
): Promise<BulkWriteResult> {
506506
if (bulkOperation.s.batches.length === 0) {
507507
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
@@ -552,7 +552,11 @@ async function executeCommands(
552552
let thrownError = null;
553553
let result;
554554
try {
555-
result = await executeOperation(bulkOperation.s.collection.client, operation);
555+
result = await executeOperation(
556+
bulkOperation.s.collection.client,
557+
operation,
558+
finalOptions.timeoutContext
559+
);
556560
} catch (error) {
557561
thrownError = error;
558562
}
@@ -866,15 +870,19 @@ export class BulkWriteShimOperation extends AbstractOperation {
866870
return 'bulkWrite' as const;
867871
}
868872

869-
async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
873+
async execute(
874+
_server: Server,
875+
session: ClientSession | undefined,
876+
timeoutContext: TimeoutContext
877+
): Promise<any> {
870878
if (this.options.session == null) {
871879
// An implicit session could have been created by 'executeOperation'
872880
// So if we stick it on finalOptions here, each bulk operation
873881
// will use this same session, it'll be passed in the same way
874882
// an explicit session would be
875883
this.options.session = session;
876884
}
877-
return await executeCommands(this.bulkOperation, this.options);
885+
return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext });
878886
}
879887
}
880888

@@ -1203,7 +1211,7 @@ export abstract class BulkOperationBase {
12031211
const finalOptions = { ...this.s.options, ...options };
12041212
const operation = new BulkWriteShimOperation(this, finalOptions);
12051213

1206-
return await executeOperation(this.s.collection.client, operation);
1214+
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
12071215
}
12081216

12091217
/**

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

+141-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import * as sinon from 'sinon';
77
import { type CommandStartedEvent } from '../../../mongodb';
88
import {
99
type CommandSucceededEvent,
10+
MongoBulkWriteError,
1011
MongoClient,
1112
MongoOperationTimeoutError,
1213
MongoServerSelectionError,
@@ -28,7 +29,7 @@ describe('CSOT spec prose tests', function () {
2829
await client?.close();
2930
});
3031

31-
context.skip('1. Multi-batch writes', () => {
32+
describe('1. Multi-batch writes', { requires: { topology: 'single', mongodb: '>=4.4' } }, () => {
3233
/**
3334
* This test MUST only run against standalones on server versions 4.4 and higher.
3435
* The `insertMany` call takes an exceedingly long time on replicasets and sharded
@@ -55,6 +56,46 @@ describe('CSOT spec prose tests', function () {
5556
* - Expect this to fail with a timeout error.
5657
* 1. Verify that two `insert` commands were executed against `db.coll` as part of the `insertMany` call.
5758
*/
59+
60+
const failpoint: FailPoint = {
61+
configureFailPoint: 'failCommand',
62+
mode: {
63+
times: 2
64+
},
65+
data: {
66+
failCommands: ['insert'],
67+
blockConnection: true,
68+
blockTimeMS: 1010
69+
}
70+
};
71+
72+
beforeEach(async function () {
73+
await internalClient
74+
.db('db')
75+
.collection('coll')
76+
.drop()
77+
.catch(() => null);
78+
await internalClient.db('admin').command(failpoint);
79+
80+
client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
81+
});
82+
83+
it('performs two inserts which fail to complete before 2000 ms', async () => {
84+
const inserts = [];
85+
client.on('commandStarted', ev => inserts.push(ev));
86+
87+
const a = new Uint8Array(1000000 - 22);
88+
const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a }));
89+
const error = await client
90+
.db('db')
91+
.collection<{ _id: number; a: Uint8Array }>('coll')
92+
.insertMany(oneMBDocs)
93+
.catch(error => error);
94+
95+
expect(error).to.be.instanceOf(MongoBulkWriteError);
96+
expect(error.errorResponse).to.be.instanceOf(MongoOperationTimeoutError);
97+
expect(inserts.map(ev => ev.commandName)).to.deep.equal(['insert', 'insert']);
98+
});
5899
});
59100

60101
context.skip('2. maxTimeMS is not set for commands sent to mongocryptd', () => {
@@ -901,4 +942,103 @@ describe('CSOT spec prose tests', function () {
901942
});
902943
});
903944
});
945+
946+
describe.skip(
947+
'11. Multi-batch bulkWrites',
948+
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
949+
function () {
950+
/**
951+
* ### 11. Multi-batch bulkWrites
952+
*
953+
* This test MUST only run against server versions 8.0+. This test must be skipped on Atlas Serverless.
954+
*
955+
* 1. Using `internalClient`, drop the `db.coll` collection.
956+
*
957+
* 2. Using `internalClient`, set the following fail point:
958+
*
959+
* @example
960+
* ```javascript
961+
* {
962+
* configureFailPoint: "failCommand",
963+
* mode: {
964+
* times: 2
965+
* },
966+
* data: {
967+
* failCommands: ["bulkWrite"],
968+
* blockConnection: true,
969+
* blockTimeMS: 1010
970+
* }
971+
* }
972+
* ```
973+
*
974+
* 3. Using `internalClient`, perform a `hello` command and record the `maxBsonObjectSize` and `maxMessageSizeBytes` values
975+
* in the response.
976+
*
977+
* 4. Create a new MongoClient (referred to as `client`) with `timeoutMS=2000`.
978+
*
979+
* 5. Create a list of write models (referred to as `models`) with the following write model repeated
980+
* (`maxMessageSizeBytes / maxBsonObjectSize + 1`) times:
981+
*
982+
* @example
983+
* ```json
984+
* InsertOne {
985+
* "namespace": "db.coll",
986+
* "document": { "a": "b".repeat(maxBsonObjectSize - 500) }
987+
* }
988+
* ```
989+
*
990+
* 6. Call `bulkWrite` on `client` with `models`.
991+
*
992+
* - Expect this to fail with a timeout error.
993+
*
994+
* 7. Verify that two `bulkWrite` commands were executed as part of the `MongoClient.bulkWrite` call.
995+
*/
996+
const failpoint: FailPoint = {
997+
configureFailPoint: 'failCommand',
998+
mode: {
999+
times: 2
1000+
},
1001+
data: {
1002+
failCommands: ['bulkWrite'],
1003+
blockConnection: true,
1004+
blockTimeMS: 1010
1005+
}
1006+
};
1007+
1008+
let maxBsonObjectSize: number;
1009+
let maxMessageSizeBytes: number;
1010+
1011+
beforeEach(async function () {
1012+
await internalClient
1013+
.db('db')
1014+
.collection('coll')
1015+
.drop()
1016+
.catch(() => null);
1017+
await internalClient.db('admin').command(failpoint);
1018+
1019+
const hello = await internalClient.db('admin').command({ hello: 1 });
1020+
maxBsonObjectSize = hello.maxBsonObjectSize;
1021+
maxMessageSizeBytes = hello.maxMessageSizeBytes;
1022+
1023+
client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
1024+
});
1025+
1026+
it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
1027+
const writes = [];
1028+
client.on('commandStarted', ev => writes.push(ev));
1029+
1030+
const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
1031+
const models = Array.from({ length }, () => ({
1032+
namespace: 'db.coll',
1033+
name: 'insertOne' as const,
1034+
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
1035+
}));
1036+
1037+
const error = await client.bulkWrite(models).catch(error => error);
1038+
1039+
expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
1040+
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
1041+
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
1042+
}
1043+
);
9041044
});

test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts

-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { loadSpecTests } from '../../spec';
55
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
66

77
const skippedSpecs = {
8-
bulkWrite: 'TODO(NODE-6274)',
98
'change-streams': 'TODO(NODE-6035)',
109
'convenient-transactions': 'TODO(NODE-5687)',
1110
'deprecated-options': 'TODO(NODE-5689)',
@@ -19,18 +18,12 @@ const skippedSpecs = {
1918
};
2019

2120
const skippedTests = {
22-
'timeoutMS can be configured on a MongoClient - insertMany on collection': 'TODO(NODE-6274)',
23-
'timeoutMS can be configured on a MongoClient - bulkWrite on collection': 'TODO(NODE-6274)',
2421
'timeoutMS can be configured on a MongoClient - createChangeStream on client': 'TODO(NODE-6305)',
2522
'timeoutMS applies to whole operation, not individual attempts - createChangeStream on client':
2623
'TODO(NODE-6305)',
2724
'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(NODE-6305)',
2825
'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure':
2926
'TODO(NODE-6305)',
30-
'timeoutMS applies to whole operation, not individual attempts - insertMany on collection':
31-
'TODO(NODE-6274)',
32-
'timeoutMS applies to whole operation, not individual attempts - bulkWrite on collection':
33-
'TODO(NODE-6274)',
3427
'command is not sent if RTT is greater than timeoutMS': 'TODO(DRIVERS-2965)',
3528
'Non=tailable cursor iteration timeoutMS is refreshed for getMore if timeoutMode is iteration - failure':
3629
'TODO(DRIVERS-2965)',

test/tools/unified-spec-runner/match.ts

+5-9
Original file line numberDiff line numberDiff line change
@@ -788,15 +788,11 @@ export function expectErrorCheck(
788788
if (expected.isTimeoutError === false) {
789789
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
790790
} else if (expected.isTimeoutError === true) {
791-
expect(error).to.be.instanceof(MongoOperationTimeoutError);
792-
}
793-
794-
// TODO(NODE-6274): Check for MongoBulkWriteErrors that have a MongoOperationTimeoutError in their
795-
// errorResponse field
796-
if (expected.isTimeoutError === false) {
797-
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
798-
} else if (expected.isTimeoutError === true) {
799-
expect(error).to.be.instanceof(MongoOperationTimeoutError);
791+
if ('errorResponse' in error) {
792+
expect(error.errorResponse).to.be.instanceof(MongoOperationTimeoutError);
793+
} else {
794+
expect(error).to.be.instanceof(MongoOperationTimeoutError);
795+
}
800796
}
801797

802798
if (expected.errorContains != null) {

0 commit comments

Comments
 (0)