Skip to content

Commit 142fbba

Browse files
authored
Merge pull request #5270 from Automattic/4.10
4.10
2 parents 8ba7869 + 4bc7b3e commit 142fbba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1097
-293
lines changed

.eslintignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ docs/
22
bin/
33
test/triage/
44
test/model.discriminator.test.js
5+
tools/
56
test/es6/

lib/aggregate.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
* Module dependencies
33
*/
44

5-
var util = require('util');
6-
var utils = require('./utils');
5+
var AggregationCursor = require('./cursor/AggregationCursor');
76
var PromiseProvider = require('./promise_provider');
87
var Query = require('./query');
8+
var util = require('util');
9+
var utils = require('./utils');
910
var read = Query.prototype.read;
1011

1112
/**
@@ -631,6 +632,9 @@ Aggregate.prototype.exec = function(callback) {
631632
callback && callback(null, cursor);
632633
});
633634
});
635+
} else if (options.cursor.useMongooseAggCursor) {
636+
delete options.cursor.useMongooseAggCursor;
637+
return new AggregationCursor(this);
634638
}
635639
var cursor = this._model.collection.
636640
aggregate(this._pipeline, this.options || {});

lib/cast.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ module.exports = function cast(schema, obj, options) {
179179
}
180180
throw new StrictModeError(path, 'Path "' + path + '" is not in ' +
181181
'schema, strict mode is `true`, and upsert is `true`.');
182+
} else if (options && options.strictQuery === 'throw') {
183+
throw new StrictModeError(path, 'Path "' + path + '" is not in ' +
184+
'schema and strictQuery is true.');
182185
}
183186
} else if (val === null || val === undefined) {
184187
obj[path] = null;

lib/cursor/AggregationCursor.js

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*!
2+
* Module dependencies.
3+
*/
4+
5+
var PromiseProvider = require('../promise_provider');
6+
var Readable = require('stream').Readable;
7+
var util = require('util');
8+
9+
/**
10+
* An AggregationCursor is a concurrency primitive for processing aggregation
11+
* results one document at a time. It is analogous to QueryCursor.
12+
*
13+
* An AggregationCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
14+
* in addition to several other mechanisms for loading documents from MongoDB
15+
* one at a time.
16+
*
17+
* Unless you're an advanced user, do **not** instantiate this class directly.
18+
* Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
19+
*
20+
* @param {Aggregate} agg
21+
* @param {Object} options
22+
* @inherits Readable
23+
* @event `cursor`: Emitted when the cursor is created
24+
* @event `error`: Emitted when an error occurred
25+
* @event `data`: Emitted when the stream is flowing and the next doc is ready
26+
* @event `end`: Emitted when the stream is exhausted
27+
* @api public
28+
*/
29+
30+
function AggregationCursor(agg) {
31+
Readable.call(this, { objectMode: true });
32+
33+
this.cursor = null;
34+
this.agg = agg;
35+
this._transforms = [];
36+
var _this = this;
37+
var model = agg._model;
38+
model.collection.aggregate(agg._pipeline, agg.options || {}, function(err, cursor) {
39+
if (_this._error) {
40+
cursor.close(function() {});
41+
_this.listeners('error').length > 0 && _this.emit('error', _this._error);
42+
}
43+
if (err) {
44+
return _this.emit('error', err);
45+
}
46+
_this.cursor = cursor;
47+
_this.emit('cursor', cursor);
48+
});
49+
}
50+
51+
util.inherits(AggregationCursor, Readable);
52+
53+
/*!
54+
* Necessary to satisfy the Readable API
55+
*/
56+
57+
AggregationCursor.prototype._read = function() {
58+
var _this = this;
59+
_next(this, function(error, doc) {
60+
if (error) {
61+
return _this.emit('error', error);
62+
}
63+
if (!doc) {
64+
_this.push(null);
65+
_this.cursor.close(function(error) {
66+
if (error) {
67+
return _this.emit('error', error);
68+
}
69+
setTimeout(function() {
70+
_this.emit('close');
71+
}, 0);
72+
});
73+
return;
74+
}
75+
_this.push(doc);
76+
});
77+
};
78+
79+
/**
80+
* Registers a transform function which subsequently maps documents retrieved
81+
* via the streams interface or `.next()`
82+
*
83+
* ####Example
84+
*
85+
* // Map documents returned by `data` events
86+
* Thing.
87+
* find({ name: /^hello/ }).
88+
* cursor().
89+
* map(function (doc) {
90+
* doc.foo = "bar";
91+
* return doc;
92+
* })
93+
* on('data', function(doc) { console.log(doc.foo); });
94+
*
95+
* // Or map documents returned by `.next()`
96+
* var cursor = Thing.find({ name: /^hello/ }).
97+
* cursor().
98+
* map(function (doc) {
99+
* doc.foo = "bar";
100+
* return doc;
101+
* });
102+
* cursor.next(function(error, doc) {
103+
* console.log(doc.foo);
104+
* });
105+
*
106+
* @param {Function} fn
107+
* @return {QueryCursor}
108+
* @api public
109+
* @method map
110+
*/
111+
112+
AggregationCursor.prototype.map = function(fn) {
113+
this._transforms.push(fn);
114+
return this;
115+
};
116+
117+
/*!
118+
* Marks this cursor as errored
119+
*/
120+
121+
AggregationCursor.prototype._markError = function(error) {
122+
this._error = error;
123+
return this;
124+
};
125+
126+
/**
127+
* Marks this cursor as closed. Will stop streaming and subsequent calls to
128+
* `next()` will error.
129+
*
130+
* @param {Function} callback
131+
* @return {Promise}
132+
* @api public
133+
* @method close
134+
* @emits close
135+
* @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
136+
*/
137+
138+
AggregationCursor.prototype.close = function(callback) {
139+
var Promise = PromiseProvider.get();
140+
var _this = this;
141+
return new Promise.ES6(function(resolve, reject) {
142+
_this.cursor.close(function(error) {
143+
if (error) {
144+
callback && callback(error);
145+
reject(error);
146+
return _this.listeners('error').length > 0 &&
147+
_this.emit('error', error);
148+
}
149+
_this.emit('close');
150+
resolve();
151+
callback && callback();
152+
});
153+
});
154+
};
155+
156+
/**
157+
* Get the next document from this cursor. Will return `null` when there are
158+
* no documents left.
159+
*
160+
* @param {Function} callback
161+
* @return {Promise}
162+
* @api public
163+
* @method next
164+
*/
165+
166+
AggregationCursor.prototype.next = function(callback) {
167+
var Promise = PromiseProvider.get();
168+
var _this = this;
169+
return new Promise.ES6(function(resolve, reject) {
170+
_next(_this, function(error, doc) {
171+
if (error) {
172+
callback && callback(error);
173+
return reject(error);
174+
}
175+
callback && callback(null, doc);
176+
resolve(doc);
177+
});
178+
});
179+
};
180+
181+
/**
182+
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
183+
* will wait for the promise to resolve before iterating on to the next one.
184+
* Returns a promise that resolves when done.
185+
*
186+
* @param {Function} fn
187+
* @param {Function} [callback] executed when all docs have been processed
188+
* @return {Promise}
189+
* @api public
190+
* @method eachAsync
191+
*/
192+
193+
AggregationCursor.prototype.eachAsync = function(fn, callback) {
194+
var Promise = PromiseProvider.get();
195+
var _this = this;
196+
197+
var handleNextResult = function(doc, callback) {
198+
var promise = fn(doc);
199+
if (promise && typeof promise.then === 'function') {
200+
promise.then(
201+
function() { callback(null); },
202+
function(error) { callback(error); });
203+
} else {
204+
callback(null);
205+
}
206+
};
207+
208+
var iterate = function(callback) {
209+
return _next(_this, function(error, doc) {
210+
if (error) {
211+
return callback(error);
212+
}
213+
if (!doc) {
214+
return callback(null);
215+
}
216+
handleNextResult(doc, function(error) {
217+
if (error) {
218+
return callback(error);
219+
}
220+
// Make sure to clear the stack re: gh-4697
221+
setTimeout(function() {
222+
iterate(callback);
223+
}, 0);
224+
});
225+
});
226+
};
227+
228+
return new Promise.ES6(function(resolve, reject) {
229+
iterate(function(error) {
230+
if (error) {
231+
callback && callback(error);
232+
return reject(error);
233+
}
234+
callback && callback(null);
235+
return resolve();
236+
});
237+
});
238+
};
239+
240+
/*!
241+
* Get the next doc from the underlying cursor and mongooseify it
242+
* (populate, etc.)
243+
*/
244+
245+
function _next(ctx, cb) {
246+
var callback = cb;
247+
if (ctx._transforms.length) {
248+
callback = function(err, doc) {
249+
if (err || doc === null) {
250+
return cb(err, doc);
251+
}
252+
cb(err, ctx._transforms.reduce(function(doc, fn) {
253+
return fn(doc);
254+
}, doc));
255+
};
256+
}
257+
258+
if (ctx._error) {
259+
return process.nextTick(function() {
260+
callback(ctx._error);
261+
});
262+
}
263+
264+
if (ctx.cursor) {
265+
return ctx.cursor.next(function(error, doc) {
266+
if (error) {
267+
return callback(error);
268+
}
269+
if (!doc) {
270+
return callback(null, null);
271+
}
272+
273+
callback(null, doc);
274+
});
275+
} else {
276+
ctx.once('cursor', function() {
277+
_next(ctx, cb);
278+
});
279+
}
280+
}
281+
282+
module.exports = AggregationCursor;

0 commit comments

Comments
 (0)