Skip to content

Commit 567fb98

Browse files
committed
fix: track position, helper, tailable interrupt
1 parent adade92 commit 567fb98

File tree

2 files changed

+23
-26
lines changed

2 files changed

+23
-26
lines changed

src/cursor/abstract_cursor.ts

+21-24
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,6 @@ export abstract class AbstractCursor<
264264
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
265265
}
266266
this.cursorClient = client;
267-
268-
this.cursorClient.s.activeCursors.add(this);
269-
this.once('close', removeActiveCursor);
270-
271267
this.cursorNamespace = namespace;
272268
this.cursorId = null;
273269
this.initialized = false;
@@ -360,6 +356,7 @@ export abstract class AbstractCursor<
360356
};
361357

362358
this.timeoutContext = options.timeoutContext;
359+
this.trackCursor();
363360
}
364361

365362
/**
@@ -439,6 +436,14 @@ export abstract class AbstractCursor<
439436
await this.close();
440437
}
441438

439+
/** Adds cursor to client's tracking so it will be closed by MongoClient.close() */
440+
private trackCursor() {
441+
this.cursorClient.s.activeCursors.add(this);
442+
if (!this.listeners('close').includes(removeActiveCursor)) {
443+
this.once('close', removeActiveCursor);
444+
}
445+
}
446+
442447
/** Returns current buffered documents length */
443448
bufferedCount(): number {
444449
return this.documents?.length ?? 0;
@@ -832,21 +837,14 @@ export abstract class AbstractCursor<
832837
this.isClosed = false;
833838
this.isKilled = false;
834839
this.initialized = false;
840+
this.trackCursor();
835841

836-
this.cursorClient.s.activeCursors.add(this);
837-
if (!this.listeners('close').includes(removeActiveCursor)) {
838-
this.once('close', removeActiveCursor);
839-
}
840-
841-
const session = this.cursorSession;
842-
if (session) {
843-
// We only want to end this session if we created it, and it hasn't ended yet
844-
if (session.explicit === false) {
845-
if (!session.hasEnded) {
846-
session.endSession().then(undefined, squashError);
847-
}
848-
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
842+
// We only want to end this session if we created it, and it hasn't ended yet
843+
if (this.cursorSession.explicit === false) {
844+
if (!this.cursorSession.hasEnded) {
845+
this.cursorSession.endSession().then(undefined, squashError);
849846
}
847+
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
850848
}
851849
}
852850

@@ -982,7 +980,6 @@ export abstract class AbstractCursor<
982980
/** @internal */
983981
private async cleanup(timeoutMS?: number, error?: Error) {
984982
this.isClosed = true;
985-
const session = this.cursorSession;
986983
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
987984
if (timeoutMS != null) {
988985
this.timeoutContext?.clear();
@@ -1004,7 +1001,7 @@ export abstract class AbstractCursor<
10041001
!this.cursorId.isZero() &&
10051002
this.cursorNamespace &&
10061003
this.selectedServer &&
1007-
!session.hasEnded
1004+
!this.cursorSession.hasEnded
10081005
) {
10091006
this.isKilled = true;
10101007
const cursorId = this.cursorId;
@@ -1013,7 +1010,7 @@ export abstract class AbstractCursor<
10131010
await executeOperation(
10141011
this.cursorClient,
10151012
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
1016-
session
1013+
session: this.cursorSession
10171014
}),
10181015
timeoutContextForKillCursors()
10191016
);
@@ -1022,11 +1019,11 @@ export abstract class AbstractCursor<
10221019
squashError(error);
10231020
} finally {
10241021
try {
1025-
if (session?.owner === this) {
1026-
await session.endSession({ error });
1022+
if (this.cursorSession?.owner === this) {
1023+
await this.cursorSession.endSession({ error });
10271024
}
1028-
if (!session?.inTransaction()) {
1029-
maybeClearPinnedConnection(session, { error });
1025+
if (!this.cursorSession?.inTransaction()) {
1026+
maybeClearPinnedConnection(this.cursorSession, { error });
10301027
}
10311028
} finally {
10321029
this.emitClose();

test/integration/crud/misc_cursors.test.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const sinon = require('sinon');
1010
const { Writable } = require('stream');
1111
const { once, on } = require('events');
1212
const { setTimeout } = require('timers');
13-
const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb');
13+
const { ReadPreference } = require('../../mongodb');
1414
const { ServerType } = require('../../mongodb');
1515
const { formatSort } = require('../../mongodb');
1616

@@ -1872,7 +1872,7 @@ describe('Cursor', function () {
18721872
expect(cursor).to.have.property('closed', true);
18731873

18741874
const error = await rejectedEarlyBecauseClientClosed;
1875-
expect(error).to.be.instanceOf(MongoExpiredSessionError);
1875+
expect(error).to.be.null; // TODO: is this API or just "how it behaved at the time"
18761876
});
18771877

18781878
it('shouldAwaitData', {

0 commit comments

Comments
 (0)