Skip to content

Commit e4e6a5e

Browse files
nbbeekenW-A-James
authored andcommitted
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
Co-authored-by: Warren James <[email protected]>
1 parent 195b3ad commit e4e6a5e

File tree

16 files changed

+200
-77
lines changed

16 files changed

+200
-77
lines changed

src/admin.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ export class Admin {
155155
* @param options - Optional settings for the command
156156
*/
157157
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
158-
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
158+
return await executeOperation(
159+
this.s.db.client,
160+
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
161+
);
159162
}
160163

161164
/**

src/cmap/connection.ts

+58-8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
MongoMissingDependencyError,
2525
MongoNetworkError,
2626
MongoNetworkTimeoutError,
27+
MongoOperationTimeoutError,
2728
MongoParseError,
2829
MongoServerError,
2930
MongoUnexpectedServerResponseError
@@ -35,7 +36,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3536
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3637
import { ServerType } from '../sdam/common';
3738
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
38-
import { type TimeoutContext } from '../timeout';
39+
import { type TimeoutContext, TimeoutError } from '../timeout';
3940
import {
4041
BufferPool,
4142
calculateDurationInMs,
@@ -424,6 +425,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
424425
...options
425426
};
426427

428+
if (options.timeoutContext?.csotEnabled()) {
429+
const { maxTimeMS } = options.timeoutContext;
430+
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
431+
}
432+
427433
const message = this.supportsOpMsg
428434
? new OpMsgRequest(db, cmd, commandOptions)
429435
: new OpQueryRequest(db, cmd, commandOptions);
@@ -438,7 +444,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
438444
): AsyncGenerator<MongoDBResponse> {
439445
this.throwIfAborted();
440446

441-
if (typeof options.socketTimeoutMS === 'number') {
447+
if (options.timeoutContext?.csotEnabled()) {
448+
this.socket.setTimeout(0);
449+
} else if (typeof options.socketTimeoutMS === 'number') {
442450
this.socket.setTimeout(options.socketTimeoutMS);
443451
} else if (this.socketTimeoutMS !== 0) {
444452
this.socket.setTimeout(this.socketTimeoutMS);
@@ -447,7 +455,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
447455
try {
448456
await this.writeCommand(message, {
449457
agreedCompressor: this.description.compressor ?? 'none',
450-
zlibCompressionLevel: this.description.zlibCompressionLevel
458+
zlibCompressionLevel: this.description.zlibCompressionLevel,
459+
timeoutContext: options.timeoutContext
451460
});
452461

453462
if (options.noResponse || message.moreToCome) {
@@ -457,7 +466,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
457466

458467
this.throwIfAborted();
459468

460-
for await (const response of this.readMany()) {
469+
if (
470+
options.timeoutContext?.csotEnabled() &&
471+
options.timeoutContext.minRoundTripTime != null &&
472+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
473+
) {
474+
throw new MongoOperationTimeoutError(
475+
'Server roundtrip time is greater than the time remaining'
476+
);
477+
}
478+
479+
for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
461480
this.socket.setTimeout(0);
462481
const bson = response.parse();
463482

@@ -634,7 +653,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634653
*/
635654
private async writeCommand(
636655
command: WriteProtocolMessageType,
637-
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
656+
options: {
657+
agreedCompressor?: CompressorName;
658+
zlibCompressionLevel?: number;
659+
timeoutContext?: TimeoutContext;
660+
}
638661
): Promise<void> {
639662
const finalCommand =
640663
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
@@ -646,8 +669,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
646669

647670
const buffer = Buffer.concat(await finalCommand.toBin());
648671

672+
if (options.timeoutContext?.csotEnabled()) {
673+
if (
674+
options.timeoutContext.minRoundTripTime != null &&
675+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
676+
) {
677+
throw new MongoOperationTimeoutError(
678+
'Server roundtrip time is greater than the time remaining'
679+
);
680+
}
681+
}
682+
649683
if (this.socket.write(buffer)) return;
650-
return await once(this.socket, 'drain');
684+
685+
const drainEvent = once<void>(this.socket, 'drain');
686+
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
687+
if (timeout) {
688+
try {
689+
return await Promise.race([drainEvent, timeout]);
690+
} catch (error) {
691+
if (TimeoutError.is(error)) {
692+
throw new MongoOperationTimeoutError('Timed out at socket write');
693+
}
694+
throw error;
695+
}
696+
}
697+
return await drainEvent;
651698
}
652699

653700
/**
@@ -659,10 +706,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
659706
*
660707
* Note that `for-await` loops call `return` automatically when the loop is exited.
661708
*/
662-
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
709+
private async *readMany(options: {
710+
timeoutContext?: TimeoutContext;
711+
}): AsyncGenerator<OpMsgResponse | OpReply> {
663712
try {
664-
this.dataEvents = onData(this.messageStream);
713+
this.dataEvents = onData(this.messageStream, options);
665714
this.messageStream.resume();
715+
666716
for await (const message of this.dataEvents) {
667717
const response = await decompressResponse(message);
668718
yield response;

src/cmap/wire_protocol/on_data.ts

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { type EventEmitter } from 'events';
22

3+
import { MongoOperationTimeoutError } from '../../error';
4+
import { type TimeoutContext, TimeoutError } from '../../timeout';
35
import { List, promiseWithResolvers } from '../../utils';
46

57
/**
@@ -18,7 +20,10 @@ type PendingPromises = Omit<
1820
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1921
* It will reject upon an error event.
2022
*/
21-
export function onData(emitter: EventEmitter) {
23+
export function onData(
24+
emitter: EventEmitter,
25+
{ timeoutContext }: { timeoutContext?: TimeoutContext }
26+
) {
2227
// Setup pending events and pending promise lists
2328
/**
2429
* When the caller has not yet called .next(), we store the
@@ -86,6 +91,8 @@ export function onData(emitter: EventEmitter) {
8691
// Adding event handlers
8792
emitter.on('data', eventHandler);
8893
emitter.on('error', errorHandler);
94+
// eslint-disable-next-line github/no-then
95+
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);
8996

9097
return iterator;
9198

@@ -97,8 +104,12 @@ export function onData(emitter: EventEmitter) {
97104

98105
function errorHandler(err: Error) {
99106
const promise = unconsumedPromises.shift();
100-
if (promise != null) promise.reject(err);
101-
else error = err;
107+
const timeoutError = TimeoutError.is(err)
108+
? new MongoOperationTimeoutError('Timed out during socket read')
109+
: undefined;
110+
111+
if (promise != null) promise.reject(timeoutError ?? err);
112+
else error = timeoutError ?? err;
102113
void closeHandler();
103114
}
104115

src/db.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ export class Db {
277277
this.client,
278278
new RunCommandOperation(this, command, {
279279
...resolveBSONOptions(options),
280-
timeoutMS: options?.timeoutMS,
280+
timeoutMS: options?.timeoutMS ?? this.timeoutMS,
281281
session: options?.session,
282282
readPreference: options?.readPreference
283283
})

src/sdam/topology.ts

+10-7
Original file line numberDiff line numberDiff line change
@@ -460,29 +460,28 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
460460
}
461461
}
462462

463-
const timeoutMS = this.client.s.options.timeoutMS;
463+
// TODO(NODE-6223): auto connect cannot use timeoutMS
464+
// const timeoutMS = this.client.s.options.timeoutMS;
464465
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
465466
const readPreference = options.readPreference ?? ReadPreference.primary;
466-
467467
const timeoutContext = TimeoutContext.create({
468-
timeoutMS,
468+
timeoutMS: undefined,
469469
serverSelectionTimeoutMS,
470470
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471471
});
472-
473472
const selectServerOptions = {
474473
operationName: 'ping',
475474
...options,
476475
timeoutContext
477476
};
477+
478478
try {
479479
const server = await this.selectServer(
480480
readPreferenceServerSelector(readPreference),
481481
selectServerOptions
482482
);
483-
484483
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
485-
if (!skipPingOnConnect && server && this.s.credentials) {
484+
if (!skipPingOnConnect && this.s.credentials) {
486485
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
487486
stateTransition(this, STATE_CONNECTED);
488487
this.emit(Topology.OPEN, this);
@@ -623,7 +622,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
623622

624623
try {
625624
timeout?.throwIfExpired();
626-
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
625+
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
626+
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
627+
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
628+
}
629+
return server;
627630
} catch (error) {
628631
if (TimeoutError.is(error)) {
629632
// Timeout

src/timeout.ts

+36-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { clearTimeout, setTimeout } from 'timers';
22

3-
import { MongoInvalidArgumentError, MongoRuntimeError } from './error';
3+
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
44
import { csotMin, noop } from './utils';
55

66
/** @internal */
@@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
5151
}
5252

5353
/** Create a new timeout that expires in `duration` ms */
54-
private constructor(executor: Executor = () => null, duration: number, unref = false) {
54+
private constructor(executor: Executor = () => null, duration: number, unref = true) {
5555
let reject!: Reject;
5656

5757
if (duration < 0) {
@@ -163,6 +163,10 @@ export abstract class TimeoutContext {
163163

164164
abstract get clearConnectionCheckoutTimeout(): boolean;
165165

166+
abstract get timeoutForSocketWrite(): Timeout | null;
167+
168+
abstract get timeoutForSocketRead(): Timeout | null;
169+
166170
abstract csotEnabled(): this is CSOTTimeoutContext;
167171
}
168172

@@ -175,13 +179,15 @@ export class CSOTTimeoutContext extends TimeoutContext {
175179
clearConnectionCheckoutTimeout: boolean;
176180
clearServerSelectionTimeout: boolean;
177181

178-
private _maxTimeMS?: number;
179-
180182
private _serverSelectionTimeout?: Timeout | null;
181183
private _connectionCheckoutTimeout?: Timeout | null;
184+
public minRoundTripTime = 0;
185+
private start: number;
182186

183187
constructor(options: CSOTTimeoutContextOptions) {
184188
super();
189+
this.start = Math.trunc(performance.now());
190+
185191
this.timeoutMS = options.timeoutMS;
186192

187193
this.serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
@@ -193,11 +199,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
193199
}
194200

195201
get maxTimeMS(): number {
196-
return this._maxTimeMS ?? -1;
202+
return this.remainingTimeMS - this.minRoundTripTime;
197203
}
198204

199-
set maxTimeMS(v: number) {
200-
this._maxTimeMS = v;
205+
get remainingTimeMS() {
206+
const timePassed = Math.trunc(performance.now()) - this.start;
207+
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
201208
}
202209

203210
csotEnabled(): this is CSOTTimeoutContext {
@@ -238,6 +245,20 @@ export class CSOTTimeoutContext extends TimeoutContext {
238245
}
239246
return this._connectionCheckoutTimeout;
240247
}
248+
249+
get timeoutForSocketWrite(): Timeout | null {
250+
const { remainingTimeMS } = this;
251+
if (!Number.isFinite(remainingTimeMS)) return null;
252+
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
253+
throw new MongoOperationTimeoutError('Timed out before socket write');
254+
}
255+
256+
get timeoutForSocketRead(): Timeout | null {
257+
const { remainingTimeMS } = this;
258+
if (!Number.isFinite(remainingTimeMS)) return null;
259+
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
260+
throw new MongoOperationTimeoutError('Timed out before socket read');
261+
}
241262
}
242263

243264
/** @internal */
@@ -268,4 +289,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
268289
return Timeout.expires(this.options.waitQueueTimeoutMS);
269290
return null;
270291
}
292+
293+
get timeoutForSocketWrite(): Timeout | null {
294+
return null;
295+
}
296+
297+
get timeoutForSocketRead(): Timeout | null {
298+
return null;
299+
}
271300
}

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ describe('CSOT spec prose tests', function () {
384384
clock.restore();
385385
});
386386

387-
it('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
387+
it.skip('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
388388
/**
389389
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?serverSelectionTimeoutMS=10`.
390390
* 1. Using `client`, execute the command `{ ping: 1 }` against the `admin` database.
@@ -416,10 +416,11 @@ describe('CSOT spec prose tests', function () {
416416

417417
await clock.tickAsync(11);
418418
expect(await maybeError).to.be.instanceof(MongoServerSelectionError);
419-
});
419+
}).skipReason =
420+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
420421
});
421422

422-
it("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
423+
it.skip("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
423424
/**
424425
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20`.
425426
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -440,9 +441,10 @@ describe('CSOT spec prose tests', function () {
440441

441442
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
442443
expect(end - start).to.be.lte(15);
443-
});
444+
}).skipReason =
445+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
444446

445-
it("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
447+
it.skip("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
446448
/**
447449
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10`.
448450
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -462,9 +464,10 @@ describe('CSOT spec prose tests', function () {
462464

463465
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
464466
expect(end - start).to.be.lte(15);
465-
});
467+
}).skipReason =
468+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
466469

467-
it('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
470+
it.skip('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
468471
/**
469472
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10`.
470473
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -484,7 +487,8 @@ describe('CSOT spec prose tests', function () {
484487

485488
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
486489
expect(end - start).to.be.lte(15);
487-
});
490+
}).skipReason =
491+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
488492

489493
it.skip("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", async function () {
490494
/**

0 commit comments

Comments
 (0)