Skip to content

Commit e8c8789

Browse files
committed
Merge branch '4.10' of github.com:Automattic/mongoose into 4.10
2 parents 9d4c9d4 + b8f0dd8 commit e8c8789

14 files changed

+381
-40
lines changed

lib/aggregate.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
* Module dependencies
33
*/
44

5-
var util = require('util');
6-
var utils = require('./utils');
5+
var AggregationCursor = require('./cursor/AggregationCursor');
76
var PromiseProvider = require('./promise_provider');
87
var Query = require('./query');
8+
var util = require('util');
9+
var utils = require('./utils');
910
var read = Query.prototype.read;
1011

1112
/**
@@ -631,6 +632,9 @@ Aggregate.prototype.exec = function(callback) {
631632
callback && callback(null, cursor);
632633
});
633634
});
635+
} else if (options.cursor.useMongooseAggCursor) {
636+
delete options.cursor.useMongooseAggCursor;
637+
return new AggregationCursor(this);
634638
}
635639
var cursor = this._model.collection.
636640
aggregate(this._pipeline, this.options || {});

lib/cursor/AggregationCursor.js

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*!
2+
* Module dependencies.
3+
*/
4+
5+
var PromiseProvider = require('../promise_provider');
6+
var Readable = require('stream').Readable;
7+
var util = require('util');
8+
9+
/**
10+
* An AggregationCursor is a concurrency primitive for processing aggregation
11+
* results one document at a time. It is analogous to QueryCursor.
12+
*
13+
* An AggregationCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
14+
* in addition to several other mechanisms for loading documents from MongoDB
15+
* one at a time.
16+
*
17+
* Unless you're an advanced user, do **not** instantiate this class directly.
18+
* Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
19+
*
20+
* @param {Aggregate} agg
21+
* @param {Object} options
22+
* @inherits Readable
23+
* @event `cursor`: Emitted when the cursor is created
24+
* @event `error`: Emitted when an error occurred
25+
* @event `data`: Emitted when the stream is flowing and the next doc is ready
26+
* @event `end`: Emitted when the stream is exhausted
27+
* @api public
28+
*/
29+
30+
function AggregationCursor(agg) {
31+
Readable.call(this, { objectMode: true });
32+
33+
this.cursor = null;
34+
this.agg = agg;
35+
this._transforms = [];
36+
var _this = this;
37+
var model = agg._model;
38+
model.collection.aggregate(agg._pipeline, agg.options || {}, function(err, cursor) {
39+
if (_this._error) {
40+
cursor.close(function() {});
41+
_this.listeners('error').length > 0 && _this.emit('error', _this._error);
42+
}
43+
if (err) {
44+
return _this.emit('error', err);
45+
}
46+
_this.cursor = cursor;
47+
_this.emit('cursor', cursor);
48+
});
49+
}
50+
51+
util.inherits(AggregationCursor, Readable);
52+
53+
/*!
54+
* Necessary to satisfy the Readable API
55+
*/
56+
57+
AggregationCursor.prototype._read = function() {
58+
var _this = this;
59+
_next(this, function(error, doc) {
60+
if (error) {
61+
return _this.emit('error', error);
62+
}
63+
if (!doc) {
64+
_this.push(null);
65+
_this.cursor.close(function(error) {
66+
if (error) {
67+
return _this.emit('error', error);
68+
}
69+
setTimeout(function() {
70+
_this.emit('close');
71+
}, 0);
72+
});
73+
return;
74+
}
75+
_this.push(doc);
76+
});
77+
};
78+
79+
/**
80+
* Registers a transform function which subsequently maps documents retrieved
81+
* via the streams interface or `.next()`
82+
*
83+
* ####Example
84+
*
85+
* // Map documents returned by `data` events
86+
* Thing.
87+
* find({ name: /^hello/ }).
88+
* cursor().
89+
* map(function (doc) {
90+
* doc.foo = "bar";
91+
* return doc;
92+
* })
93+
* on('data', function(doc) { console.log(doc.foo); });
94+
*
95+
* // Or map documents returned by `.next()`
96+
* var cursor = Thing.find({ name: /^hello/ }).
97+
* cursor().
98+
* map(function (doc) {
99+
* doc.foo = "bar";
100+
* return doc;
101+
* });
102+
* cursor.next(function(error, doc) {
103+
* console.log(doc.foo);
104+
* });
105+
*
106+
* @param {Function} fn
107+
* @return {QueryCursor}
108+
* @api public
109+
* @method map
110+
*/
111+
112+
AggregationCursor.prototype.map = function(fn) {
113+
this._transforms.push(fn);
114+
return this;
115+
};
116+
117+
/*!
118+
* Marks this cursor as errored
119+
*/
120+
121+
AggregationCursor.prototype._markError = function(error) {
122+
this._error = error;
123+
return this;
124+
};
125+
126+
/**
127+
* Marks this cursor as closed. Will stop streaming and subsequent calls to
128+
* `next()` will error.
129+
*
130+
* @param {Function} callback
131+
* @return {Promise}
132+
* @api public
133+
* @method close
134+
* @emits close
135+
* @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
136+
*/
137+
138+
AggregationCursor.prototype.close = function(callback) {
139+
var Promise = PromiseProvider.get();
140+
var _this = this;
141+
return new Promise.ES6(function(resolve, reject) {
142+
_this.cursor.close(function(error) {
143+
if (error) {
144+
callback && callback(error);
145+
reject(error);
146+
return _this.listeners('error').length > 0 &&
147+
_this.emit('error', error);
148+
}
149+
_this.emit('close');
150+
resolve();
151+
callback && callback();
152+
});
153+
});
154+
};
155+
156+
/**
157+
* Get the next document from this cursor. Will return `null` when there are
158+
* no documents left.
159+
*
160+
* @param {Function} callback
161+
* @return {Promise}
162+
* @api public
163+
* @method next
164+
*/
165+
166+
AggregationCursor.prototype.next = function(callback) {
167+
var Promise = PromiseProvider.get();
168+
var _this = this;
169+
return new Promise.ES6(function(resolve, reject) {
170+
_next(_this, function(error, doc) {
171+
if (error) {
172+
callback && callback(error);
173+
return reject(error);
174+
}
175+
callback && callback(null, doc);
176+
resolve(doc);
177+
});
178+
});
179+
};
180+
181+
/**
182+
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
183+
* will wait for the promise to resolve before iterating on to the next one.
184+
* Returns a promise that resolves when done.
185+
*
186+
* @param {Function} fn
187+
* @param {Function} [callback] executed when all docs have been processed
188+
* @return {Promise}
189+
* @api public
190+
* @method eachAsync
191+
*/
192+
193+
AggregationCursor.prototype.eachAsync = function(fn, callback) {
194+
var Promise = PromiseProvider.get();
195+
var _this = this;
196+
197+
var handleNextResult = function(doc, callback) {
198+
var promise = fn(doc);
199+
if (promise && typeof promise.then === 'function') {
200+
promise.then(
201+
function() { callback(null); },
202+
function(error) { callback(error); });
203+
} else {
204+
callback(null);
205+
}
206+
};
207+
208+
var iterate = function(callback) {
209+
return _next(_this, function(error, doc) {
210+
if (error) {
211+
return callback(error);
212+
}
213+
if (!doc) {
214+
return callback(null);
215+
}
216+
handleNextResult(doc, function(error) {
217+
if (error) {
218+
return callback(error);
219+
}
220+
// Make sure to clear the stack re: gh-4697
221+
setTimeout(function() {
222+
iterate(callback);
223+
}, 0);
224+
});
225+
});
226+
};
227+
228+
return new Promise.ES6(function(resolve, reject) {
229+
iterate(function(error) {
230+
if (error) {
231+
callback && callback(error);
232+
return reject(error);
233+
}
234+
callback && callback(null);
235+
return resolve();
236+
});
237+
});
238+
};
239+
240+
/*!
241+
* Get the next doc from the underlying cursor and mongooseify it
242+
* (populate, etc.)
243+
*/
244+
245+
function _next(ctx, cb) {
246+
var callback = cb;
247+
if (ctx._transforms.length) {
248+
callback = function(err, doc) {
249+
if (err || doc === null) {
250+
return cb(err, doc);
251+
}
252+
cb(err, ctx._transforms.reduce(function(doc, fn) {
253+
return fn(doc);
254+
}, doc));
255+
};
256+
}
257+
258+
if (ctx._error) {
259+
return process.nextTick(function() {
260+
callback(ctx._error);
261+
});
262+
}
263+
264+
if (ctx.cursor) {
265+
return ctx.cursor.next(function(error, doc) {
266+
if (error) {
267+
return callback(error);
268+
}
269+
if (!doc) {
270+
return callback(null, null);
271+
}
272+
273+
callback(null, doc);
274+
});
275+
} else {
276+
ctx.once('cursor', function() {
277+
_next(ctx, cb);
278+
});
279+
}
280+
}
281+
282+
module.exports = AggregationCursor;

lib/schema.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,10 @@ Schema.interpretAsType = function(path, obj, options) {
653653
'You can only nest using refs or arrays.');
654654
}
655655

656+
obj = utils.clone(obj, { retainKeyOrder: true });
657+
if (!('runSettersOnQuery' in obj)) {
658+
obj.runSettersOnQuery = options.runSettersOnQuery;
659+
}
656660
return new MongooseTypes[name](path, obj);
657661
};
658662

lib/schema/boolean.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ SchemaBoolean.prototype.castForQuery = function($conditional, val) {
9090
return handler.call(this, val);
9191
}
9292

93-
return this.cast(val);
93+
return this._castForQuery(val);
9494
}
9595

96-
return this.cast($conditional);
96+
return this._castForQuery($conditional);
9797
};
9898

9999
/*!

lib/schema/buffer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ SchemaBuffer.prototype.castForQuery = function($conditional, val) {
178178
return handler.call(this, val);
179179
}
180180
val = $conditional;
181-
var casted = this.cast(val);
181+
var casted = this._castForQuery(val);
182182
return casted ? casted.toObject({ transform: false, virtuals: false }) : casted;
183183
};
184184

lib/schema/date.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ SchemaDate.prototype.castForQuery = function($conditional, val) {
277277
var handler;
278278

279279
if (arguments.length !== 2) {
280-
return this.cast($conditional);
280+
return this._castForQuery($conditional);
281281
}
282282

283283
handler = this.$conditionalHandlers[$conditional];

lib/schema/decimal128.js

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -139,26 +139,6 @@ Decimal128.prototype.$conditionalHandlers =
139139
$lte: handleSingle
140140
});
141141

142-
/**
143-
* Casts contents for queries.
144-
*
145-
* @param {String} $conditional
146-
* @param {any} [val]
147-
* @api private
148-
*/
149-
150-
Decimal128.prototype.castForQuery = function($conditional, val) {
151-
var handler;
152-
if (arguments.length === 2) {
153-
handler = this.$conditionalHandlers[$conditional];
154-
if (!handler) {
155-
throw new Error('Can\'t use ' + $conditional + ' with ObjectId.');
156-
}
157-
return handler.call(this, val);
158-
}
159-
return this.cast($conditional);
160-
};
161-
162142
/*!
163143
* Module exports.
164144
*/

lib/schema/embedded.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ Embedded.prototype.castForQuery = function($conditional, val) {
156156
return val;
157157
}
158158

159+
if (this.options.runSetters) {
160+
val = this._applySetters(val);
161+
}
162+
159163
return new this.caster(val);
160164
};
161165

lib/schema/number.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ SchemaNumber.prototype.castForQuery = function($conditional, val) {
279279
}
280280
return handler.call(this, val);
281281
}
282-
val = this.cast($conditional);
282+
val = this._castForQuery($conditional);
283283
return val;
284284
};
285285

0 commit comments

Comments
 (0)