Skip to content

Commit df69639

Browse files
committed
feat(NODE-6136): parse cursor responses on demand
1 parent 682a39e commit df69639

21 files changed

+294
-310
lines changed

src/bson.ts

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export {
2727
UUID
2828
} from 'bson';
2929

30+
/** @internal */
3031
export type BSONElement = BSON.OnDemand['BSONElement'];
3132

3233
export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSONElement[] {

src/cmap/wire_protocol/responses.ts

+65-14
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
} from '../../bson';
1111
import { MongoUnexpectedServerResponseError } from '../../error';
1212
import { type ClusterTime } from '../../sdam/common';
13-
import { type MongoDBNamespace, ns } from '../../utils';
13+
import { ns } from '../../utils';
1414
import { OnDemandDocument } from './on_demand/document';
1515

1616
// eslint-disable-next-line no-restricted-syntax
@@ -171,13 +171,29 @@ export class MongoDBResponse extends OnDemandDocument {
171171
}
172172
}
173173

174+
// Here's a litle blast from the past.
175+
// OLD style method definition so that I can override get without redefining ALL the fancy TS :/
176+
Object.defineProperty(MongoDBResponse.prototype, 'get', {
177+
value: function get(name: any, as: any, required: any) {
178+
try {
179+
return OnDemandDocument.prototype.get.call(this, name, as, required);
180+
} catch (cause) {
181+
throw new MongoUnexpectedServerResponseError(cause.message, { cause });
182+
}
183+
}
184+
});
185+
174186
/** @internal */
175187
export class CursorResponse extends MongoDBResponse {
176188
/**
177189
* This supports a feature of the FindCursor.
178190
* It is an optimization to avoid an extra getMore when the limit has been reached
179191
*/
180-
static emptyGetMore = { id: new Long(0), length: 0, shift: () => null };
192+
static emptyGetMore: CursorResponse = {
193+
id: new Long(0),
194+
length: 0,
195+
shift: () => null
196+
} as unknown as CursorResponse;
181197

182198
static override is(value: unknown): value is CursorResponse {
183199
return value instanceof CursorResponse || value === CursorResponse.emptyGetMore;
@@ -186,25 +202,25 @@ export class CursorResponse extends MongoDBResponse {
186202
private _batch: OnDemandDocument | null = null;
187203
private iterated = 0;
188204

189-
get cursor() {
205+
private get cursor() {
190206
return this.get('cursor', BSONType.object, true);
191207
}
192208

193-
get id(): Long {
209+
public get id(): Long {
194210
return Long.fromBigInt(this.cursor.get('id', BSONType.long, true));
195211
}
196212

197-
get ns() {
213+
public get ns() {
198214
const namespace = this.cursor.get('ns', BSONType.string);
199215
if (namespace != null) return ns(namespace);
200216
return null;
201217
}
202218

203-
get length() {
219+
public get length() {
204220
return Math.max(this.batchSize - this.iterated, 0);
205221
}
206222

207-
get batch() {
223+
private get batch() {
208224
if (this._batch != null) return this._batch;
209225
const cursor = this.cursor;
210226
if (cursor.has('firstBatch')) this._batch = cursor.get('firstBatch', BSONType.array, true);
@@ -213,11 +229,21 @@ export class CursorResponse extends MongoDBResponse {
213229
return this._batch;
214230
}
215231

216-
get batchSize() {
232+
public get batchSize() {
217233
return this.batch?.size();
218234
}
219235

220-
shift(options?: BSONSerializeOptions): any {
236+
public get postBatchResumeToken() {
237+
return (
238+
this.cursor.get('postBatchResumeToken', BSONType.object)?.toObject({
239+
promoteValues: false,
240+
promoteLongs: false,
241+
promoteBuffers: false
242+
}) ?? null
243+
);
244+
}
245+
246+
public shift(options?: BSONSerializeOptions): any {
221247
if (this.iterated >= this.batchSize) {
222248
return null;
223249
}
@@ -232,15 +258,40 @@ export class CursorResponse extends MongoDBResponse {
232258
}
233259
}
234260

235-
clear() {
261+
public clear() {
236262
this.iterated = this.batchSize;
237263
}
264+
}
265+
266+
/**
267+
* Explain responses have nothing to do with cursor responses
268+
* This class serves to temporarily avoid refactoring how cursors handle
269+
* explain responses which is to detect that the response is not cursor-like and return the explain
270+
* result as the "first and only" document in the "batch" and end the "cursor"
271+
*/
272+
export class ExplainedCursorResponse extends CursorResponse {
273+
isExplain = true;
274+
275+
override get id(): Long {
276+
return Long.fromBigInt(0n);
277+
}
278+
279+
override get batchSize() {
280+
return 0;
281+
}
282+
283+
override get ns() {
284+
return null;
285+
}
238286

239-
pushMany() {
240-
throw new Error('pushMany Unsupported method');
287+
_length = 1;
288+
override get length(): number {
289+
return this._length;
241290
}
242291

243-
push() {
244-
throw new Error('push Unsupported method');
292+
override shift(options?: BSONSerializeOptions | undefined) {
293+
if (this._length === 0) return null;
294+
this._length -= 1;
295+
return this.toObject(options);
245296
}
246297
}

src/cursor/abstract_cursor.ts

+12-56
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import {
1313
MongoTailableCursorError
1414
} from '../error';
1515
import type { MongoClient } from '../mongo_client';
16-
import { type TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';
16+
import { TypedEventEmitter } from '../mongo_types';
1717
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
1818
import { GetMoreOperation } from '../operations/get_more';
1919
import { KillCursorsOperation } from '../operations/kill_cursors';
2020
import { ReadConcern, type ReadConcernLike } from '../read_concern';
2121
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { List, type MongoDBNamespace, ns, squashError } from '../utils';
24+
import { type MongoDBNamespace, squashError } from '../utils';
2525

2626
/** @internal */
2727
const kId = Symbol('id');
@@ -145,13 +145,7 @@ export abstract class AbstractCursor<
145145
/** @internal */
146146
[kNamespace]: MongoDBNamespace;
147147
/** @internal */
148-
[kDocuments]: {
149-
length: number;
150-
shift(bsonOptions?: any): TSchema | null;
151-
clear(): void;
152-
pushMany(many: Iterable<TSchema>): void;
153-
push(item: TSchema): void;
154-
};
148+
[kDocuments]: CursorResponse = { length: 0 } as unknown as CursorResponse;
155149
/** @internal */
156150
[kClient]: MongoClient;
157151
/** @internal */
@@ -182,7 +176,6 @@ export abstract class AbstractCursor<
182176
this[kClient] = client;
183177
this[kNamespace] = namespace;
184178
this[kId] = null;
185-
this[kDocuments] = new List();
186179
this[kInitialized] = false;
187180
this[kClosed] = false;
188181
this[kKilled] = false;
@@ -637,13 +630,12 @@ export abstract class AbstractCursor<
637630
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;
638631

639632
/** @internal */
640-
async getMore(batchSize: number, useCursorResponse = false): Promise<Document | null> {
633+
async getMore(batchSize: number): Promise<CursorResponse> {
641634
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
642635
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
643636
...this[kOptions],
644637
session: this[kSession],
645-
batchSize,
646-
useCursorResponse
638+
batchSize
647639
});
648640

649641
return await executeOperation(this[kClient], getMoreOperation);
@@ -661,37 +653,13 @@ export abstract class AbstractCursor<
661653
const state = await this._initialize(this[kSession]);
662654
const response = state.response;
663655
this[kServer] = state.server;
664-
if (CursorResponse.is(response)) {
665-
this[kId] = response.id;
666-
if (response.ns) this[kNamespace] = response.ns;
667-
this[kDocuments] = response;
668-
} else if (response.cursor) {
669-
// TODO(NODE-2674): Preserve int64 sent from MongoDB
670-
this[kId] =
671-
typeof response.cursor.id === 'number'
672-
? Long.fromNumber(response.cursor.id)
673-
: typeof response.cursor.id === 'bigint'
674-
? Long.fromBigInt(response.cursor.id)
675-
: response.cursor.id;
676-
677-
if (response.cursor.ns) {
678-
this[kNamespace] = ns(response.cursor.ns);
679-
}
680-
681-
this[kDocuments].pushMany(response.cursor.firstBatch);
682-
}
683656

684-
// When server responses return without a cursor document, we close this cursor
685-
// and return the raw server response. This is often the case for explain commands
686-
// for example
687-
if (this[kId] == null) {
688-
this[kId] = Long.ZERO;
689-
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
690-
this[kDocuments].push(state.response as TODO_NODE_3286);
691-
}
657+
if (!CursorResponse.is(response)) throw new Error('ah');
692658

693-
// the cursor is now initialized, even if it is dead
694-
this[kInitialized] = true;
659+
this[kId] = response.id;
660+
this[kNamespace] = response.ns ?? this[kNamespace];
661+
this[kDocuments] = response;
662+
this[kInitialized] = true; // the cursor is now initialized, even if it is dead
695663
} catch (error) {
696664
// the cursor is now initialized, even if an error occurred
697665
this[kInitialized] = true;
@@ -802,20 +770,8 @@ async function next<T>(
802770

803771
try {
804772
const response = await cursor.getMore(batchSize);
805-
if (CursorResponse.is(response)) {
806-
cursor[kId] = response.id;
807-
cursor[kDocuments] = response;
808-
} else if (response) {
809-
const cursorId =
810-
typeof response.cursor.id === 'number'
811-
? Long.fromNumber(response.cursor.id)
812-
: typeof response.cursor.id === 'bigint'
813-
? Long.fromBigInt(response.cursor.id)
814-
: response.cursor.id;
815-
816-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
817-
cursor[kId] = cursorId;
818-
}
773+
cursor[kId] = response.id;
774+
cursor[kDocuments] = response;
819775
} catch (error) {
820776
try {
821777
await cleanupCursor(cursor, { error, needsToEmitClosed: true });

src/cursor/aggregation_cursor.ts

+10-8
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,16 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
7676

7777
/** Execute the explain for the cursor */
7878
async explain(verbosity?: ExplainVerbosityLike): Promise<Document> {
79-
return await executeOperation(
80-
this.client,
81-
new AggregateOperation(this.namespace, this[kPipeline], {
82-
...this[kOptions], // NOTE: order matters here, we may need to refine this
83-
...this.cursorOptions,
84-
explain: verbosity ?? true
85-
})
86-
);
79+
return (
80+
await executeOperation(
81+
this.client,
82+
new AggregateOperation(this.namespace, this[kPipeline], {
83+
...this[kOptions], // NOTE: order matters here, we may need to refine this
84+
...this.cursorOptions,
85+
explain: verbosity ?? true
86+
})
87+
)
88+
).shift(this[kOptions]);
8789
}
8890

8991
/** Add a stage to the aggregation pipeline

src/cursor/change_stream_cursor.ts

+14-30
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import type { Document, Long, Timestamp } from '../bson';
1+
import type { Document } from '../bson';
22
import {
33
ChangeStream,
44
type ChangeStreamDocument,
55
type ChangeStreamEvents,
66
type OperationTime,
77
type ResumeToken
88
} from '../change_stream';
9+
import { type CursorResponse } from '../cmap/wire_protocol/responses';
910
import { INIT, RESPONSE } from '../constants';
1011
import type { MongoClient } from '../mongo_client';
11-
import type { TODO_NODE_3286 } from '../mongo_types';
1212
import { AggregateOperation } from '../operations/aggregate';
1313
import type { CollationOptions } from '../operations/command';
1414
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
@@ -26,25 +26,13 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
2626
fullDocument?: string;
2727
}
2828

29-
/** @internal */
30-
export type ChangeStreamAggregateRawResult<TChange> = {
31-
$clusterTime: { clusterTime: Timestamp };
32-
cursor: {
33-
postBatchResumeToken: ResumeToken;
34-
ns: string;
35-
id: number | Long;
36-
} & ({ firstBatch: TChange[] } | { nextBatch: TChange[] });
37-
ok: 1;
38-
operationTime: Timestamp;
39-
};
40-
4129
/** @internal */
4230
export class ChangeStreamCursor<
4331
TSchema extends Document = Document,
4432
TChange extends Document = ChangeStreamDocument<TSchema>
4533
> extends AbstractCursor<TChange, ChangeStreamEvents> {
4634
_resumeToken: ResumeToken;
47-
startAtOperationTime?: OperationTime;
35+
startAtOperationTime: OperationTime | null = null;
4836
hasReceived?: boolean;
4937
resumeAfter: ResumeToken;
5038
startAfter: ResumeToken;
@@ -71,7 +59,7 @@ export class ChangeStreamCursor<
7159
this.pipeline = pipeline;
7260
this.options = options;
7361
this._resumeToken = null;
74-
this.startAtOperationTime = options.startAtOperationTime;
62+
this.startAtOperationTime = options.startAtOperationTime ?? null;
7563

7664
if (options.startAfter) {
7765
this.resumeToken = options.startAfter;
@@ -120,15 +108,13 @@ export class ChangeStreamCursor<
120108
this.hasReceived = true;
121109
}
122110

123-
_processBatch(response: ChangeStreamAggregateRawResult<TChange>): void {
124-
const cursor = response.cursor;
125-
if (cursor.postBatchResumeToken) {
126-
this.postBatchResumeToken = response.cursor.postBatchResumeToken;
111+
_processBatch(response: CursorResponse): void {
112+
const { postBatchResumeToken } = response;
113+
if (postBatchResumeToken) {
114+
this.postBatchResumeToken = postBatchResumeToken;
127115

128-
const batch =
129-
'firstBatch' in response.cursor ? response.cursor.firstBatch : response.cursor.nextBatch;
130-
if (batch.length === 0) {
131-
this.resumeToken = cursor.postBatchResumeToken;
116+
if (response.batchSize === 0) {
117+
this.resumeToken = postBatchResumeToken;
132118
}
133119
}
134120
}
@@ -146,10 +132,7 @@ export class ChangeStreamCursor<
146132
session
147133
});
148134

149-
const response = await executeOperation<
150-
TODO_NODE_3286,
151-
ChangeStreamAggregateRawResult<TChange>
152-
>(session.client, aggregateOperation);
135+
const response = await executeOperation(session.client, aggregateOperation);
153136

154137
const server = aggregateOperation.server;
155138
this.maxWireVersion = maxWireVersion(server);
@@ -172,11 +155,12 @@ export class ChangeStreamCursor<
172155
return { server, session, response };
173156
}
174157

175-
override async getMore(batchSize: number): Promise<Document | null> {
158+
override async getMore(batchSize: number): Promise<CursorResponse> {
176159
const response = await super.getMore(batchSize);
177160

178161
this.maxWireVersion = maxWireVersion(this.server);
179-
this._processBatch(response as ChangeStreamAggregateRawResult<TChange>);
162+
// as ChangeStreamAggregateRawResult<TChange>
163+
this._processBatch(response);
180164

181165
this.emit(ChangeStream.MORE, response);
182166
this.emit(ChangeStream.RESPONSE);

0 commit comments

Comments
 (0)