Skip to content

Commit 131f6ed

Browse files
W-A-Jamesdariakp
authored andcommitted
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
1 parent 5f1102f commit 131f6ed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2000
-228
lines changed

src/cmap/connection.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9191
documentsReturnedIn?: string;
9292
noResponse?: boolean;
9393
omitReadPreference?: boolean;
94+
omitMaxTimeMS?: boolean;
9495

9596
// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
9697
// from executeOperation that the txnNum should be applied to this command.
@@ -426,7 +427,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
426427
...options
427428
};
428429

429-
if (options.timeoutContext?.csotEnabled()) {
430+
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
430431
const { maxTimeMS } = options.timeoutContext;
431432
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
432433
}
@@ -626,7 +627,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
626627
for await (const document of this.sendCommand(ns, command, options, responseType)) {
627628
if (options.timeoutContext?.csotEnabled()) {
628629
if (MongoDBResponse.is(document)) {
629-
// TODO(NODE-5684): test coverage to be added once cursors are enabling CSOT
630630
if (document.isMaxTimeExpiredError) {
631631
throw new MongoOperationTimeoutError('Server reported a timeout error', {
632632
cause: new MongoServerError(document.toObject())

src/cmap/wire_protocol/on_data.ts

-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ export function onData(
9393

9494
const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
9595
timeoutForSocketRead?.throwIfExpired();
96-
// eslint-disable-next-line github/no-then
9796
timeoutForSocketRead?.then(undefined, errorHandler);
9897

9998
return iterator;

src/collection.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
type ListSearchIndexesOptions
1212
} from './cursor/list_search_indexes_cursor';
1313
import type { Db } from './db';
14-
import { MongoInvalidArgumentError } from './error';
14+
import { MongoInvalidArgumentError, MongoOperationTimeoutError } from './error';
1515
import type { MongoClient, PkFactory } from './mongo_client';
1616
import type {
1717
Filter,
@@ -678,7 +678,9 @@ export class Collection<TSchema extends Document = Document> {
678678
new DropIndexOperation(this as TODO_NODE_3286, '*', resolveOptions(this, options))
679679
);
680680
return true;
681-
} catch {
681+
} catch (error) {
682+
if (error instanceof MongoOperationTimeoutError) throw error; // TODO: Check the spec for index management behaviour/file a drivers ticket for this
683+
// Seems like we should throw all errors
682684
return false;
683685
}
684686
}

src/cursor/abstract_cursor.ts

+118-28
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24+
import { TimeoutContext } from '../timeout';
2425
import { type MongoDBNamespace, squashError } from '../utils';
2526

2627
/**
@@ -60,6 +61,17 @@ export interface CursorStreamOptions {
6061
/** @public */
6162
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
6263

64+
/** @public*/
65+
export const CursorTimeoutMode = Object.freeze({
66+
ITERATION: 'iteration',
67+
LIFETIME: 'cursorLifetime'
68+
} as const);
69+
70+
/** @public
71+
* TODO(NODE-5688): Document and release
72+
* */
73+
export type CursorTimeoutMode = (typeof CursorTimeoutMode)[keyof typeof CursorTimeoutMode];
74+
6375
/** @public */
6476
export interface AbstractCursorOptions extends BSONSerializeOptions {
6577
session?: ClientSession;
@@ -105,6 +117,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
105117
noCursorTimeout?: boolean;
106118
/** @internal TODO(NODE-5688): make this public */
107119
timeoutMS?: number;
120+
/** @internal TODO(NODE-5688): make this public */
121+
timeoutMode?: CursorTimeoutMode;
108122
}
109123

110124
/** @internal */
@@ -117,6 +131,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
117131
oplogReplay?: boolean;
118132
exhaust?: boolean;
119133
partial?: boolean;
134+
135+
omitMaxTimeMS?: boolean;
120136
};
121137

122138
/** @public */
@@ -154,6 +170,8 @@ export abstract class AbstractCursor<
154170
private isKilled: boolean;
155171
/** @internal */
156172
protected readonly cursorOptions: InternalAbstractCursorOptions;
173+
/** @internal */
174+
protected timeoutContext?: TimeoutContext;
157175

158176
/** @event */
159177
static readonly CLOSE = 'close' as const;
@@ -186,6 +204,30 @@ export abstract class AbstractCursor<
186204
...pluckBSONSerializeOptions(options)
187205
};
188206
this.cursorOptions.timeoutMS = options.timeoutMS;
207+
if (this.cursorOptions.timeoutMS != null) {
208+
if (options.timeoutMode == null) {
209+
if (options.tailable) {
210+
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
211+
} else {
212+
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
213+
}
214+
} else {
215+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
216+
throw new MongoInvalidArgumentError(
217+
"Cannot set tailable cursor's timeoutMode to LIFETIME"
218+
);
219+
}
220+
this.cursorOptions.timeoutMode = options.timeoutMode;
221+
}
222+
} else {
223+
if (options.timeoutMode != null)
224+
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
225+
}
226+
this.cursorOptions.omitMaxTimeMS =
227+
this.cursorOptions.timeoutMS != null &&
228+
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
229+
!this.cursorOptions.tailable) ||
230+
(this.cursorOptions.tailable && !this.cursorOptions.awaitData));
189231

190232
const readConcern = ReadConcern.fromOptions(options);
191233
if (readConcern) {
@@ -400,12 +442,21 @@ export abstract class AbstractCursor<
400442
return false;
401443
}
402444

403-
do {
404-
if ((this.documents?.length ?? 0) !== 0) {
405-
return true;
445+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
446+
this.timeoutContext?.refresh();
447+
}
448+
try {
449+
do {
450+
if ((this.documents?.length ?? 0) !== 0) {
451+
return true;
452+
}
453+
await this.fetchBatch();
454+
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
455+
} finally {
456+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
457+
this.timeoutContext?.clear();
406458
}
407-
await this.fetchBatch();
408-
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
459+
}
409460

410461
return false;
411462
}
@@ -415,15 +466,24 @@ export abstract class AbstractCursor<
415466
if (this.cursorId === Long.ZERO) {
416467
throw new MongoCursorExhaustedError();
417468
}
469+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
470+
this.timeoutContext?.refresh();
471+
}
418472

419-
do {
420-
const doc = this.documents?.shift(this.deserializationOptions);
421-
if (doc != null) {
422-
if (this.transform != null) return await this.transformDocument(doc);
423-
return doc;
473+
try {
474+
do {
475+
const doc = this.documents?.shift(this.deserializationOptions);
476+
if (doc != null) {
477+
if (this.transform != null) return await this.transformDocument(doc);
478+
return doc;
479+
}
480+
await this.fetchBatch();
481+
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
482+
} finally {
483+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
484+
this.timeoutContext?.clear();
424485
}
425-
await this.fetchBatch();
426-
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
486+
}
427487

428488
return null;
429489
}
@@ -436,18 +496,27 @@ export abstract class AbstractCursor<
436496
throw new MongoCursorExhaustedError();
437497
}
438498

439-
let doc = this.documents?.shift(this.deserializationOptions);
440-
if (doc != null) {
441-
if (this.transform != null) return await this.transformDocument(doc);
442-
return doc;
499+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
500+
this.timeoutContext?.refresh();
443501
}
502+
try {
503+
let doc = this.documents?.shift(this.deserializationOptions);
504+
if (doc != null) {
505+
if (this.transform != null) return await this.transformDocument(doc);
506+
return doc;
507+
}
444508

445-
await this.fetchBatch();
509+
await this.fetchBatch();
446510

447-
doc = this.documents?.shift(this.deserializationOptions);
448-
if (doc != null) {
449-
if (this.transform != null) return await this.transformDocument(doc);
450-
return doc;
511+
doc = this.documents?.shift(this.deserializationOptions);
512+
if (doc != null) {
513+
if (this.transform != null) return await this.transformDocument(doc);
514+
return doc;
515+
}
516+
} finally {
517+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
518+
this.timeoutContext?.clear();
519+
}
451520
}
452521

453522
return null;
@@ -476,8 +545,8 @@ export abstract class AbstractCursor<
476545
/**
477546
* Frees any client-side resources used by the cursor.
478547
*/
479-
async close(): Promise<void> {
480-
await this.cleanup();
548+
async close(options?: { timeoutMS?: number }): Promise<void> {
549+
await this.cleanup(options?.timeoutMS);
481550
}
482551

483552
/**
@@ -658,6 +727,8 @@ export abstract class AbstractCursor<
658727

659728
this.cursorId = null;
660729
this.documents?.clear();
730+
this.timeoutContext?.clear();
731+
this.timeoutContext = undefined;
661732
this.isClosed = false;
662733
this.isKilled = false;
663734
this.initialized = false;
@@ -707,7 +778,7 @@ export abstract class AbstractCursor<
707778
}
708779
);
709780

710-
return await executeOperation(this.cursorClient, getMoreOperation);
781+
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
711782
}
712783

713784
/**
@@ -718,6 +789,12 @@ export abstract class AbstractCursor<
718789
* a significant refactor.
719790
*/
720791
private async cursorInit(): Promise<void> {
792+
if (this.cursorOptions.timeoutMS != null) {
793+
this.timeoutContext = TimeoutContext.create({
794+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
795+
timeoutMS: this.cursorOptions.timeoutMS
796+
});
797+
}
721798
try {
722799
const state = await this._initialize(this.cursorSession);
723800
const response = state.response;
@@ -729,7 +806,7 @@ export abstract class AbstractCursor<
729806
} catch (error) {
730807
// the cursor is now initialized, even if an error occurred
731808
this.initialized = true;
732-
await this.cleanup(error);
809+
await this.cleanup(undefined, error);
733810
throw error;
734811
}
735812

@@ -763,14 +840,15 @@ export abstract class AbstractCursor<
763840

764841
// otherwise need to call getMore
765842
const batchSize = this.cursorOptions.batchSize || 1000;
843+
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
766844

767845
try {
768846
const response = await this.getMore(batchSize);
769847
this.cursorId = response.id;
770848
this.documents = response;
771849
} catch (error) {
772850
try {
773-
await this.cleanup(error);
851+
await this.cleanup(undefined, error);
774852
} catch (error) {
775853
// `cleanupCursor` should never throw, squash and throw the original error
776854
squashError(error);
@@ -791,7 +869,7 @@ export abstract class AbstractCursor<
791869
}
792870

793871
/** @internal */
794-
private async cleanup(error?: Error) {
872+
private async cleanup(timeoutMS?: number, error?: Error) {
795873
this.isClosed = true;
796874
const session = this.cursorSession;
797875
try {
@@ -806,11 +884,23 @@ export abstract class AbstractCursor<
806884
this.isKilled = true;
807885
const cursorId = this.cursorId;
808886
this.cursorId = Long.ZERO;
887+
let timeoutContext: TimeoutContext | undefined;
888+
if (timeoutMS != null) {
889+
this.timeoutContext?.clear();
890+
timeoutContext = TimeoutContext.create({
891+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
892+
timeoutMS
893+
});
894+
} else {
895+
this.timeoutContext?.refresh();
896+
timeoutContext = this.timeoutContext;
897+
}
809898
await executeOperation(
810899
this.cursorClient,
811900
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
812901
session
813-
})
902+
}),
903+
timeoutContext
814904
);
815905
}
816906
} catch (error) {

src/cursor/aggregation_cursor.ts

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document } from '../bson';
2+
import { MongoAPIError } from '../error';
23
import type { ExplainCommandOptions, ExplainVerbosityLike } from '../explain';
34
import type { MongoClient } from '../mongo_client';
45
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
@@ -9,6 +10,7 @@ import { mergeOptions, type MongoDBNamespace } from '../utils';
910
import {
1011
AbstractCursor,
1112
type AbstractCursorOptions,
13+
CursorTimeoutMode,
1214
type InitialCursorResponse
1315
} from './abstract_cursor';
1416

@@ -38,6 +40,15 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
3840

3941
this.pipeline = pipeline;
4042
this.aggregateOptions = options;
43+
44+
const lastStage: Document | undefined = this.pipeline[this.pipeline.length - 1];
45+
46+
if (
47+
this.cursorOptions.timeoutMS != null &&
48+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
49+
(lastStage?.$merge != null || lastStage?.$out != null)
50+
)
51+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
4152
}
4253

4354
clone(): AggregationCursor<TSchema> {
@@ -60,7 +71,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
6071
session
6172
});
6273

63-
const response = await executeOperation(this.client, aggregateOperation);
74+
const response = await executeOperation(this.client, aggregateOperation, this.timeoutContext);
6475

6576
return { server: aggregateOperation.server, session, response };
6677
}
@@ -95,6 +106,13 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
95106
addStage<T = Document>(stage: Document): AggregationCursor<T>;
96107
addStage<T = Document>(stage: Document): AggregationCursor<T> {
97108
this.throwIfInitialized();
109+
if (
110+
this.cursorOptions.timeoutMS != null &&
111+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
112+
(stage.$out != null || stage.$merge != null)
113+
) {
114+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
115+
}
98116
this.pipeline.push(stage);
99117
return this as unknown as AggregationCursor<T>;
100118
}

0 commit comments

Comments
 (0)