Skip to content

Commit 16ff0d0

Browse files
streamrouter: support autoPaginate: true
1 parent a570367 commit 16ff0d0

File tree

18 files changed

+659
-105
lines changed

18 files changed

+659
-105
lines changed

lib/bigquery/index.js

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ BigQuery.prototype.dataset = function(id) {
162162
*
163163
* @param {object=} query - Configuration object.
164164
* @param {boolean} query.all - List all datasets, including hidden ones.
165+
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
166+
* Default: false.
165167
* @param {number} query.maxResults - Maximum number of results to return.
166168
* @param {string} query.pageToken - Token returned from a previous call, to
167169
* request the next page of results.
@@ -178,6 +180,16 @@ BigQuery.prototype.dataset = function(id) {
178180
* bigquery.getDatasets(callback);
179181
*
180182
* //-
183+
* // To have pagination handled for you, set `autoPaginate`. Note the changed
184+
* // callback parameters.
185+
* //-
186+
* bigquery.getDatasets({
187+
* autoPaginate: true
188+
* }, function(err, datasets) {
189+
* // Called after all datasets have been retrieved.
190+
* });
191+
*
192+
* //-
181193
* // Get the datasets from your project as a readable object stream.
182194
* //-
183195
* bigquery.getDatasets()
@@ -238,6 +250,8 @@ BigQuery.prototype.getDatasets = function(query, callback) {
238250
* @param {object=} options - Configuration object.
239251
* @param {boolean=} options.allUsers - Display jobs owned by all users in the
240252
* project.
253+
* @param {boolean} options.autoPaginate - Have pagination handled
254+
* automatically. Default: false.
241255
* @param {number=} options.maxResults - Maximum number of results to return.
242256
* @param {string=} options.pageToken - Token returned from a previous call, to
243257
* request the next page of results.
@@ -254,6 +268,15 @@ BigQuery.prototype.getDatasets = function(query, callback) {
254268
* });
255269
*
256270
* //-
271+
* // To have pagination handled for you, set `autoPaginate`. Note the changed
272+
* // callback parameters.
273+
* //-
274+
* bigquery.getJobs({
275+
* autoPaginate: true
276+
* }, function(err, jobs) {
277+
* // Called after all jobs have been retrieved.
278+
* });
279+
* //-
257280
* // Get the jobs from your project as a readable object stream.
258281
* //-
259282
* bigquery.getJobs()
@@ -336,16 +359,16 @@ BigQuery.prototype.job = function(id) {
336359
* queries for you, pushing each row to the stream.
337360
*
338361
* @param {string|object} options - A string SQL query or configuration object.
362+
* @param {boolean} options.autoPaginate - Have pagination handled
363+
* automatically. Default: false.
339364
* @param {number} options.maxResults - Maximum number of results to read.
340365
* @param {string} options.query - A query string, following the BigQuery query
341366
* syntax, of the query to execute.
342367
* @param {number} options.timeoutMs - How long to wait for the query to
343368
* complete, in milliseconds, before returning. Default is to return
344369
* immediately. If the timeout passes before the job completes, the request
345370
* will fail with a `TIMEOUT` error.
346-
* @param {function=} callback - The callback function. If you intend to
347-
* continuously run this query until all results are in as part of a stream,
348-
* do not pass a callback.
371+
* @param {function=} callback - The callback function.
349372
*
350373
* @example
351374
* var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
@@ -364,6 +387,17 @@ BigQuery.prototype.job = function(id) {
364387
* bigquery.query(query, callback);
365388
*
366389
* //-
390+
* // To have pagination handled for you, set `autoPaginate`. Note the changed
391+
* // callback parameters.
392+
* //-
393+
* bigquery.query({
394+
* query: query,
395+
* autoPaginate: true
396+
* }, function(err, rows) {
397+
* // Called after all rows have been retrieved.
398+
* });
399+
*
400+
* //-
367401
* // You can also use the `query` method as a readable object stream by
368402
* // omitting the callback.
369403
* //-

lib/bigquery/job.js

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ Job.prototype.getMetadata = function(callback) {
9191
* Get the results of a job.
9292
*
9393
* @param {object=} options - Configuration object.
94+
* @param {boolean} options.autoPaginate - Have pagination handled
95+
* automatically. Default: false.
9496
* @param {number} options.maxResults - Maximum number of results to read.
9597
* @param {string} options.pageToken - Page token, returned by a previous call,
9698
* to request the next page of results. Note: This is automatically added to
@@ -105,27 +107,42 @@ Job.prototype.getMetadata = function(callback) {
105107
* do not pass a callback.
106108
*
107109
* @example
110+
* var callback = function(err, rows, nextQuery, apiResponse) {
111+
* if (nextQuery) {
112+
* // More results exist.
113+
* job.getQueryResults(nextQuery, callback);
114+
* }
115+
* };
116+
*
108117
* //-
109118
* // Use the default options to get the results of a query.
110119
* //-
111-
* job.getQueryResults(function(err, rows, nextQuery, apiResponse) {});
120+
* job.getQueryResults(callback);
112121
*
113122
* //-
114123
* // Customize the results you want to fetch.
115124
* //-
116-
* var options = {
125+
* job.getQueryResults({
117126
* maxResults: 100
118-
* };
127+
* }, callback);
119128
*
120-
* job.getQueryResults(options, function(err, rows, nextQuery, apiResponse) {});
129+
* //-
130+
* // To have pagination handled for you, set `autoPaginate`. Note the changed
131+
* // callback parameters.
132+
* //-
133+
* job.getQueryResults({
134+
* autoPaginate: true
135+
* }, function(err, rows) {
136+
* // Called after all rows have been retrieved.
137+
* });
121138
*
122139
* //-
123140
* // Consume the results from the query as a readable object stream.
124141
* //-
125142
* var through2 = require('through2');
126143
* var fs = require('fs');
127144
*
128-
* job.getQueryResults(options)
145+
* job.getQueryResults()
129146
* .pipe(through2.obj(function (row, enc, next) {
130147
* this.push(JSON.stringify(row) + '\n');
131148
* }))

lib/bigquery/table.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,8 @@ Table.prototype.getMetadata = function(callback) {
483483
* your callback as an array of objects matching your table's schema.
484484
*
485485
* @param {object=} options - The configuration object.
486+
* @param {boolean} options.autoPaginate - Have pagination handled
487+
* automatically. Default: false.
486488
* @param {number} options.maxResults - Maximum number of results to return.
487489
* @param {function} callback - The callback function.
488490
*
@@ -501,6 +503,16 @@ Table.prototype.getMetadata = function(callback) {
501503
* table.getRows(options, callback);
502504
*
503505
* //-
506+
* // To have pagination handled for you, set `autoPaginate`. Note the changed
507+
* // callback parameters.
508+
* //-
509+
* table.getRows({
510+
* autoPaginate: true
511+
* }, function(err, rows) {
512+
* // Called after all rows have been retrieved.
513+
* });
514+
*
515+
* //-
504516
* // Get the rows as a readable object stream.
505517
* //-
506518
* table.getRows(options)

lib/common/stream-router.js

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
'use strict';
2222

23+
var concat = require('concat-stream');
2324
var streamEvents = require('stream-events');
2425
var through = require('through2');
2526

@@ -60,36 +61,100 @@ streamRouter.extend = function(Class, methodNames) {
6061
var originalMethod = Class.prototype[methodName];
6162

6263
Class.prototype[methodName] = function() {
63-
return streamRouter.router_(arguments, originalMethod.bind(this));
64+
var parsedArguments = streamRouter.parseArguments_(arguments);
65+
return streamRouter.router_(parsedArguments, originalMethod.bind(this));
6466
};
6567
});
6668
};
6769

6870
/**
69-
* The router accepts all incoming arguments to the overwritten method. If the
70-
* last argument is a function, simply pass them through to the original method.
71-
* If the last argument is not a function, activate stream mode.
71+
* Parse a pseudo-array `arguments` for a query and callback.
7272
*
73-
* Stream mode simply calls the nextQuery recursively. The stream ends when
74-
* `nextQuery` is null.
73+
* @param {array} args - The original `arguments` pseduo-array that the original
74+
* method received.
75+
*/
76+
streamRouter.parseArguments_ = function(args) {
77+
var parsedArguments = {};
78+
79+
var firstArgument = args[0];
80+
var lastArgument = args[args.length - 1];
81+
82+
if (util.is(firstArgument, 'function')) {
83+
parsedArguments.callback = firstArgument;
84+
} else {
85+
parsedArguments.query = firstArgument;
86+
}
87+
88+
if (util.is(lastArgument, 'function')) {
89+
parsedArguments.callback = lastArgument;
90+
}
91+
92+
return parsedArguments;
93+
};
94+
95+
/**
96+
* The router accepts a query and callback that were passed to the overwritten
97+
* method. If there's a callback, simply pass the query and/or callback through
98+
* to the original method. If there isn't a callback. stream mode is activated.
7599
*
76-
* @param {array} args - The original `arguments` pseudo-array as it was
77-
* received by the original method.
100+
* @param {array} parsedArguments - Parsed arguments from the original method
101+
* call.
102+
* @param {object=|string=} parsedArguments.query - Query object. This is most
103+
* commonly an object, but to make the API more simple, it can also be a
104+
* string in some places.
105+
* @param {function=} parsedArguments.callback - Callback function.
78106
* @param {function} originalMethod - The cached method that accepts a callback
79107
* and returns `nextQuery` to receive more results.
80108
* @return {undefined|stream}
81109
*/
82-
streamRouter.router_ = function(args, originalMethod) {
83-
args = util.toArray(args);
110+
streamRouter.router_ = function(parsedArguments, originalMethod) {
111+
var query = parsedArguments.query || {};
112+
var callback = parsedArguments.callback;
113+
114+
if (callback) {
115+
if (query.autoPaginate === true || query.autoPaginateVal === true) {
116+
delete query.autoPaginate;
117+
delete query.autoPaginateVal;
118+
119+
this.runAsStream_(query, originalMethod)
120+
.on('error', callback)
121+
.pipe(concat(function(results) {
122+
callback(null, results);
123+
}));
124+
} else {
125+
originalMethod(query, callback);
126+
}
127+
} else {
128+
return this.runAsStream_(query, originalMethod);
129+
}
130+
};
84131

85-
var firstArgument = args[0];
86-
var lastArgument = args[args.length - 1];
132+
/**
133+
* This method simply calls the nextQuery recursively, emitting results to a
134+
* stream. The stream ends when `nextQuery` is null.
135+
*
136+
* `maxResults` and `limitVal` (from Datastore) will act as a cap for how many
137+
* results are fetched and emitted to the stream.
138+
*
139+
* @param {object=|string=} query - Query object. This is most
140+
* commonly an object, but to make the API more simple, it can also be a
141+
* string in some places.
142+
* @param {function} originalMethod - The cached method that accepts a callback
143+
* and returns `nextQuery` to receive more results.
144+
* @return {stream} - Readable object stream.
145+
*/
146+
streamRouter.runAsStream_ = function(query, originalMethod) {
147+
query = query || {};
87148

88-
var isStreamMode = !util.is(lastArgument, 'function');
149+
var resultsToSend = -1;
89150

90-
if (!isStreamMode) {
91-
originalMethod.apply(null, args);
92-
return;
151+
// Check if the user only asked for a certain amount of results.
152+
if (util.is(query.maxResults, 'number')) {
153+
// `maxResults` is used API-wide.
154+
resultsToSend = query.maxResults;
155+
} else if (util.is(query.limitVal, 'number')) {
156+
// `limitVal` is part of a Datastore query.
157+
resultsToSend = query.limitVal;
93158
}
94159

95160
var stream = streamEvents(through.obj());
@@ -106,19 +171,6 @@ streamRouter.router_ = function(args, originalMethod) {
106171
_end.apply(this, arguments);
107172
};
108173

109-
var resultsToSend = -1;
110-
if (util.is(firstArgument, 'object')) {
111-
// `firstArgument` is a query object. Check if the user only asked for a
112-
// certain amount of results.
113-
if (util.is(firstArgument.maxResults, 'number')) {
114-
// `maxResults` is used API-wide.
115-
resultsToSend = firstArgument.maxResults;
116-
} else if (util.is(firstArgument.limitVal, 'number')) {
117-
// `limitVal` is part of a Datastore query.
118-
resultsToSend = firstArgument.limitVal;
119-
}
120-
}
121-
122174
function onResultSet(err, results, nextQuery) {
123175
if (err) {
124176
stream.emit('error', err);
@@ -149,14 +201,7 @@ streamRouter.router_ = function(args, originalMethod) {
149201
}
150202

151203
stream.once('reading', function() {
152-
if (util.is(lastArgument, 'undefined')) {
153-
// Replace it with onResultSet.
154-
args.splice(args.length - 1, 1, onResultSet);
155-
} else {
156-
args = args.concat(onResultSet);
157-
}
158-
159-
originalMethod.apply(null, args);
204+
originalMethod.call(null, query, onResultSet);
160205
});
161206

162207
return stream;

lib/datastore/query.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,22 @@ function Query(namespace, kinds) {
7373
this.selectVal = [];
7474

7575
// pagination
76+
this.autoPaginateVal = false;
7677
this.startVal = null;
7778
this.endVal = null;
7879
this.limitVal = -1;
7980
this.offsetVal = -1;
8081
}
8182

83+
/**
84+
* @return {module:datastore/query}
85+
*/
86+
Query.prototype.autoPaginate = function(autoPaginateVal) {
87+
var query = extend(new Query(), this);
88+
query.autoPaginateVal = autoPaginateVal !== false;
89+
return query;
90+
};
91+
8292
/**
8393
* Datastore allows querying on properties. Supported comparison operators
8494
* are `=`, `<`, `>`, `<=`, and `>=`. "Not equal" and `IN` operators are

lib/datastore/request.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,17 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
503503
* transaction.runQuery(query, callback);
504504
*
505505
* //-
506+
* // To have pagination handled for you, call `autoPaginate()`. Note the
507+
* // changed callback parameters.
508+
* //-
509+
*
510+
* var queryWithAutoPagination = dataset.createQuery('Lion').autoPaginate();
511+
*
512+
* transaction.runQuery(queryWithAutoPagination, function(err, entities) {
513+
* // Called after all entities have been retrieved.
514+
* });
515+
*
516+
* //-
506517
* // If you omit the callback, runQuery will automatically call subsequent
507518
* // queries until no results remain. Entity objects will be pushed as they are
508519
* // found.

0 commit comments

Comments
 (0)