Skip to content

Commit 2398fc6

Browse files
W-A-Jamesnbbeeken
authored andcommitted
feat(NODE-6305): Add CSOT support to tailable cursors (#4218)
Co-authored-by: Neal Beeken <[email protected]>
1 parent 3980489 commit 2398fc6

File tree

11 files changed

+641
-32
lines changed

11 files changed

+641
-32
lines changed

src/cursor/abstract_cursor.ts

+40-11
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,35 @@ export abstract class AbstractCursor<
209209
options.readPreference && options.readPreference instanceof ReadPreference
210210
? options.readPreference
211211
: ReadPreference.primary,
212-
...pluckBSONSerializeOptions(options)
212+
...pluckBSONSerializeOptions(options),
213+
timeoutMS: options.timeoutMS,
214+
tailable: options.tailable,
215+
awaitData: options.awaitData
213216
};
214-
this.cursorOptions.timeoutMS = options.timeoutMS;
215217
if (this.cursorOptions.timeoutMS != null) {
216-
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
217-
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
218+
if (options.timeoutMode == null) {
219+
if (options.tailable) {
220+
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
221+
222+
if (options.awaitData) {
223+
if (
224+
options.maxAwaitTimeMS != null &&
225+
options.maxAwaitTimeMS >= this.cursorOptions.timeoutMS
226+
)
227+
throw new MongoInvalidArgumentError(
228+
'Cannot specify maxAwaitTimeMS >= timeoutMS for a tailable awaitData cursor'
229+
);
230+
}
231+
} else {
232+
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
233+
}
234+
} else {
235+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
236+
throw new MongoInvalidArgumentError(
237+
"Cannot set tailable cursor's timeoutMode to LIFETIME"
238+
);
239+
}
240+
this.cursorOptions.timeoutMode = options.timeoutMode;
218241
}
219242
this.cursorOptions.timeoutMode =
220243
options.timeoutMode ??
@@ -223,6 +246,8 @@ export abstract class AbstractCursor<
223246
if (options.timeoutMode != null)
224247
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
225248
}
249+
250+
// Set for initial command
226251
this.cursorOptions.omitMaxTimeMS =
227252
this.cursorOptions.timeoutMS != null &&
228253
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
@@ -781,15 +806,17 @@ export abstract class AbstractCursor<
781806
'Unexpected null selectedServer. A cursor creating command should have set this'
782807
);
783808
}
809+
const getMoreOptions = {
810+
...this.cursorOptions,
811+
session: this.cursorSession,
812+
batchSize
813+
};
814+
784815
const getMoreOperation = new GetMoreOperation(
785816
this.cursorNamespace,
786817
this.cursorId,
787818
this.selectedServer,
788-
{
789-
...this.cursorOptions,
790-
session: this.cursorSession,
791-
batchSize
792-
}
819+
getMoreOptions
793820
);
794821

795822
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
@@ -814,6 +841,8 @@ export abstract class AbstractCursor<
814841
}
815842
try {
816843
const state = await this._initialize(this.cursorSession);
844+
// Set omitMaxTimeMS to the value needed for subsequent getMore calls
845+
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
817846
const response = state.response;
818847
this.selectedServer = state.server;
819848
this.cursorId = response.id;
@@ -866,9 +895,9 @@ export abstract class AbstractCursor<
866895
} catch (error) {
867896
try {
868897
await this.cleanup(undefined, error);
869-
} catch (error) {
898+
} catch (cleanupError) {
870899
// `cleanupCursor` should never throw, squash and throw the original error
871-
squashError(error);
900+
squashError(cleanupError);
872901
}
873902
throw error;
874903
}

src/cursor/run_command_cursor.ts

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ export type RunCursorCommandOptions = {
2323
timeoutMS?: number;
2424
/** @internal */
2525
timeoutMode?: CursorTimeoutMode;
26+
tailable?: boolean;
27+
awaitData?: boolean;
2628
} & BSONSerializeOptions;
2729

2830
/** @public */

src/mongo_client.ts

+5
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
482482
return this.s.bsonOptions;
483483
}
484484

485+
/** @internal */
486+
get timeoutMS(): number | undefined {
487+
return this.options.timeoutMS;
488+
}
489+
485490
/**
486491
* Executes a client bulk write operation, available on server 8.0+.
487492
* @param models - The client bulk write models.

src/operations/create_collection.ts

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { Aspect, defineAspects } from './operation';
1717
const ILLEGAL_COMMAND_FIELDS = new Set([
1818
'w',
1919
'wtimeout',
20+
'timeoutMS',
2021
'j',
2122
'fsync',
2223
'autoIndexId',

test/benchmarks/driverBench/common.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ function loadSpecString(filePath) {
2424
}
2525

2626
function makeClient() {
27-
this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017');
27+
this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', {
28+
timeoutMS: 0
29+
});
2830
}
2931

3032
function connectClient() {

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

+27-13
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ describe('CSOT spec prose tests', function () {
7777
beforeEach(async function () {
7878
await internalClient
7979
.db('db')
80-
.collection('coll')
80+
.collection('bulkWriteTest')
8181
.drop()
8282
.catch(() => null);
8383
await internalClient.db('admin').command(failpoint);
@@ -93,7 +93,7 @@ describe('CSOT spec prose tests', function () {
9393
const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a }));
9494
const error = await client
9595
.db('db')
96-
.collection<{ _id: number; a: Uint8Array }>('coll')
96+
.collection<{ _id: number; a: Uint8Array }>('bulkWriteTest')
9797
.insertMany(oneMBDocs)
9898
.catch(error => error);
9999

@@ -265,6 +265,7 @@ describe('CSOT spec prose tests', function () {
265265
});
266266

267267
context('5. Blocking Iteration Methods', () => {
268+
const metadata = { requires: { mongodb: '>=4.4' } };
268269
/**
269270
* Tests in this section MUST only be run against server versions 4.4 and higher and only apply to drivers that have a
270271
* blocking method for cursor iteration that executes `getMore` commands in a loop until a document is available or an
@@ -276,7 +277,7 @@ describe('CSOT spec prose tests', function () {
276277
data: {
277278
failCommands: ['getMore'],
278279
blockConnection: true,
279-
blockTimeMS: 20
280+
blockTimeMS: 90
280281
}
281282
};
282283
let internalClient: MongoClient;
@@ -286,15 +287,25 @@ describe('CSOT spec prose tests', function () {
286287

287288
beforeEach(async function () {
288289
internalClient = this.configuration.newClient();
289-
await internalClient.db('db').dropCollection('coll');
290+
await internalClient
291+
.db('db')
292+
.collection('coll')
293+
.drop()
294+
.catch(() => null);
290295
// Creating capped collection to be able to create tailable find cursor
291296
const coll = await internalClient
292297
.db('db')
293298
.createCollection('coll', { capped: true, size: 1_000_000 });
294299
await coll.insertOne({ x: 1 });
295300
await internalClient.db().admin().command(failpoint);
296301

297-
client = this.configuration.newClient(undefined, { timeoutMS: 20, monitorCommands: true });
302+
client = this.configuration.newClient(undefined, {
303+
monitorCommands: true,
304+
timeoutMS: 100,
305+
minPoolSize: 20
306+
});
307+
await client.connect();
308+
298309
commandStarted = [];
299310
commandSucceeded = [];
300311

@@ -337,11 +348,11 @@ describe('CSOT spec prose tests', function () {
337348
* 1. Verify that a `find` command and two `getMore` commands were executed against the `db.coll` collection during the test.
338349
*/
339350

340-
it.skip('send correct number of finds and getMores', async function () {
351+
it('send correct number of finds and getMores', metadata, async function () {
341352
const cursor = client
342353
.db('db')
343354
.collection('coll')
344-
.find({}, { tailable: true, awaitData: true })
355+
.find({}, { tailable: true })
345356
.project({ _id: 0 });
346357
const doc = await cursor.next();
347358
expect(doc).to.deep.equal({ x: 1 });
@@ -358,7 +369,7 @@ describe('CSOT spec prose tests', function () {
358369
expect(commandStarted.filter(e => e.command.find != null)).to.have.lengthOf(1);
359370
// Expect 2 getMore
360371
expect(commandStarted.filter(e => e.command.getMore != null)).to.have.lengthOf(2);
361-
}).skipReason = 'TODO(NODE-6305)';
372+
});
362373
});
363374

364375
context('Change Streams', () => {
@@ -383,8 +394,11 @@ describe('CSOT spec prose tests', function () {
383394
* - Expect this to fail with a timeout error.
384395
* 1. Verify that an `aggregate` command and two `getMore` commands were executed against the `db.coll` collection during the test.
385396
*/
386-
it.skip('sends correct number of aggregate and getMores', async function () {
387-
const changeStream = client.db('db').collection('coll').watch();
397+
it.skip('sends correct number of aggregate and getMores', metadata, async function () {
398+
const changeStream = client
399+
.db('db')
400+
.collection('coll')
401+
.watch([], { timeoutMS: 20, maxAwaitTimeMS: 19 });
388402
const maybeError = await changeStream.next().then(
389403
() => null,
390404
e => e
@@ -397,9 +411,9 @@ describe('CSOT spec prose tests', function () {
397411
const getMores = commandStarted.filter(e => e.command.getMore != null).map(e => e.command);
398412
// Expect 1 aggregate
399413
expect(aggregates).to.have.lengthOf(1);
400-
// Expect 1 getMore
401-
expect(getMores).to.have.lengthOf(1);
402-
}).skipReason = 'TODO(NODE-6305)';
414+
// Expect 2 getMores
415+
expect(getMores).to.have.lengthOf(2);
416+
}).skipReason = 'TODO(NODE-6387)';
403417
});
404418
});
405419

test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@ const skippedTests = {
2525
'Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset':
2626
'TODO(DRIVERS-2965)',
2727
'maxTimeMS value in the command is less than timeoutMS':
28-
'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs'
28+
'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs',
29+
'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure':
30+
'TODO(DRIVERS-2965)',
31+
'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(DRIVERS-2965)',
32+
'timeoutMS is refreshed for getMore - failure':
33+
'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs' // Skipping for both tailable awaitData and tailable non-awaitData cursors
2934
};
3035

3136
describe('CSOT spec tests', function () {

0 commit comments

Comments
 (0)