Skip to content

Commit 5565d50

Browse files
perf(NODE-5906): optimize toArray to use batches (#4171)
Co-authored-by: Neal Beeken <[email protected]>
1 parent b3f3987 commit 5565d50

File tree

3 files changed

+110
-5
lines changed

3 files changed

+110
-5
lines changed

src/cursor/abstract_cursor.ts

+15-5
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ export abstract class AbstractCursor<
296296
}
297297

298298
/** Returns current buffered documents */
299-
readBufferedDocuments(number?: number): TSchema[] {
300-
const bufferedDocs: TSchema[] = [];
299+
readBufferedDocuments(number?: number): NonNullable<TSchema>[] {
300+
const bufferedDocs: NonNullable<TSchema>[] = [];
301301
const documentsToRead = Math.min(
302302
number ?? this.documents?.length ?? 0,
303303
this.documents?.length ?? 0
@@ -312,6 +312,7 @@ export abstract class AbstractCursor<
312312

313313
return bufferedDocs;
314314
}
315+
315316
async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> {
316317
if (this.isClosed) {
317318
return;
@@ -475,13 +476,22 @@ export abstract class AbstractCursor<
475476
* cursor.rewind() can be used to reset the cursor.
476477
*/
477478
async toArray(): Promise<TSchema[]> {
478-
const array = [];
479+
const array: TSchema[] = [];
480+
// at the end of the loop (since readBufferedDocuments is called) the buffer will be empty
481+
// then, the 'await of' syntax will run a getMore call
479482
for await (const document of this) {
480483
array.push(document);
484+
const docs = this.readBufferedDocuments();
485+
if (this.transform != null) {
486+
for (const doc of docs) {
487+
array.push(await this.transformDocument(doc));
488+
}
489+
} else {
490+
array.push(...docs);
491+
}
481492
}
482493
return array;
483494
}
484-
485495
/**
486496
* Add a cursor flag to the cursor
487497
*
@@ -822,7 +832,7 @@ export abstract class AbstractCursor<
822832
}
823833

824834
/** @internal */
825-
private async transformDocument(document: NonNullable<TSchema>): Promise<TSchema> {
835+
private async transformDocument(document: NonNullable<TSchema>): Promise<NonNullable<TSchema>> {
826836
if (this.transform == null) return document;
827837

828838
try {

test/benchmarks/mongoBench/suites/multiBench.js

+61
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,67 @@ function makeMultiBench(suite) {
155155
})
156156
.teardown(dropDb)
157157
.teardown(disconnectClient)
158+
)
159+
.benchmark('aggregateAMillionDocumentsAndToArray', benchmark =>
160+
benchmark
161+
.taskSize(16)
162+
.setup(makeClient)
163+
.setup(connectClient)
164+
.setup(initDb)
165+
.setup(dropDb)
166+
.task(async function () {
167+
await this.db
168+
.aggregate([
169+
{ $documents: [{}] },
170+
{
171+
$set: {
172+
field: {
173+
$reduce: {
174+
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
175+
initialValue: [0],
176+
in: { $concatArrays: ['$$value', '$$value'] }
177+
}
178+
}
179+
}
180+
},
181+
{ $unwind: '$field' },
182+
{ $limit: 1000000 }
183+
])
184+
.toArray();
185+
})
186+
.teardown(dropDb)
187+
.teardown(disconnectClient)
188+
)
189+
.benchmark('aggregateAMillionTweetsAndToArray', benchmark =>
190+
benchmark
191+
.taskSize(1500)
192+
.setup(makeLoadJSON('tweet.json'))
193+
.setup(makeClient)
194+
.setup(connectClient)
195+
.setup(initDb)
196+
.setup(dropDb)
197+
.task(async function () {
198+
await this.db
199+
.aggregate([
200+
{ $documents: [this.doc] },
201+
{
202+
$set: {
203+
id: {
204+
$reduce: {
205+
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
206+
initialValue: [0],
207+
in: { $concatArrays: ['$$value', '$$value'] }
208+
}
209+
}
210+
}
211+
},
212+
{ $unwind: '$id' },
213+
{ $limit: 1000000 }
214+
])
215+
.toArray();
216+
})
217+
.teardown(dropDb)
218+
.teardown(disconnectClient)
158219
);
159220
}
160221

test/integration/node-specific/abstract_cursor.test.ts

+34
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Transform } from 'stream';
55
import { inspect } from 'util';
66

77
import {
8+
AbstractCursor,
89
type Collection,
910
type FindCursor,
1011
MongoAPIError,
@@ -361,4 +362,37 @@ describe('class AbstractCursor', function () {
361362
});
362363
});
363364
});
365+
366+
describe('toArray', () => {
367+
let nextSpy;
368+
let client: MongoClient;
369+
let cursor: AbstractCursor;
370+
let col: Collection;
371+
const numBatches = 10;
372+
const batchSize = 4;
373+
374+
beforeEach(async function () {
375+
client = this.configuration.newClient();
376+
col = client.db().collection('test');
377+
await col.deleteMany({});
378+
for (let i = 0; i < numBatches; i++) {
379+
await col.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
380+
}
381+
nextSpy = sinon.spy(AbstractCursor.prototype, 'next');
382+
});
383+
384+
afterEach(async function () {
385+
sinon.restore();
386+
await cursor.close();
387+
await client.close();
388+
});
389+
390+
it('iterates per batch not per document', async () => {
391+
cursor = client.db().collection('test').find({}, { batchSize });
392+
await cursor.toArray();
393+
expect(nextSpy.callCount).to.equal(numBatches + 1);
394+
const numDocuments = numBatches * batchSize;
395+
expect(nextSpy.callCount).to.be.lessThan(numDocuments);
396+
});
397+
});
364398
});

0 commit comments

Comments
 (0)