Skip to content

Commit 4588ff2

Browse files
W-A-Jamesdariakp
authored andcommitted
feat(NODE-6387): Add CSOT support to change streams (#4256)
1 parent e86f11e commit 4588ff2

10 files changed

+676
-127
lines changed

src/change_stream.ts

+125-52
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import type { Readable } from 'stream';
33
import type { Binary, Document, Timestamp } from './bson';
44
import { Collection } from './collection';
55
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
6-
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
6+
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
77
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
88
import { Db } from './db';
99
import {
1010
type AnyError,
1111
isResumableError,
1212
MongoAPIError,
1313
MongoChangeStreamError,
14+
MongoOperationTimeoutError,
1415
MongoRuntimeError
1516
} from './error';
1617
import { MongoClient } from './mongo_client';
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2021
import type { ReadPreference } from './read_preference';
2122
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
2223
import type { ServerSessionId } from './sessions';
24+
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
2325
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
2426

2527
/** @internal */
@@ -538,7 +540,12 @@ export type ChangeStreamEvents<
538540
end(): void;
539541
error(error: Error): void;
540542
change(change: TChange): void;
541-
} & AbstractCursorEvents;
543+
/**
544+
* @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
545+
* instance is closed, which can occur multiple times for a given `ChangeStream` instance.
546+
*/
547+
close(): void;
548+
};
542549

543550
/**
544551
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
@@ -609,6 +616,13 @@ export class ChangeStream<
609616
*/
610617
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED;
611618

619+
private timeoutContext?: TimeoutContext;
620+
/**
621+
* Note that this property is here to uniquely identify a ChangeStream instance as the owner of
622+
* the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
623+
* that {@link AbstractCursor.close} does not mutate the timeoutContext.
624+
*/
625+
private contextOwner: symbol;
612626
/**
613627
* @internal
614628
*
@@ -624,20 +638,25 @@ export class ChangeStream<
624638

625639
this.pipeline = pipeline;
626640
this.options = { ...options };
641+
let serverSelectionTimeoutMS: number;
627642
delete this.options.writeConcern;
628643

629644
if (parent instanceof Collection) {
630645
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
646+
serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
631647
} else if (parent instanceof Db) {
632648
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
649+
serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
633650
} else if (parent instanceof MongoClient) {
634651
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
652+
serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
635653
} else {
636654
throw new MongoChangeStreamError(
637655
'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
638656
);
639657
}
640658

659+
this.contextOwner = Symbol();
641660
this.parent = parent;
642661
this.namespace = parent.s.namespace;
643662
if (!this.options.readPreference && parent.readPreference) {
@@ -662,6 +681,13 @@ export class ChangeStream<
662681
this[kCursorStream]?.removeAllListeners('data');
663682
}
664683
});
684+
685+
if (this.options.timeoutMS != null) {
686+
this.timeoutContext = new CSOTTimeoutContext({
687+
timeoutMS: this.options.timeoutMS,
688+
serverSelectionTimeoutMS
689+
});
690+
}
665691
}
666692

667693
/** @internal */
@@ -681,22 +707,30 @@ export class ChangeStream<
681707
// This loop continues until either a change event is received or until a resume attempt
682708
// fails.
683709

684-
while (true) {
685-
try {
686-
const hasNext = await this.cursor.hasNext();
687-
return hasNext;
688-
} catch (error) {
710+
this.timeoutContext?.refresh();
711+
try {
712+
while (true) {
689713
try {
690-
await this._processErrorIteratorMode(error);
714+
const hasNext = await this.cursor.hasNext();
715+
return hasNext;
691716
} catch (error) {
692717
try {
693-
await this.close();
718+
await this._processErrorIteratorMode(error, this.cursor.id != null);
694719
} catch (error) {
695-
squashError(error);
720+
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
721+
throw error;
722+
}
723+
try {
724+
await this.close();
725+
} catch (error) {
726+
squashError(error);
727+
}
728+
throw error;
696729
}
697-
throw error;
698730
}
699731
}
732+
} finally {
733+
this.timeoutContext?.clear();
700734
}
701735
}
702736

@@ -706,24 +740,32 @@ export class ChangeStream<
706740
// Change streams must resume indefinitely while each resume event succeeds.
707741
// This loop continues until either a change event is received or until a resume attempt
708742
// fails.
743+
this.timeoutContext?.refresh();
709744

710-
while (true) {
711-
try {
712-
const change = await this.cursor.next();
713-
const processedChange = this._processChange(change ?? null);
714-
return processedChange;
715-
} catch (error) {
745+
try {
746+
while (true) {
716747
try {
717-
await this._processErrorIteratorMode(error);
748+
const change = await this.cursor.next();
749+
const processedChange = this._processChange(change ?? null);
750+
return processedChange;
718751
} catch (error) {
719752
try {
720-
await this.close();
753+
await this._processErrorIteratorMode(error, this.cursor.id != null);
721754
} catch (error) {
722-
squashError(error);
755+
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
756+
throw error;
757+
}
758+
try {
759+
await this.close();
760+
} catch (error) {
761+
squashError(error);
762+
}
763+
throw error;
723764
}
724-
throw error;
725765
}
726766
}
767+
} finally {
768+
this.timeoutContext?.clear();
727769
}
728770
}
729771

@@ -735,23 +777,29 @@ export class ChangeStream<
735777
// Change streams must resume indefinitely while each resume event succeeds.
736778
// This loop continues until either a change event is received or until a resume attempt
737779
// fails.
780+
this.timeoutContext?.refresh();
738781

739-
while (true) {
740-
try {
741-
const change = await this.cursor.tryNext();
742-
return change ?? null;
743-
} catch (error) {
782+
try {
783+
while (true) {
744784
try {
745-
await this._processErrorIteratorMode(error);
785+
const change = await this.cursor.tryNext();
786+
return change ?? null;
746787
} catch (error) {
747788
try {
748-
await this.close();
789+
await this._processErrorIteratorMode(error, this.cursor.id != null);
749790
} catch (error) {
750-
squashError(error);
791+
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) throw error;
792+
try {
793+
await this.close();
794+
} catch (error) {
795+
squashError(error);
796+
}
797+
throw error;
751798
}
752-
throw error;
753799
}
754800
}
801+
} finally {
802+
this.timeoutContext?.clear();
755803
}
756804
}
757805

@@ -784,6 +832,8 @@ export class ChangeStream<
784832
* Frees the internal resources used by the change stream.
785833
*/
786834
async close(): Promise<void> {
835+
this.timeoutContext?.clear();
836+
this.timeoutContext = undefined;
787837
this[kClosed] = true;
788838

789839
const cursor = this.cursor;
@@ -866,7 +916,12 @@ export class ChangeStream<
866916
client,
867917
this.namespace,
868918
pipeline,
869-
options
919+
{
920+
...options,
921+
timeoutContext: this.timeoutContext
922+
? new CursorTimeoutContext(this.timeoutContext, this.contextOwner)
923+
: undefined
924+
}
870925
);
871926

872927
for (const event of CHANGE_STREAM_EVENTS) {
@@ -899,8 +954,9 @@ export class ChangeStream<
899954
} catch (error) {
900955
this.emit(ChangeStream.ERROR, error);
901956
}
957+
this.timeoutContext?.refresh();
902958
});
903-
stream.on('error', error => this._processErrorStreamMode(error));
959+
stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
904960
}
905961

906962
/** @internal */
@@ -942,24 +998,30 @@ export class ChangeStream<
942998
}
943999

9441000
/** @internal */
945-
private _processErrorStreamMode(changeStreamError: AnyError) {
1001+
private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) {
9461002
// If the change stream has been closed explicitly, do not process error.
9471003
if (this[kClosed]) return;
9481004

949-
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
1005+
if (
1006+
cursorInitialized &&
1007+
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
1008+
changeStreamError instanceof MongoOperationTimeoutError)
1009+
) {
9501010
this._endStream();
9511011

952-
this.cursor.close().then(undefined, squashError);
953-
954-
const topology = getTopology(this.parent);
955-
topology
956-
.selectServer(this.cursor.readPreference, {
957-
operationName: 'reconnect topology in change stream'
958-
})
959-
1012+
this.cursor
1013+
.close()
1014+
.then(
1015+
() => this._resume(changeStreamError),
1016+
e => {
1017+
squashError(e);
1018+
return this._resume(changeStreamError);
1019+
}
1020+
)
9601021
.then(
9611022
() => {
962-
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
1023+
if (changeStreamError instanceof MongoOperationTimeoutError)
1024+
this.emit(ChangeStream.ERROR, changeStreamError);
9631025
},
9641026
() => this._closeEmitterModeWithError(changeStreamError)
9651027
);
@@ -969,33 +1031,44 @@ export class ChangeStream<
9691031
}
9701032

9711033
/** @internal */
972-
private async _processErrorIteratorMode(changeStreamError: AnyError) {
1034+
private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) {
9731035
if (this[kClosed]) {
9741036
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
9751037
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
9761038
}
9771039

9781040
if (
979-
this.cursor.id == null ||
980-
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
1041+
cursorInitialized &&
1042+
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
1043+
changeStreamError instanceof MongoOperationTimeoutError)
9811044
) {
1045+
try {
1046+
await this.cursor.close();
1047+
} catch (error) {
1048+
squashError(error);
1049+
}
1050+
1051+
await this._resume(changeStreamError);
1052+
1053+
if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
1054+
} else {
9821055
try {
9831056
await this.close();
9841057
} catch (error) {
9851058
squashError(error);
9861059
}
1060+
9871061
throw changeStreamError;
9881062
}
1063+
}
9891064

990-
try {
991-
await this.cursor.close();
992-
} catch (error) {
993-
squashError(error);
994-
}
1065+
private async _resume(changeStreamError: AnyError) {
1066+
this.timeoutContext?.refresh();
9951067
const topology = getTopology(this.parent);
9961068
try {
9971069
await topology.selectServer(this.cursor.readPreference, {
998-
operationName: 'reconnect topology in change stream'
1070+
operationName: 'reconnect topology in change stream',
1071+
timeoutContext: this.timeoutContext
9991072
});
10001073
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10011074
} catch {

src/cmap/connection.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
505505
responseType?: MongoDBResponseConstructor
506506
) {
507507
const message = this.prepareCommand(ns.db, command, options);
508-
509508
let started = 0;
510509
if (this.shouldEmitAndLogCommand) {
511510
started = now();
@@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
717716
try {
718717
return await Promise.race([drainEvent, timeout]);
719718
} catch (error) {
719+
let err = error;
720720
if (TimeoutError.is(error)) {
721-
throw new MongoOperationTimeoutError('Timed out at socket write');
721+
err = new MongoOperationTimeoutError('Timed out at socket write');
722+
this.cleanup(err);
722723
}
723724
throw error;
724725
} finally {
@@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
753754
}
754755
}
755756
} catch (readError) {
757+
const err = readError;
756758
if (TimeoutError.is(readError)) {
757759
const error = new MongoOperationTimeoutError(
758760
`Timed out during socket read (${readError.duration}ms)`
@@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
761763
this.onError(error);
762764
throw error;
763765
}
764-
throw readError;
766+
throw err;
765767
} finally {
766768
this.dataEvents = null;
767769
this.messageStream.pause();

0 commit comments

Comments
 (0)