Skip to content

Commit bd8a9f4

Browse files
W-A-Jamesdariakp
authored andcommitted
feat(NODE-6090): Implement CSOT logic for connection checkout and server selection
1 parent dc3fe95 commit bd8a9f4

20 files changed

+570
-178
lines changed

src/admin.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ export class Admin {
7878
new RunAdminCommandOperation(command, {
7979
...resolveBSONOptions(options),
8080
session: options?.session,
81-
readPreference: options?.readPreference
81+
readPreference: options?.readPreference,
82+
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
8283
})
8384
);
8485
}

src/cmap/connection.ts

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import { ServerType } from '../sdam/common';
3737
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
38+
import { type Timeout } from '../timeout';
3839
import {
3940
BufferPool,
4041
calculateDurationInMs,
@@ -99,6 +100,9 @@ export interface CommandOptions extends BSONSerializeOptions {
99100
writeConcern?: WriteConcern;
100101

101102
directConnection?: boolean;
103+
104+
/** @internal */
105+
timeout?: Timeout;
102106
}
103107

104108
/** @public */

src/cmap/connection_pool.ts

+38-15
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import {
2121
MongoInvalidArgumentError,
2222
MongoMissingCredentialsError,
2323
MongoNetworkError,
24+
MongoOperationTimeoutError,
2425
MongoRuntimeError,
2526
MongoServerError
2627
} from '../error';
2728
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2829
import type { Server } from '../sdam/server';
2930
import { Timeout, TimeoutError } from '../timeout';
30-
import { type Callback, List, makeCounter, now, promiseWithResolvers } from '../utils';
31+
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
3132
import { connect } from './connect';
3233
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3334
import {
@@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
102103
export interface WaitQueueMember {
103104
resolve: (conn: Connection) => void;
104105
reject: (err: AnyError) => void;
105-
timeout: Timeout;
106106
[kCancelled]?: boolean;
107107
checkoutTime: number;
108108
}
@@ -355,37 +355,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
355355
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
356356
* explicitly destroyed by the new owner.
357357
*/
358-
async checkOut(): Promise<Connection> {
359-
const checkoutTime = now();
358+
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
360359
this.emitAndLog(
361360
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
362361
new ConnectionCheckOutStartedEvent(this)
363362
);
364363

365364
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
365+
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
366366

367367
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
368368

369-
const timeout = Timeout.expires(waitQueueTimeoutMS);
369+
let timeout: Timeout | null = null;
370+
if (options?.timeout) {
371+
// CSOT enabled
372+
// Determine if we're using the timeout passed in or a new timeout
373+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
374+
// This check determines whether or not Topology.selectServer used the configured
375+
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
376+
if (
377+
options.timeout.duration === serverSelectionTimeoutMS ||
378+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
379+
) {
380+
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
381+
// here
382+
timeout = options.timeout;
383+
} else {
384+
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
385+
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
386+
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
387+
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
388+
}
389+
}
390+
} else {
391+
timeout = Timeout.expires(waitQueueTimeoutMS);
392+
}
370393

371394
const waitQueueMember: WaitQueueMember = {
372395
resolve,
373-
reject,
374-
timeout,
375-
checkoutTime
396+
reject
376397
};
377398

378399
this[kWaitQueue].push(waitQueueMember);
379400
process.nextTick(() => this.processWaitQueue());
380401

381402
try {
382-
return await Promise.race([promise, waitQueueMember.timeout]);
403+
timeout?.throwIfExpired();
404+
return await (timeout ? Promise.race([promise, timeout]) : promise);
383405
} catch (error) {
384406
if (TimeoutError.is(error)) {
385407
waitQueueMember[kCancelled] = true;
386408

387-
waitQueueMember.timeout.clear();
388-
389409
this.emitAndLog(
390410
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
391411
new ConnectionCheckOutFailedEvent(this, 'timeout', waitQueueMember.checkoutTime)
@@ -396,9 +416,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
396416
: 'Timed out while checking out a connection from connection pool',
397417
this.address
398418
);
419+
if (options?.timeout) {
420+
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
421+
cause: timeoutError
422+
});
423+
}
399424
throw timeoutError;
400425
}
401426
throw error;
427+
} finally {
428+
if (timeout !== options?.timeout) timeout?.clear();
402429
}
403430
}
404431

@@ -764,7 +791,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
764791
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
765792
new ConnectionCheckOutFailedEvent(this, reason, waitQueueMember.checkoutTime, error)
766793
);
767-
waitQueueMember.timeout.clear();
768794
this[kWaitQueue].shift();
769795
waitQueueMember.reject(error);
770796
continue;
@@ -785,7 +811,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
785811
ConnectionPool.CONNECTION_CHECKED_OUT,
786812
new ConnectionCheckedOutEvent(this, connection, waitQueueMember.checkoutTime)
787813
);
788-
waitQueueMember.timeout.clear();
789814

790815
this[kWaitQueue].shift();
791816
waitQueueMember.resolve(connection);
@@ -828,8 +853,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
828853
);
829854
waitQueueMember.resolve(connection);
830855
}
831-
832-
waitQueueMember.timeout.clear();
833856
}
834857
process.nextTick(() => this.processWaitQueue());
835858
});

src/collection.ts

+5
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ export class Collection<TSchema extends Document = Document> {
262262
this.s.collectionHint = normalizeHintField(v);
263263
}
264264

265+
/** @internal */
266+
get timeoutMS(): number | undefined {
267+
return this.s.options.timeoutMS;
268+
}
269+
265270
/**
266271
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
267272
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior

src/db.ts

+6
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ export class Db {
222222
return this.s.namespace.toString();
223223
}
224224

225+
/** @internal */
226+
get timeoutMS(): number | undefined {
227+
return this.s.options?.timeoutMS;
228+
}
229+
225230
/**
226231
* Create a new collection on a server with the specified options. Use this to create capped collections.
227232
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
@@ -272,6 +277,7 @@ export class Db {
272277
this.client,
273278
new RunCommandOperation(this, command, {
274279
...resolveBSONOptions(options),
280+
timeoutMS: options?.timeoutMS,
275281
session: options?.session,
276282
readPreference: options?.readPreference
277283
})

src/error.ts

+9
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
857857
}
858858
}
859859

860+
/**
861+
* @internal
862+
*/
863+
export class MongoOperationTimeoutError extends MongoRuntimeError {
864+
override get name(): string {
865+
return 'MongoOperationTimeoutError';
866+
}
867+
}
868+
860869
/**
861870
* An error thrown when the user attempts to add options to a cursor that has already been
862871
* initialized

src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export {
6666
MongoNetworkTimeoutError,
6767
MongoNotConnectedError,
6868
MongoOIDCError,
69+
MongoOperationTimeoutError,
6970
MongoParseError,
7071
MongoRuntimeError,
7172
MongoServerClosedError,

src/operations/command.ts

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export interface OperationParent {
6565
writeConcern?: WriteConcern;
6666
readPreference?: ReadPreference;
6767
bsonOptions?: BSONSerializeOptions;
68+
timeoutMS?: number;
6869
}
6970

7071
/** @internal */
@@ -131,6 +132,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
131132
const options = {
132133
...this.options,
133134
...this.bsonOptions,
135+
timeout: this.timeout,
134136
readPreference: this.readPreference,
135137
session
136138
};

src/operations/find.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ export class FindOperation extends CommandOperation<CursorResponse> {
123123
...this.options,
124124
...this.bsonOptions,
125125
documentsReturnedIn: 'firstBatch',
126-
session
126+
session,
127+
timeout: this.timeout
127128
},
128129
this.explain ? ExplainedCursorResponse : CursorResponse
129130
);

src/operations/operation.ts

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type Timeout } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67

78
export const Aspect = {
@@ -57,6 +58,11 @@ export abstract class AbstractOperation<TResult = any> {
5758

5859
options: OperationOptions;
5960

61+
/** @internal */
62+
timeout?: Timeout;
63+
/** @internal */
64+
timeoutMS?: number;
65+
6066
[kSession]: ClientSession | undefined;
6167

6268
static aspects?: Set<symbol>;
@@ -74,6 +80,8 @@ export abstract class AbstractOperation<TResult = any> {
7480
this.options = options;
7581
this.bypassPinningCheck = !!options.bypassPinningCheck;
7682
this.trySecondaryWrite = false;
83+
84+
this.timeoutMS = options.timeoutMS;
7785
}
7886

7987
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export type RunCommandOptions = {
1414
session?: ClientSession;
1515
/** The read preference */
1616
readPreference?: ReadPreferenceLike;
17+
/** @internal */
18+
timeoutMS?: number;
1719
} & BSONSerializeOptions;
1820

1921
/** @internal */
@@ -39,10 +41,12 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
3941
{
4042
...this.options,
4143
readPreference: this.readPreference,
42-
session
44+
session,
45+
timeout: this.timeout
4346
},
4447
this.options.responseType
4548
);
49+
4650
return res;
4751
}
4852
}
@@ -68,7 +72,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
6872
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
6973
...this.options,
7074
readPreference: this.readPreference,
71-
session
75+
session,
76+
timeout: this.timeout
7277
});
7378
return res;
7479
}

src/sdam/server.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
311311
this.incrementOperationCount();
312312
if (conn == null) {
313313
try {
314-
conn = await this.pool.checkOut();
314+
conn = await this.pool.checkOut(options);
315315
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
316316
session?.pin(conn);
317317
}
@@ -336,6 +336,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
336336
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
337337
) {
338338
await this.pool.reauthenticate(conn);
339+
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
339340
try {
340341
const res = await conn.command(ns, cmd, finalOptions, responseType);
341342
throwIfWriteConcernError(res);

0 commit comments

Comments
 (0)