Skip to content

Commit 2d178d0

Browse files
nbbeekendurran
andauthored
fix(NODE-5027): revert "ensure that MessageStream is destroyed when connections are destroyed" (#3552)
Co-authored-by: Durran Jordan <[email protected]>
1 parent bf69bd6 commit 2d178d0

13 files changed

+128
-279
lines changed

src/cmap/connect.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ function performInitialHandshake(
9696
) {
9797
const callback: Callback<Document> = function (err, ret) {
9898
if (err && conn) {
99-
conn.destroy({ force: false });
99+
conn.destroy();
100100
}
101101
_callback(err, ret);
102102
};

src/cmap/connection.ts

+38-17
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ export interface ConnectionOptions
130130
metadata: ClientMetadata;
131131
}
132132

133-
/** @public */
133+
/** @internal */
134134
export interface DestroyOptions {
135135
/** Force the destruction. */
136-
force: boolean;
136+
force?: boolean;
137137
}
138138

139139
/** @public */
@@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
154154
address: string;
155155
socketTimeoutMS: number;
156156
monitorCommands: boolean;
157-
/** Indicates that the connection (including underlying TCP socket) has been closed. */
158157
closed: boolean;
158+
destroyed: boolean;
159159
lastHelloMS?: number;
160160
serverApi?: ServerApi;
161161
helloOk?: boolean;
@@ -204,6 +204,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
204204
this.monitorCommands = options.monitorCommands;
205205
this.serverApi = options.serverApi;
206206
this.closed = false;
207+
this.destroyed = false;
207208
this[kHello] = null;
208209
this[kClusterTime] = null;
209210

@@ -296,7 +297,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
296297
if (this.closed) {
297298
return;
298299
}
299-
this.destroy({ force: false });
300+
301+
this[kStream].destroy(error);
302+
303+
this.closed = true;
300304

301305
for (const op of this[kQueue].values()) {
302306
op.cb(error);
@@ -310,7 +314,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
310314
if (this.closed) {
311315
return;
312316
}
313-
this.destroy({ force: false });
317+
318+
this.closed = true;
314319

315320
const message = `connection ${this.id} to ${this.address} closed`;
316321
for (const op of this[kQueue].values()) {
@@ -327,7 +332,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
327332
}
328333

329334
this[kDelayedTimeoutId] = setTimeout(() => {
330-
this.destroy({ force: false });
335+
this[kStream].destroy();
336+
337+
this.closed = true;
331338

332339
const message = `connection ${this.id} to ${this.address} timed out`;
333340
const beforeHandshake = this.hello == null;
@@ -436,27 +443,41 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
436443
callback(undefined, message.documents[0]);
437444
}
438445

439-
destroy(options: DestroyOptions, callback?: Callback): void {
446+
destroy(options?: DestroyOptions, callback?: Callback): void {
447+
if (typeof options === 'function') {
448+
callback = options;
449+
options = { force: false };
450+
}
451+
440452
this.removeAllListeners(Connection.PINNED);
441453
this.removeAllListeners(Connection.UNPINNED);
442454

443-
this[kMessageStream].destroy();
444-
this.closed = true;
455+
options = Object.assign({ force: false }, options);
456+
if (this[kStream] == null || this.destroyed) {
457+
this.destroyed = true;
458+
if (typeof callback === 'function') {
459+
callback();
460+
}
461+
462+
return;
463+
}
445464

446465
if (options.force) {
447466
this[kStream].destroy();
448-
if (callback) {
449-
return process.nextTick(callback);
467+
this.destroyed = true;
468+
if (typeof callback === 'function') {
469+
callback();
450470
}
471+
472+
return;
451473
}
452474

453-
if (!this[kStream].writableEnded) {
454-
this[kStream].end(callback);
455-
} else {
456-
if (callback) {
457-
return process.nextTick(callback);
475+
this[kStream].end(() => {
476+
this.destroyed = true;
477+
if (typeof callback === 'function') {
478+
callback();
458479
}
459-
}
480+
});
460481
}
461482

462483
command(

src/cmap/connection_pool.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
510510
ConnectionPool.CONNECTION_CLOSED,
511511
new ConnectionClosedEvent(this, conn, 'poolClosed')
512512
);
513-
conn.destroy({ force: !!options.force }, cb);
513+
conn.destroy(options, cb);
514514
},
515515
err => {
516516
this[kConnections].clear();
@@ -586,7 +586,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
586586
new ConnectionClosedEvent(this, connection, reason)
587587
);
588588
// destroy the connection
589-
process.nextTick(() => connection.destroy({ force: false }));
589+
process.nextTick(() => connection.destroy());
590590
}
591591

592592
private connectionIsStale(connection: Connection) {

src/sdam/server.ts

+1-4
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
241241

242242
/** Destroy the server connection */
243243
destroy(options?: DestroyOptions, callback?: Callback): void {
244-
if (typeof options === 'function') {
245-
callback = options;
246-
options = { force: false };
247-
}
244+
if (typeof options === 'function') (callback = options), (options = {});
248245
options = Object.assign({}, { force: false }, options);
249246

250247
if (this.s.state === STATE_CLOSED) {

src/sdam/topology.ts

+13-4
Original file line numberDiff line numberDiff line change
@@ -466,17 +466,26 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
466466
}
467467

468468
/** Close this topology */
469+
close(callback: Callback): void;
469470
close(options: CloseOptions): void;
470471
close(options: CloseOptions, callback: Callback): void;
471-
close(options?: CloseOptions, callback?: Callback): void {
472-
options = options ?? { force: false };
472+
close(options?: CloseOptions | Callback, callback?: Callback): void {
473+
if (typeof options === 'function') {
474+
callback = options;
475+
options = {};
476+
}
477+
478+
if (typeof options === 'boolean') {
479+
options = { force: options };
480+
}
481+
options = options ?? {};
473482

474483
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
475484
return callback?.();
476485
}
477486

478487
const destroyedServers = Array.from(this.s.servers.values(), server => {
479-
return promisify(destroyServer)(server, this, { force: !!options?.force });
488+
return promisify(destroyServer)(server, this, options as CloseOptions);
480489
});
481490

482491
Promise.all(destroyedServers)
@@ -731,7 +740,7 @@ function destroyServer(
731740
options?: DestroyOptions,
732741
callback?: Callback
733742
) {
734-
options = options ?? { force: false };
743+
options = options ?? {};
735744
for (const event of LOCAL_SERVER_EVENTS) {
736745
server.removeAllListeners(event);
737746
}

test/integration/crud/misc_cursors.test.js

+49-20
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ const { ReadPreference } = require('../../mongodb');
1313
const { ServerType } = require('../../mongodb');
1414
const { formatSort } = require('../../mongodb');
1515
const { getSymbolFrom } = require('../../tools/utils');
16-
const { MongoExpiredSessionError } = require('../../mongodb');
1716

1817
describe('Cursor', function () {
1918
before(function () {
@@ -1868,31 +1867,61 @@ describe('Cursor', function () {
18681867
}
18691868
});
18701869

1871-
it('closes cursors when client is closed even if it has not been exhausted', async function () {
1872-
await client
1873-
.db()
1874-
.dropCollection('test_cleanup_tailable')
1875-
.catch(() => null);
1870+
it('should close dead tailable cursors', {
1871+
metadata: {
1872+
os: '!win32' // NODE-2943: timeout on windows
1873+
},
18761874

1877-
const collection = await client
1878-
.db()
1879-
.createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 });
1875+
test: function (done) {
1876+
// http://www.mongodb.org/display/DOCS/Tailable+Cursors
18801877

1881-
// insert only 2 docs in capped coll of 3
1882-
await collection.insertMany([{ a: 1 }, { a: 1 }]);
1878+
const configuration = this.configuration;
1879+
client.connect((err, client) => {
1880+
expect(err).to.not.exist;
1881+
this.defer(() => client.close());
18831882

1884-
const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });
1883+
const db = client.db(configuration.db);
1884+
const options = { capped: true, size: 10000000 };
1885+
db.createCollection(
1886+
'test_if_dead_tailable_cursors_close',
1887+
options,
1888+
function (err, collection) {
1889+
expect(err).to.not.exist;
18851890

1886-
await cursor.next();
1887-
await cursor.next();
1888-
// will block for maxAwaitTimeMS (except we are closing the client)
1889-
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);
1891+
let closeCount = 0;
1892+
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
1893+
collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => {
1894+
expect(err).to.not.exist;
18901895

1891-
await client.close();
1892-
expect(cursor).to.have.property('killed', true);
1896+
const cursor = collection.find({}, { tailable: true, awaitData: true });
1897+
const stream = cursor.stream();
1898+
1899+
stream.resume();
1900+
1901+
var validator = () => {
1902+
closeCount++;
1903+
if (closeCount === 2) {
1904+
done();
1905+
}
1906+
};
1907+
1908+
// we validate that the stream "ends" either cleanly or with an error
1909+
stream.on('end', validator);
1910+
stream.on('error', validator);
1911+
1912+
cursor.on('close', validator);
18931913

1894-
const error = await rejectedEarlyBecauseClientClosed;
1895-
expect(error).to.be.instanceOf(MongoExpiredSessionError);
1914+
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
1915+
collection.insertMany(docs, err => {
1916+
expect(err).to.not.exist;
1917+
1918+
setTimeout(() => client.close());
1919+
});
1920+
});
1921+
}
1922+
);
1923+
});
1924+
}
18961925
});
18971926

18981927
it('shouldAwaitData', {

test/integration/node-specific/topology.test.js

+5-13
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,12 @@ describe('Topology', function () {
1010
const states = [];
1111
topology.on('stateChanged', (_, newState) => states.push(newState));
1212
topology.connect(err => {
13-
try {
13+
expect(err).to.not.exist;
14+
topology.close(err => {
1415
expect(err).to.not.exist;
15-
} catch (error) {
16-
done(error);
17-
}
18-
topology.close({}, err => {
19-
try {
20-
expect(err).to.not.exist;
21-
expect(topology.isDestroyed()).to.be.true;
22-
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
23-
done();
24-
} catch (error) {
25-
done(error);
26-
}
16+
expect(topology.isDestroyed()).to.be.true;
17+
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
18+
done();
2719
});
2820
});
2921
}

test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {
103103

104104
afterEach(function (done) {
105105
if (context.topology) {
106-
context.topology.close({}, done);
106+
context.topology.close(done);
107107
} else {
108108
done();
109109
}

test/unit/assorted/server_selection_spec_helper.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) {
109109
});
110110

111111
function done(err) {
112-
topology.close({}, e => testDone(e || err));
112+
topology.close(e => testDone(e || err));
113113
}
114114

115115
topology.connect(err => {

0 commit comments

Comments
 (0)