Skip to content

Commit 7ed6f9d

Browse files
manually run runQuery as a stream
1 parent 148e1df commit 7ed6f9d

File tree

5 files changed

+122
-74
lines changed

5 files changed

+122
-74
lines changed

lib/datastore/index.js

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -192,19 +192,41 @@ var util = require('../common/util.js');
192192
* //-
193193
* // <h3>Paginating Records</h3>
194194
* //
195-
* // By default, all records are returned at once. To control pagination, you
196-
* // can disable this on your query with {module:datastore/query#autoPaginate}.
195+
* // Imagine building a website that allows a user to sift through hundreds of
196+
* // their contacts. You'll likely want to only display a subset of these at
197+
* // once, so you set a limit.
197198
* //-
198-
* query.autoPaginate(false);
199+
* var express = require('express');
200+
* var app = express();
199201
*
200-
* datastore.runQuery(query, function(err, entities, nextQuery) {
201-
* // entities = [...];
202+
* var NUM_RESULTS_PER_PAGE = 15;
202203
*
203-
* if (nextQuery) {
204-
* // Run `datastore.runQuery` again with the prepared query to retrieve the
205-
* // next set of results.
206-
* datastore.runQuery(nextQuery, function(err, entities, nextQuery) {});
204+
* app.get('/contacts', function(req, res) {
205+
* var query = datastore.createQuery('Contacts')
206+
* .limit(NUM_RESULTS_PER_PAGE);
207+
*
208+
* if (req.query.startCursor) {
209+
* query.start(req.query.startCursor);
207210
* }
211+
*
212+
* datastore.runQuery(query, function(err, entities, info) {
213+
* if (err) {
214+
* // Error handling omitted...
215+
* return;
216+
* }
217+
*
218+
* // Respond to the front end with the contacts and the cursoring token
219+
* // from the query we just ran.
220+
* var frontEndResponse = {
221+
* contacts: entities
222+
* };
223+
*
224+
* if (info.moreResults !== 'NO_MORE_RESULTS') {
225+
* frontEndResponse.startCursor = info.endCursor;
226+
* }
227+
*
228+
* res.render('contacts', frontEndResponse);
229+
* });
208230
* });
209231
*
210232
* //-

lib/datastore/query.js

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -287,35 +287,29 @@ Query.prototype.offset = function(n) {
287287
* stream instance is returned.
288288
* @param {?error} callback.err - An error returned while making this request
289289
* @param {object[]} callback.entities - A list of entities.
290-
* @param {?object} callback.nextQuery - If present, query with this object to
291-
* check for more results.
290+
* @param {object} callback.info - An object useful for pagination.
291+
* @param {?string} callback.info.endCursor - Use this in a follow-up query to
292+
* begin from where these results ended.
293+
* @param {string} callback.info.moreResults - Datastore responds with one of:
294+
* - `MORE_RESULTS_AFTER_LIMIT`: There *may* be more results after the
295+
* specified limit.
296+
* - `MORE_RESULTS_AFTER_CURSOR`: There *may* be more results after the
297+
* specified end cursor.
298+
* - `NO_MORE_RESULTS`: There are no more results.
292299
* @param {object} callback.apiResponse - The full API response.
293300
*
294301
* @example
295-
* query.run(function(err, entities) {});
302+
* query.run(function(err, entities, info, apiResponse) {});
296303
*
297304
* //-
298-
* // To control how many API requests are made and page through the results
299-
* // manually, call `autoPaginate(false)` on your query.
300-
* //-
301-
* query.autoPaginate(false);
302-
*
303-
* function callback(err, entities, nextQuery, apiResponse) {
304-
* if (nextQuery) {
305-
* // More results might exist.
306-
* nextQuery.run(callback);
307-
* }
308-
* }
309-
*
310-
* query.run(callback);
311-
*
312-
* //-
313-
* // If you omit the callback, `run` will automatically call subsequent queries
314-
* // until no results remain. Entity objects will be pushed as they are found.
305+
* // If you omit the callback, you will get the matching entities in a readable
306+
* // object stream.
315307
* //-
316308
* query.run()
317309
* .on('error', console.error)
318310
* .on('data', function (entity) {})
311+
* .on('info', function(info) {})
312+
* .on('response', function(apiResponse) {})
319313
* .on('end', function() {
320314
* // All entities retrieved.
321315
* });

lib/datastore/request.js

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,15 @@ DatastoreRequest.prototype.insert = function(entities, callback) {
383383
* stream instance is returned.
384384
* @param {?error} callback.err - An error returned while making this request
385385
* @param {object[]} callback.entities - A list of entities.
386-
* @param {?object} callback.nextQuery - If present, query with this object to
387-
* check for more results.
386+
* @param {object} callback.info - An object useful for pagination.
387+
* @param {?string} callback.info.endCursor - Use this in a follow-up query to
388+
* begin from where these results ended.
389+
* @param {string} callback.info.moreResults - Datastore responds with one of:
390+
* - `MORE_RESULTS_AFTER_LIMIT`: There *may* be more results after the
391+
* specified limit.
392+
* - `MORE_RESULTS_AFTER_CURSOR`: There *may* be more results after the
393+
* specified end cursor.
394+
* - `NO_MORE_RESULTS`: There are no more results.
388395
* @param {object} callback.apiResponse - The full API response.
389396
*
390397
* @example
@@ -394,7 +401,7 @@ DatastoreRequest.prototype.insert = function(entities, callback) {
394401
* //-
395402
* var query = datastore.createQuery('Lion');
396403
*
397-
* datastore.runQuery(query, function(err, entities) {});
404+
* datastore.runQuery(query, function(err, entities, info, apiResponse) {});
398405
*
399406
* //-
400407
* // Or, if you're using a transaction object.
@@ -406,28 +413,14 @@ DatastoreRequest.prototype.insert = function(entities, callback) {
406413
* });
407414
*
408415
* //-
409-
* // To control how many API requests are made and page through the results
410-
* // manually, call `autoPaginate(false)` on your query.
411-
* //-
412-
* var manualPageQuery = datastore.createQuery('Lion').autoPaginate(false);
413-
*
414-
* function callback(err, entities, nextQuery, apiResponse) {
415-
* if (nextQuery) {
416-
* // More results might exist.
417-
* transaction.runQuery(nextQuery, callback);
418-
* }
419-
* }
420-
*
421-
* datastore.runQuery(manualPageQuery, callback);
422-
*
423-
* //-
424-
* // If you omit the callback, runQuery will automatically call subsequent
425-
* // queries until no results remain. Entity objects will be pushed as they are
426-
* // found.
416+
* // If you omit the callback, you will get the matching entities in a readable
417+
* // object stream.
427418
* //-
428419
* datastore.runQuery(query)
429420
* .on('error', console.error)
430421
* .on('data', function (entity) {})
422+
* .on('info', function(info) {})
423+
* .on('response', function(apiResponse) {})
431424
* .on('end', function() {
432425
* // All entities retrieved.
433426
* });
@@ -453,15 +446,35 @@ DatastoreRequest.prototype.insert = function(entities, callback) {
453446
* });
454447
*/
455448
DatastoreRequest.prototype.runQuery = function(query, options, callback) {
456-
var self = this;
457-
458449
if (is.fn(options)) {
459450
callback = options;
460451
options = {};
461452
}
462453

463454
options = options || {};
464455

456+
var apiResponse;
457+
var info;
458+
459+
if (is.fn(callback)) {
460+
// Run this method in stream mode and send the results back to the callback.
461+
this.runQuery(query, options)
462+
.on('error', callback)
463+
.on('response', function(apiResponse_) {
464+
apiResponse = apiResponse_;
465+
})
466+
.on('info', function(info_) {
467+
info = info_;
468+
})
469+
.pipe(concat(function(results) {
470+
callback(null, results, info, apiResponse);
471+
}));
472+
return;
473+
}
474+
475+
var self = this;
476+
var stream = through.obj();
477+
465478
var protoOpts = {
466479
service: 'Datastore',
467480
method: 'runQuery'
@@ -483,18 +496,12 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) {
483496
};
484497
}
485498

486-
var entities = [];
487-
488499
function onResponse(err, resp) {
489500
if (err) {
490-
callback(err, null, null, resp);
501+
stream.destroy(err);
491502
return;
492503
}
493504

494-
if (resp.batch.entityResults) {
495-
entities = entities.concat(entity.formatArray(resp.batch.entityResults));
496-
}
497-
498505
var info = {
499506
moreResults: resp.batch.moreResults
500507
};
@@ -503,7 +510,26 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) {
503510
info.endCursor = resp.batch.endCursor;
504511
}
505512

506-
if (resp.batch.moreResults === 'NOT_FINISHED') {
513+
var entities = [];
514+
515+
if (resp.batch.entityResults) {
516+
entities = entity.formatArray(resp.batch.entityResults);
517+
}
518+
519+
// Emit each result right away, then get the rest if necessary.
520+
split(entities, stream, function(streamEnded) {
521+
if (streamEnded) {
522+
return;
523+
}
524+
525+
if (resp.batch.moreResults !== 'NOT_FINISHED') {
526+
stream.emit('response', resp);
527+
stream.emit('info', info);
528+
stream.push(null);
529+
return;
530+
}
531+
532+
// The query is "NOT_FINISHED". Get the rest of the results.
507533
var offset = query.offsetVal === -1 ? 0 : query.offsetVal;
508534

509535
var continuationQuery = extend(true, new Query(), query)
@@ -517,13 +543,12 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) {
517543

518544
reqOpts.query = entity.queryToQueryProto(continuationQuery);
519545
self.request_(protoOpts, reqOpts, onResponse);
520-
return;
521-
}
522-
523-
callback(null, entities, info, resp);
546+
});
524547
}
525548

526549
this.request_(protoOpts, reqOpts, onResponse);
550+
551+
return stream;
527552
};
528553

529554
/**

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
"coveralls": "^2.11.2",
128128
"deep-strict-equal": "^0.1.0",
129129
"dox": "^0.8.0",
130+
"express": "^4.13.4",
130131
"glob": "^5.0.9",
131132
"globby": "^3.0.1",
132133
"istanbul": "^0.3.5",

system-test/datastore.js

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,11 @@ describe.only('Datastore', function() {
439439
assert.ifError(err);
440440
assert.strictEqual(firstEntities.length, 5);
441441

442-
q.start(info.endCursor).limit(q.limitVal - firstEntities.length);
442+
var secondQ = datastore.createQuery('Character')
443+
.hasAncestor(ancestor)
444+
.start(info.endCursor);
443445

444-
datastore.runQuery(q, function(err, secondEntities) {
446+
datastore.runQuery(secondQ, function(err, secondEntities) {
445447
assert.ifError(err);
446448
assert.strictEqual(secondEntities.length, 3);
447449
done();
@@ -462,7 +464,7 @@ describe.only('Datastore', function() {
462464
});
463465
});
464466

465-
it.skip('should run a query as a stream', function(done) {
467+
it('should run a query as a stream', function(done) {
466468
var q = datastore.createQuery('Character').hasAncestor(ancestor);
467469

468470
var resultsReturned = 0;
@@ -476,7 +478,7 @@ describe.only('Datastore', function() {
476478
});
477479
});
478480

479-
it.skip('should not go over a limit with a stream', function(done) {
481+
it('should not go over a limit with a stream', function(done) {
480482
var limit = 3;
481483
var q = datastore.createQuery('Character')
482484
.hasAncestor(ancestor)
@@ -593,11 +595,12 @@ describe.only('Datastore', function() {
593595
assert.strictEqual(entities[0].data.name, 'Robb');
594596
assert.strictEqual(entities[2].data.name, 'Catelyn');
595597

596-
q.start(info.endCursor)
597-
.limit(q.limitVal - entities.length)
598-
.offset(0);
598+
var secondQ = datastore.createQuery('Character')
599+
.hasAncestor(ancestor)
600+
.order('appearances')
601+
.start(info.endCursor);
599602

600-
datastore.runQuery(q, function(err, secondEntities) {
603+
datastore.runQuery(secondQ, function(err, secondEntities) {
601604
assert.ifError(err);
602605

603606
assert.strictEqual(secondEntities.length, 3);
@@ -619,9 +622,12 @@ describe.only('Datastore', function() {
619622
datastore.runQuery(q, function(err, entities, info) {
620623
assert.ifError(err);
621624

622-
q.start(info.endCursor).limit(-1).offset(-1);
625+
var secondQ = datastore.createQuery('Character')
626+
.hasAncestor(ancestor)
627+
.order('appearances')
628+
.start(info.endCursor);
623629

624-
datastore.runQuery(q, function(err, secondEntities, info) {
630+
datastore.runQuery(secondQ, function(err, secondEntities) {
625631
assert.ifError(err);
626632

627633
assert.strictEqual(secondEntities.length, 4);

0 commit comments

Comments
 (0)