Skip to content

Aggregation cursor #5253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Module dependencies
*/

var util = require('util');
var utils = require('./utils');
var AggregationCursor = require('./cursor/AggregationCursor');
var PromiseProvider = require('./promise_provider');
var Query = require('./query');
var util = require('util');
var utils = require('./utils');
var read = Query.prototype.read;

/**
Expand Down Expand Up @@ -631,6 +632,9 @@ Aggregate.prototype.exec = function(callback) {
callback && callback(null, cursor);
});
});
} else if (options.cursor.useMongooseAggCursor) {
delete options.cursor.useMongooseAggCursor;
return new AggregationCursor(this);
}
var cursor = this._model.collection.
aggregate(this._pipeline, this.options || {});
Expand Down
282 changes: 282 additions & 0 deletions lib/cursor/AggregationCursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
/*!
* Module dependencies.
*/

var PromiseProvider = require('../promise_provider');
var Readable = require('stream').Readable;
var util = require('util');

/**
* An AggregationCursor is a concurrency primitive for processing aggregation
* results one document at a time. It is analogous to QueryCursor.
*
* An AggregationCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
* in addition to several other mechanisms for loading documents from MongoDB
* one at a time.
*
* Unless you're an advanced user, do **not** instantiate this class directly.
* Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
*
* @param {Aggregate} agg
* @param {Object} options
* @inherits Readable
* @event `cursor`: Emitted when the cursor is created
* @event `error`: Emitted when an error occurred
* @event `data`: Emitted when the stream is flowing and the next doc is ready
* @event `end`: Emitted when the stream is exhausted
* @api public
*/

function AggregationCursor(agg) {
Readable.call(this, { objectMode: true });

this.cursor = null;
this.agg = agg;
this._transforms = [];
var _this = this;
var model = agg._model;
model.collection.aggregate(agg._pipeline, agg.options || {}, function(err, cursor) {
if (_this._error) {
cursor.close(function() {});
_this.listeners('error').length > 0 && _this.emit('error', _this._error);
}
if (err) {
return _this.emit('error', err);
}
_this.cursor = cursor;
_this.emit('cursor', cursor);
});
}

util.inherits(AggregationCursor, Readable);

/*!
* Necessary to satisfy the Readable API
*/

AggregationCursor.prototype._read = function() {
var _this = this;
_next(this, function(error, doc) {
if (error) {
return _this.emit('error', error);
}
if (!doc) {
_this.push(null);
_this.cursor.close(function(error) {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
_this.emit('close');
}, 0);
});
return;
}
_this.push(doc);
});
};

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*
* ####Example
*
* // Map documents returned by `data` events
* Thing.
* find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* })
* on('data', function(doc) { console.log(doc.foo); });
*
* // Or map documents returned by `.next()`
* var cursor = Thing.find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* });
* cursor.next(function(error, doc) {
* console.log(doc.foo);
* });
*
* @param {Function} fn
* @return {QueryCursor}
* @api public
* @method map
*/

AggregationCursor.prototype.map = function(fn) {
this._transforms.push(fn);
return this;
};

/*!
* Marks this cursor as errored
*/

AggregationCursor.prototype._markError = function(error) {
this._error = error;
return this;
};

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method close
* @emits close
* @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
*/

AggregationCursor.prototype.close = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_this.cursor.close(function(error) {
if (error) {
callback && callback(error);
reject(error);
return _this.listeners('error').length > 0 &&
_this.emit('error', error);
}
_this.emit('close');
resolve();
callback && callback();
});
});
};

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method next
*/

AggregationCursor.prototype.next = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_next(_this, function(error, doc) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null, doc);
resolve(doc);
});
});
};

/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*
* @param {Function} fn
* @param {Function} [callback] executed when all docs have been processed
* @return {Promise}
* @api public
* @method eachAsync
*/

AggregationCursor.prototype.eachAsync = function(fn, callback) {
var Promise = PromiseProvider.get();
var _this = this;

var handleNextResult = function(doc, callback) {
var promise = fn(doc);
if (promise && typeof promise.then === 'function') {
promise.then(
function() { callback(null); },
function(error) { callback(error); });
} else {
callback(null);
}
};

var iterate = function(callback) {
return _next(_this, function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null);
}
handleNextResult(doc, function(error) {
if (error) {
return callback(error);
}
// Make sure to clear the stack re: gh-4697
setTimeout(function() {
iterate(callback);
}, 0);
});
});
};

return new Promise.ES6(function(resolve, reject) {
iterate(function(error) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null);
return resolve();
});
});
};

/*!
* Get the next doc from the underlying cursor and mongooseify it
* (populate, etc.)
*/

function _next(ctx, cb) {
var callback = cb;
if (ctx._transforms.length) {
callback = function(err, doc) {
if (err || doc === null) {
return cb(err, doc);
}
cb(err, ctx._transforms.reduce(function(doc, fn) {
return fn(doc);
}, doc));
};
}

if (ctx._error) {
return process.nextTick(function() {
callback(ctx._error);
});
}

if (ctx.cursor) {
return ctx.cursor.next(function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null, null);
}

callback(null, doc);
});
} else {
ctx.once('cursor', function() {
_next(ctx, cb);
});
}
}

module.exports = AggregationCursor;
13 changes: 13 additions & 0 deletions test/aggregate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,19 @@ describe('aggregate: ', function() {
});
});

it('cursor() with useMongooseAggCursor (gh-5145)', function(done) {
var db = start();

var MyModel = db.model('gh5145', { name: String });

var cursor = MyModel.
aggregate([{ $match: { name: 'test' } }]).
cursor({ useMongooseAggCursor: true }).
exec();
assert.ok(cursor instanceof require('stream').Readable);
done();
});

it('cursor() eachAsync (gh-4300)', function(done) {
var db = start();

Expand Down