Skip to content

Commit 365d63b

Browse files
baileympearsonW-A-James
authored andcommitted
feat(NODE-6403): add CSOT support to client bulk write (#4261)
Co-authored-by: Warren James <[email protected]>
1 parent fd8f3bd commit 365d63b

File tree

13 files changed

+536
-37
lines changed

13 files changed

+536
-37
lines changed

src/cmap/connection.ts

+2
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
721721
throw new MongoOperationTimeoutError('Timed out at socket write');
722722
}
723723
throw error;
724+
} finally {
725+
timeout.clear();
724726
}
725727
}
726728
return await drainEvent;

src/cmap/wire_protocol/on_data.ts

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export function onData(
116116
emitter.off('data', eventHandler);
117117
emitter.off('error', errorHandler);
118118
finished = true;
119+
timeoutForSocketRead?.clear();
119120
const doneResult = { value: undefined, done: finished } as const;
120121

121122
for (const promise of unconsumedPromises) {

src/cursor/abstract_cursor.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ export abstract class AbstractCursor<
243243
options.timeoutMode ??
244244
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
245245
} else {
246-
if (options.timeoutMode != null)
246+
if (options.timeoutMode != null && options.timeoutContext == null)
247247
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
248248
}
249249

src/cursor/client_bulk_write_cursor.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
3434
constructor(
3535
client: MongoClient,
3636
commandBuilder: ClientBulkWriteCommandBuilder,
37-
options: ClientBulkWriteOptions = {}
37+
options: ClientBulkWriteCursorOptions = {}
3838
) {
3939
super(client, new MongoDBNamespace('admin', '$cmd'), options);
4040

@@ -71,7 +71,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
7171
session
7272
});
7373

74-
const response = await executeOperation(this.client, clientBulkWriteOperation);
74+
const response = await executeOperation(
75+
this.client,
76+
clientBulkWriteOperation,
77+
this.timeoutContext
78+
);
7579
this.cursorResponse = response;
7680

7781
return { server: clientBulkWriteOperation.server, session, response };

src/operations/client_bulk_write/executor.ts

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { type Document } from '../../bson';
2+
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
23
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
34
import {
45
MongoClientBulkWriteError,
@@ -7,6 +8,8 @@ import {
78
MongoServerError
89
} from '../../error';
910
import { type MongoClient } from '../../mongo_client';
11+
import { TimeoutContext } from '../../timeout';
12+
import { resolveTimeoutOptions } from '../../utils';
1013
import { WriteConcern } from '../../write_concern';
1114
import { executeOperation } from '../execute_operation';
1215
import { ClientBulkWriteOperation } from './client_bulk_write';
@@ -86,17 +89,26 @@ export class ClientBulkWriteExecutor {
8689
pkFactory
8790
);
8891
// Unacknowledged writes need to execute all batches and return { ok: 1}
92+
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
93+
const context = TimeoutContext.create(resolvedOptions);
94+
8995
if (this.options.writeConcern?.w === 0) {
9096
while (commandBuilder.hasNextBatch()) {
9197
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
92-
await executeOperation(this.client, operation);
98+
await executeOperation(this.client, operation, context);
9399
}
94100
return ClientBulkWriteResultsMerger.unacknowledged();
95101
} else {
96102
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
97103
// For each command will will create and exhaust a cursor for the results.
98104
while (commandBuilder.hasNextBatch()) {
99-
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
105+
const cursorContext = new CursorTimeoutContext(context, Symbol());
106+
const options = {
107+
...this.options,
108+
timeoutContext: cursorContext,
109+
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
110+
};
111+
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
100112
try {
101113
await resultsMerger.merge(cursor);
102114
} catch (error) {

src/sdam/server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export type ServerEvents = {
106106
EventEmitterWithState;
107107

108108
/** @internal */
109-
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
109+
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
110110
timeoutContext: TimeoutContext;
111111
};
112112

src/utils.ts

+13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import { ServerType } from './sdam/common';
3636
import type { Server } from './sdam/server';
3737
import type { Topology } from './sdam/topology';
3838
import type { ClientSession } from './sessions';
39+
import { type TimeoutContextOptions } from './timeout';
3940
import { WriteConcern } from './write_concern';
4041

4142
/**
@@ -515,6 +516,18 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
515516
return keys.length > 0 && keys[0][0] === '$';
516517
}
517518

519+
export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
520+
client: MongoClient,
521+
options: T
522+
): T &
523+
Pick<
524+
MongoClient['s']['options'],
525+
'timeoutMS' | 'serverSelectionTimeoutMS' | 'waitQueueTimeoutMS' | 'socketTimeoutMS'
526+
> {
527+
const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
528+
client.s.options;
529+
return { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS, ...options };
530+
}
518531
/**
519532
* Merge inherited properties from parent into options, prioritizing values from options,
520533
* then values from parent.

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

+9-20
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import {
2121
promiseWithResolvers,
2222
squashError
2323
} from '../../mongodb';
24-
import { type FailPoint } from '../../tools/utils';
24+
import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils';
25+
import { filterForCommands } from '../shared';
2526

2627
// TODO(NODE-5824): Implement CSOT prose tests
2728
describe('CSOT spec prose tests', function () {
@@ -1183,9 +1184,9 @@ describe('CSOT spec prose tests', function () {
11831184
});
11841185
});
11851186

1186-
describe.skip(
1187+
describe(
11871188
'11. Multi-batch bulkWrites',
1188-
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
1189+
{ requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } },
11891190
function () {
11901191
/**
11911192
* ### 11. Multi-batch bulkWrites
@@ -1245,9 +1246,6 @@ describe('CSOT spec prose tests', function () {
12451246
}
12461247
};
12471248

1248-
let maxBsonObjectSize: number;
1249-
let maxMessageSizeBytes: number;
1250-
12511249
beforeEach(async function () {
12521250
await internalClient
12531251
.db('db')
@@ -1256,29 +1254,20 @@ describe('CSOT spec prose tests', function () {
12561254
.catch(() => null);
12571255
await internalClient.db('admin').command(failpoint);
12581256

1259-
const hello = await internalClient.db('admin').command({ hello: 1 });
1260-
maxBsonObjectSize = hello.maxBsonObjectSize;
1261-
maxMessageSizeBytes = hello.maxMessageSizeBytes;
1262-
12631257
client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
12641258
});
12651259

1266-
it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
1260+
it('performs two bulkWrites which fail to complete before 2000 ms', async function () {
12671261
const writes = [];
1268-
client.on('commandStarted', ev => writes.push(ev));
1262+
client.on('commandStarted', filterForCommands('bulkWrite', writes));
12691263

1270-
const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
1271-
const models = Array.from({ length }, () => ({
1272-
namespace: 'db.coll',
1273-
name: 'insertOne' as const,
1274-
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
1275-
}));
1264+
const models = await makeMultiBatchWrite(this.configuration);
12761265

12771266
const error = await client.bulkWrite(models).catch(error => error);
12781267

12791268
expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
1280-
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
1281-
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
1269+
expect(writes).to.have.lengthOf(2);
1270+
});
12821271
}
12831272
);
12841273
});

test/integration/client-side-operations-timeout/node_csot.test.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,16 @@ describe('CSOT driver tests', metadata, () => {
279279
.stub(Connection.prototype, 'readMany')
280280
.callsFake(async function* (...args) {
281281
const realIterator = readManyStub.wrappedMethod.call(this, ...args);
282-
const cmd = commandSpy.lastCall.args.at(1);
283-
if ('giveMeWriteErrors' in cmd) {
284-
await realIterator.next().catch(() => null); // dismiss response
285-
yield { parse: () => writeErrorsReply };
286-
} else {
287-
yield (await realIterator.next()).value;
282+
try {
283+
const cmd = commandSpy.lastCall.args.at(1);
284+
if ('giveMeWriteErrors' in cmd) {
285+
await realIterator.next().catch(() => null); // dismiss response
286+
yield { parse: () => writeErrorsReply };
287+
} else {
288+
yield (await realIterator.next()).value;
289+
}
290+
} finally {
291+
realIterator.return();
288292
}
289293
});
290294
});

test/integration/collection-management/collection_db_management.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from 'chai';
22

3-
import { Collection, type Db, type MongoClient } from '../../mongodb';
3+
import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb';
44

55
describe('Collection Management and Db Management', function () {
66
let client: MongoClient;
@@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () {
1616
});
1717

1818
it('returns a collection object after calling createCollection', async function () {
19-
const collection = await db.createCollection('collection');
19+
const collection = await db.createCollection(new ObjectId().toHexString());
2020
expect(collection).to.be.instanceOf(Collection);
2121
});
2222

0 commit comments

Comments
 (0)