Skip to content

Commit 654069f

Browse files
authored
feat(NODE-6633): MongoClient.close closes active cursors (#4372)
1 parent 73def18 commit 654069f

File tree

7 files changed

+193
-25
lines changed

7 files changed

+193
-25
lines changed

src/cursor/abstract_cursor.ts

+31-18
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ export interface CursorStreamOptions {
6767
/** @public */
6868
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
6969

70+
function removeActiveCursor(this: AbstractCursor) {
71+
this.client.s.activeCursors.delete(this);
72+
}
73+
7074
/**
7175
* @public
7276
* @experimental
@@ -365,6 +369,7 @@ export abstract class AbstractCursor<
365369
this.signal,
366370
() => void this.close().then(undefined, squashError)
367371
);
372+
this.trackCursor();
368373
}
369374

370375
/**
@@ -444,6 +449,14 @@ export abstract class AbstractCursor<
444449
await this.close();
445450
}
446451

452+
/** Adds cursor to client's tracking so it will be closed by MongoClient.close() */
453+
private trackCursor() {
454+
this.cursorClient.s.activeCursors.add(this);
455+
if (!this.listeners('close').includes(removeActiveCursor)) {
456+
this.once('close', removeActiveCursor);
457+
}
458+
}
459+
447460
/** Returns current buffered documents length */
448461
bufferedCount(): number {
449462
return this.documents?.length ?? 0;
@@ -858,16 +871,15 @@ export abstract class AbstractCursor<
858871
this.isClosed = false;
859872
this.isKilled = false;
860873
this.initialized = false;
874+
this.hasEmittedClose = false;
875+
this.trackCursor();
861876

862-
const session = this.cursorSession;
863-
if (session) {
864-
// We only want to end this session if we created it, and it hasn't ended yet
865-
if (session.explicit === false) {
866-
if (!session.hasEnded) {
867-
session.endSession().then(undefined, squashError);
868-
}
869-
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
877+
// We only want to end this session if we created it, and it hasn't ended yet
878+
if (this.cursorSession.explicit === false) {
879+
if (!this.cursorSession.hasEnded) {
880+
this.cursorSession.endSession().then(undefined, squashError);
870881
}
882+
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
871883
}
872884
}
873885

@@ -1004,7 +1016,6 @@ export abstract class AbstractCursor<
10041016
private async cleanup(timeoutMS?: number, error?: Error) {
10051017
this.abortListener?.[kDispose]();
10061018
this.isClosed = true;
1007-
const session = this.cursorSession;
10081019
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
10091020
if (timeoutMS != null) {
10101021
this.timeoutContext?.clear();
@@ -1026,7 +1037,7 @@ export abstract class AbstractCursor<
10261037
!this.cursorId.isZero() &&
10271038
this.cursorNamespace &&
10281039
this.selectedServer &&
1029-
!session.hasEnded
1040+
!this.cursorSession.hasEnded
10301041
) {
10311042
this.isKilled = true;
10321043
const cursorId = this.cursorId;
@@ -1035,22 +1046,24 @@ export abstract class AbstractCursor<
10351046
await executeOperation(
10361047
this.cursorClient,
10371048
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
1038-
session
1049+
session: this.cursorSession
10391050
}),
10401051
timeoutContextForKillCursors()
10411052
);
10421053
}
10431054
} catch (error) {
10441055
squashError(error);
10451056
} finally {
1046-
if (session?.owner === this) {
1047-
await session.endSession({ error });
1048-
}
1049-
if (!session?.inTransaction()) {
1050-
maybeClearPinnedConnection(session, { error });
1057+
try {
1058+
if (this.cursorSession?.owner === this) {
1059+
await this.cursorSession.endSession({ error });
1060+
}
1061+
if (!this.cursorSession?.inTransaction()) {
1062+
maybeClearPinnedConnection(this.cursorSession, { error });
1063+
}
1064+
} finally {
1065+
this.emitClose();
10511066
}
1052-
1053-
this.emitClose();
10541067
}
10551068
}
10561069

src/mongo_client.ts

+13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata';
1818
import type { CompressorName } from './cmap/wire_protocol/compression';
1919
import { parseOptions, resolveSRVRecord } from './connection_string';
2020
import { MONGO_CLIENT_EVENTS } from './constants';
21+
import { type AbstractCursor } from './cursor/abstract_cursor';
2122
import { Db, type DbOptions } from './db';
2223
import type { Encrypter } from './encrypter';
2324
import { MongoInvalidArgumentError } from './error';
@@ -323,6 +324,12 @@ export interface MongoClientPrivate {
323324
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
324325
*/
325326
readonly activeSessions: Set<ClientSession>;
327+
/**
328+
* We keep a reference to the cursors that are created from this client.
329+
* - used to track and close all cursors in client.close().
330+
* Cursors in this set are ones that still need to have their close method invoked (no other conditions are considered)
331+
*/
332+
readonly activeCursors: Set<AbstractCursor>;
326333
readonly sessionPool: ServerSessionPool;
327334
readonly options: MongoOptions;
328335
readonly readConcern?: ReadConcern;
@@ -398,6 +405,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
398405
hasBeenClosed: false,
399406
sessionPool: new ServerSessionPool(this),
400407
activeSessions: new Set(),
408+
activeCursors: new Set(),
401409
authProviders: new MongoClientAuthProviders(),
402410

403411
get options() {
@@ -650,6 +658,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
650658
writable: false
651659
});
652660

661+
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
662+
this.s.activeCursors.clear();
663+
664+
await Promise.all(activeCursorCloses);
665+
653666
const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
654667
this.s.activeSessions.clear();
655668

test/integration/crud/find_cursor_methods.test.js

+20-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ describe('Find Cursor', function () {
251251
});
252252
});
253253

254-
context('#rewind', function () {
254+
describe('#rewind', function () {
255255
it('should rewind a cursor', async function () {
256256
const coll = client.db().collection('abstract_cursor');
257257
const cursor = coll.find({});
@@ -335,6 +335,25 @@ describe('Find Cursor', function () {
335335
});
336336
}
337337
});
338+
339+
it('emits close after rewind', async () => {
340+
let cursor;
341+
try {
342+
const coll = client.db().collection('abstract_cursor');
343+
cursor = coll.find({}, { batchSize: 1 });
344+
const closes = [];
345+
cursor.on('close', () => closes.push('close'));
346+
const doc0 = await cursor.next();
347+
await cursor.close();
348+
cursor.rewind();
349+
const doc1 = await cursor.next();
350+
await cursor.close();
351+
expect(doc0).to.deep.equal(doc1); // make sure rewind happened
352+
expect(closes).to.have.lengthOf(2);
353+
} finally {
354+
await cursor.close();
355+
}
356+
});
338357
});
339358

340359
context('#allowDiskUse', function () {

test/integration/crud/misc_cursors.test.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const sinon = require('sinon');
1010
const { Writable } = require('stream');
1111
const { once, on } = require('events');
1212
const { setTimeout } = require('timers');
13-
const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb');
13+
const { ReadPreference } = require('../../mongodb');
1414
const { ServerType } = require('../../mongodb');
1515
const { formatSort } = require('../../mongodb');
1616

@@ -1872,7 +1872,7 @@ describe('Cursor', function () {
18721872
expect(cursor).to.have.property('closed', true);
18731873

18741874
const error = await rejectedEarlyBecauseClientClosed;
1875-
expect(error).to.be.instanceOf(MongoExpiredSessionError);
1875+
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
18761876
});
18771877

18781878
it('shouldAwaitData', {

test/integration/node-specific/abort_signal.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ describe('AbortSignal support', () => {
603603
const start = performance.now();
604604
const result = await cursor.toArray().catch(error => error);
605605
const end = performance.now();
606-
expect(end - start).to.be.lessThan(15);
606+
expect(end - start).to.be.lessThan(50);
607607

608608
expect(result).to.be.instanceOf(DOMException);
609609
});

test/integration/node-specific/abstract_cursor.test.ts

+43
Original file line numberDiff line numberDiff line change
@@ -556,4 +556,47 @@ describe('class AbstractCursor', function () {
556556
);
557557
});
558558
});
559+
560+
describe('cursor tracking', () => {
561+
let client: MongoClient;
562+
let collection: Collection;
563+
564+
beforeEach(async function () {
565+
client = this.configuration.newClient();
566+
collection = client.db('activeCursors').collection('activeCursors');
567+
await collection.drop().catch(() => null);
568+
await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i })));
569+
});
570+
571+
afterEach(async function () {
572+
await client.close();
573+
});
574+
575+
it('adds itself to a set upon construction', () => {
576+
collection.find({}, { batchSize: 1 });
577+
expect(client.s.activeCursors).to.have.lengthOf(1);
578+
});
579+
580+
it('adds itself to a set upon rewind', async () => {
581+
const cursor = collection.find({}, { batchSize: 1 });
582+
await cursor.next();
583+
expect(client.s.activeCursors).to.have.lengthOf(1);
584+
await cursor.close();
585+
expect(client.s.activeCursors).to.have.lengthOf(0);
586+
cursor.rewind();
587+
expect(client.s.activeCursors).to.have.lengthOf(1);
588+
});
589+
590+
it('does not add more than one close listener', async () => {
591+
const cursor = collection.find({}, { batchSize: 1 });
592+
await cursor.next();
593+
expect(cursor.listeners('close')).to.have.lengthOf(1);
594+
await cursor.close();
595+
expect(cursor.listeners('close')).to.have.lengthOf(0);
596+
cursor.rewind();
597+
cursor.rewind();
598+
cursor.rewind();
599+
expect(cursor.listeners('close')).to.have.lengthOf(1);
600+
});
601+
});
559602
});

test/integration/node-specific/mongo_client.test.ts

+83-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as net from 'net';
44
import * as sinon from 'sinon';
55

66
import {
7+
type Collection,
78
type CommandFailedEvent,
89
type CommandStartedEvent,
910
type CommandSucceededEvent,
@@ -31,7 +32,6 @@ describe('class MongoClient', function () {
3132
afterEach(async () => {
3233
sinon.restore();
3334
await client?.close();
34-
// @ts-expect-error: Put this variable back to undefined to force tests to make their own client
3535
client = undefined;
3636
});
3737

@@ -567,7 +567,44 @@ describe('class MongoClient', function () {
567567
});
568568
});
569569

570-
context('#close()', () => {
570+
describe('active cursors', function () {
571+
let client: MongoClient;
572+
let collection: Collection<{ _id: number }>;
573+
const kills = [];
574+
575+
beforeEach(async function () {
576+
client = this.configuration.newClient();
577+
collection = client.db('activeCursors').collection('activeCursors');
578+
await collection.drop().catch(() => null);
579+
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));
580+
581+
kills.length = 0;
582+
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
583+
});
584+
585+
afterEach(async function () {
586+
await client.close();
587+
});
588+
589+
it('are tracked upon creation and removed upon exhaustion', async () => {
590+
const cursors = Array.from({ length: 30 }, (_, skip) =>
591+
collection.find({}, { skip, batchSize: 1 })
592+
);
593+
expect(client.s.activeCursors).to.have.lengthOf(30);
594+
await Promise.all(cursors.map(c => c.toArray()));
595+
expect(client.s.activeCursors).to.have.lengthOf(0);
596+
expect(kills).to.have.lengthOf(0);
597+
});
598+
599+
it('are removed from tracking if exhausted in first batch', async () => {
600+
const cursors = Array.from({ length: 30 }, () => collection.find());
601+
expect(client.s.activeCursors).to.have.lengthOf(30);
602+
await Promise.all(cursors.map(c => c.next())); // only one document pulled from each.
603+
expect(client.s.activeCursors).to.have.lengthOf(0);
604+
});
605+
});
606+
607+
describe('#close()', () => {
571608
let client: MongoClient;
572609
let db: Db;
573610

@@ -702,7 +739,7 @@ describe('class MongoClient', function () {
702739
expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true
703740
});
704741

705-
context('when server selection would return no servers', () => {
742+
describe('when server selection would return no servers', () => {
706743
const serverDescription = new ServerDescription('a:1');
707744

708745
it('short circuits and does not end sessions', async () => {
@@ -722,6 +759,49 @@ describe('class MongoClient', function () {
722759
expect(client.s.sessionPool.sessions).to.have.lengthOf(1);
723760
});
724761
});
762+
763+
describe('active cursors', function () {
764+
let collection: Collection<{ _id: number }>;
765+
const kills = [];
766+
767+
beforeEach(async () => {
768+
collection = client.db('test').collection('activeCursors');
769+
await collection.drop().catch(() => null);
770+
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));
771+
772+
kills.length = 0;
773+
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
774+
});
775+
776+
it('are all closed', async () => {
777+
const cursors = Array.from({ length: 30 }, (_, skip) =>
778+
collection.find({}, { skip, batchSize: 1 })
779+
);
780+
await Promise.all(cursors.map(c => c.next()));
781+
expect(client.s.activeCursors).to.have.lengthOf(30);
782+
await client.close();
783+
expect(client.s.activeCursors).to.have.lengthOf(0);
784+
expect(kills).to.have.lengthOf(30);
785+
});
786+
787+
it('creating cursors after close adds to activeCursors', async () => {
788+
expect(client.s.activeCursors).to.have.lengthOf(0);
789+
await client.close();
790+
collection.find({});
791+
expect(client.s.activeCursors).to.have.lengthOf(1);
792+
});
793+
794+
it('rewinding cursors after close adds to activeCursors', async () => {
795+
expect(client.s.activeCursors).to.have.lengthOf(0);
796+
const cursor = collection.find({}, { batchSize: 1 });
797+
await cursor.next();
798+
expect(client.s.activeCursors).to.have.lengthOf(1);
799+
await client.close();
800+
expect(client.s.activeCursors).to.have.lengthOf(0);
801+
cursor.rewind();
802+
expect(client.s.activeCursors).to.have.lengthOf(1);
803+
});
804+
});
725805
});
726806

727807
context('when connecting', function () {

0 commit comments

Comments
 (0)