Skip to content

Commit 82bd654

Browse files
Merge pull request #692 from stephenplusplus/spp--implement-streamrouter
streamrouter: implement across library
2 parents 7dc8310 + 3c379df commit 82bd654

28 files changed

+1558
-764
lines changed

lib/bigquery/index.js

Lines changed: 151 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
'use strict';
2222

2323
var extend = require('extend');
24-
var streamEvents = require('stream-events');
25-
var through = require('through2');
2624

2725
/**
2826
* @type {module:bigquery/dataset}
@@ -36,6 +34,12 @@ var Dataset = require('./dataset.js');
3634
*/
3735
var Job = require('./job.js');
3836

37+
/**
38+
* @type {module:common/streamrouter}
39+
* @private
40+
*/
41+
var streamRouter = require('../common/stream-router.js');
42+
3943
/**
4044
* @type {module:bigquery/table}
4145
* @private
@@ -158,15 +162,55 @@ BigQuery.prototype.dataset = function(id) {
158162
*
159163
* @param {object=} query - Configuration object.
160164
* @param {boolean} query.all - List all datasets, including hidden ones.
165+
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
166+
* Default: true.
161167
* @param {number} query.maxResults - Maximum number of results to return.
162168
* @param {string} query.pageToken - Token returned from a previous call, to
163169
* request the next page of results.
164170
* @param {function} callback - The callback function.
165171
*
166172
* @example
167-
* bigquery.getDatasets(function(err, datasets, nextQuery, apiResponse) {
168-
* // If `nextQuery` is non-null, there are more results to fetch.
173+
* bigquery.getDatasets(function(err, datasets) {
174+
* if (!err) {
175+
* // datasets is an array of Dataset objects.
176+
* }
169177
* });
178+
*
179+
* //-
180+
* // To control how many API requests are made and page through the results
181+
* // manually, set `autoPaginate` to `false`.
182+
* //-
183+
* var callback = function(err, datasets, nextQuery, apiResponse) {
184+
* if (nextQuery) {
185+
* // More results exist.
186+
* bigquery.getDatasets(nextQuery, callback);
187+
* }
188+
* };
189+
*
190+
* bigquery.getDatasets({
191+
* autoPaginate: false
192+
* }, callback);
193+
*
194+
* //-
195+
* // Get the datasets from your project as a readable object stream.
196+
* //-
197+
* bigquery.getDatasets()
198+
* .on('error', console.error)
199+
* .on('data', function(dataset) {
200+
* // dataset is a Dataset object.
201+
* })
202+
* .on('end', function() {
203+
* // All datasets retrieved.
204+
* });
205+
*
206+
* //-
207+
* // If you anticipate many results, you can end a stream early to prevent
208+
* // unnecessary processing and API requests.
209+
* //-
210+
* bigquery.getDatasets()
211+
* .on('data', function(dataset) {
212+
* this.end();
213+
* });
170214
*/
171215
BigQuery.prototype.getDatasets = function(query, callback) {
172216
var that = this;
@@ -208,6 +252,8 @@ BigQuery.prototype.getDatasets = function(query, callback) {
208252
* @param {object=} options - Configuration object.
209253
* @param {boolean=} options.allUsers - Display jobs owned by all users in the
210254
* project.
255+
* @param {boolean} options.autoPaginate - Have pagination handled
256+
* automatically. Default: true.
211257
* @param {number=} options.maxResults - Maximum number of results to return.
212258
* @param {string=} options.pageToken - Token returned from a previous call, to
213259
* request the next page of results.
@@ -219,9 +265,47 @@ BigQuery.prototype.getDatasets = function(query, callback) {
219265
* @param {function} callback - The callback function.
220266
*
221267
* @example
222-
* bigquery.getJobs(function(err, jobs, nextQuery, apiResponse) {
223-
* // If `nextQuery` is non-null, there are more results to fetch.
268+
* bigquery.getJobs(function(err, jobs) {
269+
* if (!err) {
270+
* // jobs is an array of Job objects.
271+
* }
224272
* });
273+
*
274+
* //-
275+
* // To control how many API requests are made and page through the results
276+
* // manually, set `autoPaginate` to `false`.
277+
* //-
278+
* var callback = function(err, jobs, nextQuery, apiRespose) {
279+
* if (nextQuery) {
280+
* // More results exist.
281+
* bigquery.getJobs(nextQuery, callback);
282+
* }
283+
* };
284+
*
285+
* bigquery.getJobs({
286+
* autoPaginate: false
287+
* }, callback);
288+
*
289+
* //-
290+
* // Get the jobs from your project as a readable object stream.
291+
* //-
292+
* bigquery.getJobs()
293+
* .on('error', console.error)
294+
* .on('data', function(job) {
295+
* // job is a Job object.
296+
* })
297+
* .on('end', function() {
298+
* // All jobs retrieved.
299+
* });
300+
*
301+
* //-
302+
* // If you anticipate many results, you can end a stream early to prevent
303+
* // unnecessary processing and API requests.
304+
* //-
305+
* bigquery.getJobs()
306+
* .on('data', function(job) {
307+
* this.end();
308+
* });
225309
*/
226310
BigQuery.prototype.getJobs = function(options, callback) {
227311
var that = this;
@@ -270,31 +354,6 @@ BigQuery.prototype.job = function(id) {
270354
return new Job(this, id);
271355
};
272356

273-
/*! Developer Documentation
274-
*
275-
* The `query` method is dual-purpose, like the use cases for a query.
276-
* Sometimes, a user will want to fetch results from their table in a serial
277-
* manner (get results -> more results exist? -> get more results, repeat.) --
278-
* other times, a user may want to wave their hands at making repeated calls to
279-
* get all of the rows, instead using a stream.
280-
*
281-
* A couple different libraries are used to cover the stream case:
282-
*
283-
* var stream = streamEvents(through2.obj());
284-
*
285-
* - streamEvents - https://github.com/stephenplusplus/stream-events
286-
* This library enables us to wait until our stream is being asked for
287-
* data, before making any API calls. It is possible a user will get a
288-
* stream, then not end up running it - or, it will be run later, at a
289-
* time when the token returned from the API call could have expired.
290-
* Using this library ensures we wait until the last possible chance to
291-
* get that token.
292-
*
293-
* - through2 - https://github.com/rvagg/through2
294-
* This is a popular library for how simple it makes dealing with the
295-
* complicated Node.js Streams API. We're creating an object stream, as
296-
* the data we are receiving from the API are rows of JSON data.
297-
*/
298357
/**
299358
* Run a query scoped to your project.
300359
*
@@ -310,48 +369,56 @@ BigQuery.prototype.job = function(id) {
310369
* queries for you, pushing each row to the stream.
311370
*
312371
* @param {string|object} options - A string SQL query or configuration object.
372+
* @param {boolean} options.autoPaginate - Have pagination handled
373+
* automatically. Default: true.
313374
* @param {number} options.maxResults - Maximum number of results to read.
314375
* @param {string} options.query - A query string, following the BigQuery query
315376
* syntax, of the query to execute.
316377
* @param {number} options.timeoutMs - How long to wait for the query to
317378
* complete, in milliseconds, before returning. Default is to return
318379
* immediately. If the timeout passes before the job completes, the request
319380
* will fail with a `TIMEOUT` error.
320-
* @param {function=} callback - The callback function. If you intend to
321-
* continuously run this query until all results are in as part of a stream,
322-
* do not pass a callback.
381+
* @param {function=} callback - The callback function.
323382
*
324383
* @example
325384
* var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
326385
*
386+
* bigquery.query(query, function(err, rows) {
387+
* if (!err) {
388+
* // Handle results here.
389+
* }
390+
* });
391+
*
327392
* //-
328-
* // You can run a query against your data in a serial manner.
393+
* // To control how many API requests are made and page through the results
394+
* // manually, set `autoPaginate` to `false`.
329395
* //-
330-
* bigquery.query(query, function(err, rows, nextQuery, apiResponse) {
331-
* // Handle results here.
396+
* var callback = function(err, rows, nextQuery, apiResponse) {
332397
* if (nextQuery) {
333-
* bigquery.query(nextQuery, function(err, rows, nextQuery, apiResponse) {
334-
* // Handle more results here.
335-
* });
398+
* bigquery.query(nextQuery, callback);
336399
* }
337-
* });
400+
* };
401+
*
402+
* bigquery.query({
403+
* query: query,
404+
* autoPaginate: false
405+
* }, callback);
338406
*
339407
* //-
340408
* // You can also use the `query` method as a readable object stream by
341409
* // omitting the callback.
342410
* //-
343-
* var through2 = require('through2');
344-
*
345411
* bigquery.query(query)
346-
* .pipe(through2.obj(function(row, enc, next) {
347-
* this.push(row.url += '?trackingCode=AZ19b\n');
348-
* next();
349-
* }))
350-
* .pipe(process.stdout);
412+
* .on('error', console.error)
413+
* .on('data', function(row) {
414+
* // row is a result from your query.
415+
* })
416+
* .on('end', function() {
417+
* // All rows retrieved.
418+
* });
351419
*/
352420
BigQuery.prototype.query = function(options, callback) {
353421
var that = this;
354-
var stream;
355422

356423
if (util.is(options, 'string')) {
357424
options = {
@@ -366,79 +433,42 @@ BigQuery.prototype.query = function(options, callback) {
366433
var requestQuery = extend({}, options);
367434
delete requestQuery.job;
368435

369-
if (!util.is(callback, 'function')) {
370-
stream = streamEvents(through.obj());
371-
stream.once('reading', runQuery);
372-
return stream;
436+
if (job) {
437+
// Get results of the query.
438+
var path = '/queries/' + job.id;
439+
that.makeReq_('GET', path, requestQuery, null, responseHandler);
373440
} else {
374-
callback = callback || util.noop;
375-
runQuery();
441+
// Create a job.
442+
that.makeReq_('POST', '/queries', null, options, responseHandler);
376443
}
377444

378-
function runQuery() {
379-
if (job) {
380-
// Get results of the query.
381-
var path = '/queries/' + job.id;
382-
that.makeReq_('GET', path, requestQuery, null, responseHandler);
383-
} else {
384-
// Create a job.
385-
that.makeReq_('POST', '/queries', null, options, responseHandler);
445+
function responseHandler(err, resp) {
446+
if (err) {
447+
callback(err, null, null, resp);
448+
return;
386449
}
387450

388-
function responseHandler(err, resp) {
389-
if (err) {
390-
onComplete(err, null, null, resp);
391-
return;
392-
}
393-
394-
var rows = [];
395-
if (resp.schema && resp.rows) {
396-
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
397-
}
398-
399-
var nextQuery = null;
400-
if (resp.jobComplete === false) {
401-
// Query is still running.
402-
nextQuery = extend({}, options);
403-
} else if (resp.pageToken) {
404-
// More results exist.
405-
nextQuery = extend({}, options, {
406-
pageToken: resp.pageToken
407-
});
408-
}
409-
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
410-
// Create a prepared Job to continue the query.
411-
nextQuery.job = that.job(resp.jobReference.jobId);
412-
}
413-
414-
onComplete(null, rows, nextQuery, resp);
451+
var rows = [];
452+
if (resp.schema && resp.rows) {
453+
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
415454
}
416455

417-
function onComplete(err, rows, nextQuery, resp) {
418-
if (err) {
419-
if (stream) {
420-
stream.emit('error', err);
421-
stream.end();
422-
} else {
423-
callback(err, null, null, resp);
424-
}
425-
return;
426-
}
427-
428-
if (stream) {
429-
rows.forEach(function(row) {
430-
stream.push(row);
431-
});
432-
433-
if (nextQuery) {
434-
that.query(nextQuery, onComplete);
435-
} else {
436-
stream.end();
437-
}
438-
} else {
439-
callback(null, rows, nextQuery, resp);
440-
}
456+
var nextQuery = null;
457+
if (resp.jobComplete === false) {
458+
// Query is still running.
459+
nextQuery = extend({}, options);
460+
} else if (resp.pageToken) {
461+
// More results exist.
462+
nextQuery = extend({}, options, {
463+
pageToken: resp.pageToken
464+
});
465+
}
466+
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
467+
// Create a prepared Job to continue the query.
468+
nextQuery.job = that.job(resp.jobReference.jobId);
441469
}
470+
471+
callback(null, rows, nextQuery, resp);
442472
}
443473
};
444474

@@ -564,4 +594,11 @@ BigQuery.prototype.makeReq_ = function(method, path, query, body, callback) {
564594
this.makeAuthorizedRequest_(reqOpts, callback);
565595
};
566596

597+
/*! Developer Documentation
598+
*
599+
* These methods can be used with either a callback or as a readable object
600+
* stream. `streamRouter` is used to add this dual behavior.
601+
*/
602+
streamRouter.extend(BigQuery, ['getDatasets', 'getJobs', 'query']);
603+
567604
module.exports = BigQuery;

0 commit comments

Comments
 (0)