Skip to content

Commit 4c9b4d8

Browse files
authored
fix(NODE-4783): handle orphaned operation descriptions (#3463)
1 parent 04203c7 commit 4c9b4d8

File tree

4 files changed

+250
-37
lines changed

4 files changed

+250
-37
lines changed

src/cmap/connection.ts

+29-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
MongoMissingDependencyError,
1818
MongoNetworkError,
1919
MongoNetworkTimeoutError,
20+
MongoRuntimeError,
2021
MongoServerError,
2122
MongoWriteConcernError
2223
} from '../error';
@@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter');
6869
/** @internal */
6970
const kDelayedTimeoutId = Symbol('delayedTimeoutId');
7071

72+
const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';
73+
7174
/** @internal */
7275
export interface CommandOptions extends BSONSerializeOptions {
7376
command?: boolean;
@@ -369,7 +372,28 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
369372

370373
// always emit the message, in case we are streaming
371374
this.emit('message', message);
372-
const operationDescription = this[kQueue].get(message.responseTo);
375+
let operationDescription = this[kQueue].get(message.responseTo);
376+
377+
if (!operationDescription && this.isMonitoringConnection) {
378+
// This is how we recover when the initial hello's requestId is not
379+
// the responseTo when hello responses have been skipped:
380+
381+
// First check if the map is of invalid size
382+
if (this[kQueue].size > 1) {
383+
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
384+
} else {
385+
// Get the first orphaned operation description.
386+
const entry = this[kQueue].entries().next();
387+
if (entry) {
388+
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
389+
// If the orphaned operation description exists then set it.
390+
operationDescription = orphaned;
391+
// Remove the entry with the bad request id from the queue.
392+
this[kQueue].delete(requestId);
393+
}
394+
}
395+
}
396+
373397
if (!operationDescription) {
374398
return;
375399
}
@@ -381,7 +405,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
381405
// making the `responseTo` change on each response
382406
this[kQueue].delete(message.responseTo);
383407
if ('moreToCome' in message && message.moreToCome) {
384-
// requeue the callback for next synthetic request
408+
// If the operation description check above does find an orphaned
409+
// description and sets the operationDescription then this line will put one
410+
// back in the queue with the correct requestId and will resolve not being able
411+
// to find the next one via the responseTo of the next streaming hello.
385412
this[kQueue].set(message.requestId, operationDescription);
386413
} else if (operationDescription.socketTimeoutOverride) {
387414
this[kStream].setTimeout(this.socketTimeoutMS);

test/tools/utils.ts

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { EJSON } from 'bson';
22
import * as BSON from 'bson';
33
import { expect } from 'chai';
4+
import { Readable } from 'stream';
45
import { setTimeout } from 'timers';
56
import { inspect, promisify } from 'util';
67

@@ -354,6 +355,18 @@ export class TestBuilder {
354355
}
355356
}
356357

358+
export function bufferToStream(buffer) {
359+
const stream = new Readable();
360+
if (Array.isArray(buffer)) {
361+
buffer.forEach(b => stream.push(b));
362+
} else {
363+
stream.push(buffer);
364+
}
365+
366+
stream.push(null);
367+
return stream;
368+
}
369+
357370
export function generateOpMsgBuffer(document: Document): Buffer {
358371
const header = Buffer.alloc(4 * 4 + 4);
359372

test/unit/cmap/connection.test.ts

+207-22
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import { expect } from 'chai';
2-
import { EventEmitter } from 'events';
2+
import { EventEmitter, once } from 'events';
33
import { Socket } from 'net';
44
import * as sinon from 'sinon';
5+
import { Readable } from 'stream';
56
import { setTimeout } from 'timers';
67

8+
import { BinMsg } from '../../../src/cmap/commands';
79
import { connect } from '../../../src/cmap/connect';
810
import { Connection, hasSessionSupport } from '../../../src/cmap/connection';
911
import { MessageStream } from '../../../src/cmap/message_stream';
10-
import { MongoNetworkTimeoutError } from '../../../src/error';
12+
import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error';
1113
import { isHello, ns } from '../../../src/utils';
1214
import * as mock from '../../tools/mongodb-mock/index';
13-
import { getSymbolFrom } from '../../tools/utils';
15+
import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils';
1416
import { createTimerSandbox } from '../timer_sandbox';
1517

1618
const connectionOptionsDefaults = {
@@ -22,6 +24,25 @@ const connectionOptionsDefaults = {
2224
loadBalanced: false
2325
};
2426

27+
/** The absolute minimum socket API needed by Connection as of writing this test */
28+
class FakeSocket extends EventEmitter {
29+
address() {
30+
// is never called
31+
}
32+
pipe() {
33+
// does not need to do anything
34+
}
35+
destroy() {
36+
// is called, has no side effects
37+
}
38+
get remoteAddress() {
39+
return 'iLoveJavaScript';
40+
}
41+
get remotePort() {
42+
return 123;
43+
}
44+
}
45+
2546
describe('new Connection()', function () {
2647
let server;
2748
after(() => mock.cleanup());
@@ -137,6 +158,189 @@ describe('new Connection()', function () {
137158
});
138159
});
139160

161+
describe('#onMessage', function () {
162+
context('when the connection is a monitoring connection', function () {
163+
let queue: Map<number, OperationDescription>;
164+
let driverSocket: FakeSocket;
165+
let connection: Connection;
166+
167+
beforeEach(function () {
168+
driverSocket = sinon.spy(new FakeSocket());
169+
});
170+
171+
context('when multiple hellos exist on the stream', function () {
172+
let callbackSpy;
173+
const inputStream = new Readable();
174+
const document = { ok: 1 };
175+
const last = { isWritablePrimary: true };
176+
177+
beforeEach(function () {
178+
callbackSpy = sinon.spy();
179+
const firstHello = generateOpMsgBuffer(document);
180+
const secondHello = generateOpMsgBuffer(document);
181+
const thirdHello = generateOpMsgBuffer(last);
182+
const buffer = Buffer.concat([firstHello, secondHello, thirdHello]);
183+
184+
connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults));
185+
connection.isMonitoringConnection = true;
186+
const queueSymbol = getSymbolFrom(connection, 'queue');
187+
queue = connection[queueSymbol];
188+
189+
// Create the operation description.
190+
const operationDescription: OperationDescription = {
191+
requestId: 1,
192+
cb: callbackSpy
193+
};
194+
195+
// Stick an operation description in the queue.
196+
queue.set(1, operationDescription);
197+
198+
// Push the buffer of 3 hellos to the input stream
199+
inputStream.push(buffer);
200+
inputStream.push(null);
201+
});
202+
203+
it('calls the callback with the last hello document', async function () {
204+
const messages = await once(connection, 'message');
205+
expect(messages[0].responseTo).to.equal(0);
206+
expect(callbackSpy).to.be.calledOnceWith(undefined, last);
207+
});
208+
});
209+
210+
context('when requestId/responseTo do not match', function () {
211+
let callbackSpy;
212+
const document = { ok: 1 };
213+
214+
beforeEach(function () {
215+
callbackSpy = sinon.spy();
216+
217+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
218+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
219+
connection.isMonitoringConnection = true;
220+
const queueSymbol = getSymbolFrom(connection, 'queue');
221+
queue = connection[queueSymbol];
222+
223+
// Create the operation description.
224+
const operationDescription: OperationDescription = {
225+
requestId: 1,
226+
cb: callbackSpy
227+
};
228+
229+
// Stick an operation description in the queue.
230+
queue.set(1, operationDescription);
231+
// Emit a message that won't match the existing operation description.
232+
const msg = generateOpMsgBuffer(document);
233+
const msgHeader: MessageHeader = {
234+
length: msg.readInt32LE(0),
235+
requestId: 1,
236+
responseTo: 0, // This will not match.
237+
opCode: msg.readInt32LE(12)
238+
};
239+
const msgBody = msg.subarray(16);
240+
241+
const message = new BinMsg(msg, msgHeader, msgBody);
242+
connection.onMessage(message);
243+
});
244+
245+
it('calls the operation description callback with the document', function () {
246+
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
247+
});
248+
});
249+
250+
context('when requestId/reponseTo match', function () {
251+
let callbackSpy;
252+
const document = { ok: 1 };
253+
254+
beforeEach(function () {
255+
callbackSpy = sinon.spy();
256+
257+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
258+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
259+
connection.isMonitoringConnection = true;
260+
const queueSymbol = getSymbolFrom(connection, 'queue');
261+
queue = connection[queueSymbol];
262+
263+
// Create the operation description.
264+
const operationDescription: OperationDescription = {
265+
requestId: 1,
266+
cb: callbackSpy
267+
};
268+
269+
// Stick an operation description in the queue.
270+
queue.set(1, operationDescription);
271+
// Emit a message that matches the existing operation description.
272+
const msg = generateOpMsgBuffer(document);
273+
const msgHeader: MessageHeader = {
274+
length: msg.readInt32LE(0),
275+
requestId: 2,
276+
responseTo: 1,
277+
opCode: msg.readInt32LE(12)
278+
};
279+
const msgBody = msg.subarray(16);
280+
281+
const message = new BinMsg(msg, msgHeader, msgBody);
282+
connection.onMessage(message);
283+
});
284+
285+
it('calls the operation description callback with the document', function () {
286+
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
287+
});
288+
});
289+
290+
context('when more than one operation description is in the queue', function () {
291+
let spyOne;
292+
let spyTwo;
293+
const document = { ok: 1 };
294+
295+
beforeEach(function () {
296+
spyOne = sinon.spy();
297+
spyTwo = sinon.spy();
298+
299+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
300+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
301+
connection.isMonitoringConnection = true;
302+
const queueSymbol = getSymbolFrom(connection, 'queue');
303+
queue = connection[queueSymbol];
304+
305+
// Create the operation descriptions.
306+
const descriptionOne: OperationDescription = {
307+
requestId: 1,
308+
cb: spyOne
309+
};
310+
const descriptionTwo: OperationDescription = {
311+
requestId: 2,
312+
cb: spyTwo
313+
};
314+
315+
// Stick an operation description in the queue.
316+
queue.set(2, descriptionOne);
317+
queue.set(3, descriptionTwo);
318+
// Emit a message that matches the existing operation description.
319+
const msg = generateOpMsgBuffer(document);
320+
const msgHeader: MessageHeader = {
321+
length: msg.readInt32LE(0),
322+
requestId: 2,
323+
responseTo: 1,
324+
opCode: msg.readInt32LE(12)
325+
};
326+
const msgBody = msg.subarray(16);
327+
328+
const message = new BinMsg(msg, msgHeader, msgBody);
329+
connection.onMessage(message);
330+
});
331+
332+
it('calls all operation description callbacks with an error', function () {
333+
expect(spyOne).to.be.calledOnce;
334+
expect(spyTwo).to.be.calledOnce;
335+
const errorOne = spyOne.firstCall.args[0];
336+
const errorTwo = spyTwo.firstCall.args[0];
337+
expect(errorOne).to.be.instanceof(MongoRuntimeError);
338+
expect(errorTwo).to.be.instanceof(MongoRuntimeError);
339+
});
340+
});
341+
});
342+
});
343+
140344
describe('onTimeout()', () => {
141345
let connection: sinon.SinonSpiedInstance<Connection>;
142346
let clock: sinon.SinonFakeTimers;
@@ -146,25 +350,6 @@ describe('new Connection()', function () {
146350
let kDelayedTimeoutId: symbol;
147351
let NodeJSTimeoutClass: any;
148352

149-
/** The absolute minimum socket API needed by Connection as of writing this test */
150-
class FakeSocket extends EventEmitter {
151-
address() {
152-
// is never called
153-
}
154-
pipe() {
155-
// does not need to do anything
156-
}
157-
destroy() {
158-
// is called, has no side effects
159-
}
160-
get remoteAddress() {
161-
return 'iLoveJavaScript';
162-
}
163-
get remotePort() {
164-
return 123;
165-
}
166-
}
167-
168353
beforeEach(() => {
169354
timerSandbox = createTimerSandbox();
170355
clock = sinon.useFakeTimers();

test/unit/cmap/message_stream.test.js

+1-13
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,7 @@ const { MessageStream } = require('../../../src/cmap/message_stream');
66
const { Msg } = require('../../../src/cmap/commands');
77
const expect = require('chai').expect;
88
const { LEGACY_HELLO_COMMAND } = require('../../../src/constants');
9-
const { generateOpMsgBuffer } = require('../../tools/utils');
10-
11-
function bufferToStream(buffer) {
12-
const stream = new Readable();
13-
if (Array.isArray(buffer)) {
14-
buffer.forEach(b => stream.push(b));
15-
} else {
16-
stream.push(buffer);
17-
}
18-
19-
stream.push(null);
20-
return stream;
21-
}
9+
const { bufferToStream, generateOpMsgBuffer } = require('../../tools/utils');
2210

2311
describe('MessageStream', function () {
2412
context('when the stream is for a monitoring connection', function () {

0 commit comments

Comments
 (0)