Skip to content

bigquery: implement Job as an event emitter #1294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/bigquery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ BigQuery.prototype.job = function(id) {
* });
*/
BigQuery.prototype.query = function(options, callback) {
var that = this;
var self = this;

if (is.string(options)) {
options = {
Expand All @@ -485,13 +485,13 @@ BigQuery.prototype.query = function(options, callback) {

if (job) {
// Get results of the query.
that.request({
self.request({
uri: '/queries/' + job.id,
qs: requestQuery
}, responseHandler);
} else {
// Create a job.
that.request({
self.request({
method: 'POST',
uri: '/queries',
json: options
Expand Down Expand Up @@ -521,7 +521,7 @@ BigQuery.prototype.query = function(options, callback) {
}
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
// Create a prepared Job to continue the query.
nextQuery.job = that.job(resp.jobReference.jobId);
nextQuery.job = self.job(resp.jobReference.jobId);
}

callback(null, rows, nextQuery, resp);
Expand Down
132 changes: 102 additions & 30 deletions lib/bigquery/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

'use strict';

var events = require('events');
var is = require('is');
var nodeutil = require('util');
var modelo = require('modelo');

/**
* @type {module:common/service-object}
Expand Down Expand Up @@ -72,6 +73,30 @@ var util = require('../common/util.js');
* var bigquery = gcloud.bigquery();
*
* var job = bigquery.job('job-id');
*
* //-
* // All jobs are event emitters. The status of each job is polled
* // continuously, starting only after you register a "complete" listener.
* //-
* job.on('complete', function(metadata) {
* // The job is complete.
* });
*
* //-
* // Be sure to register an error handler as well to catch any issues which
* // impeded the job.
* //-
* job.on('error', function(err) {
* // An error occurred during the job.
* });
*
* //-
* // To force the Job object to stop polling for updates, simply remove any
* // "complete" listeners you've registered.
* //
* // The easiest way to do this is with `removeAllListeners()`.
* //-
* job.removeAllListeners();
*/
function Job(bigQuery, id) {
var methods = {
Expand Down Expand Up @@ -126,6 +151,8 @@ function Job(bigQuery, id) {
methods: methods
});

events.EventEmitter.call(this);

this.bigQuery = bigQuery;

// The API endpoint for cancel is: .../bigquery/v2/project/projectId/...
Expand All @@ -139,9 +166,14 @@ function Job(bigQuery, id) {
return reqOpts;
}
});

this.completeListeners = 0;
this.hasActiveListeners = false;

this.listenForEvents_();
}

nodeutil.inherits(Job, ServiceObject);
modelo.inherits(Job, ServiceObject, events.EventEmitter);

/**
* Cancel a job. Use {module:bigquery/job#getMetadata} to see if the cancel
Expand All @@ -156,35 +188,9 @@ nodeutil.inherits(Job, ServiceObject);
* @example
* job.cancel(function(err, apiResponse) {
* // Check to see if the job completes successfully.
* onJobComplete(function(err) {
* if (!err) {
* // Job cancelled successfully.
* }
* });
* job.on('error', function(err) {});
* job.on('complete', function(metadata) {});

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* });
*
* function onJobComplete(callback) {
* // Start a loop to check the status of the operation.
* checkJobStatus();
*
* function checkJobStatus() {
* job.getMetadata(function(err, apiResponse) {
* if (err) {
* callback(err);
* return;
* }
*
* if (apiResponse.status.state !== 'DONE') {
* // Job has not completed yet. Check again in 3 seconds.
* setTimeout(checkJobStatus, 3000);
* return;
* }
*
* // Job completed sucessfully.
* callback();
* });
* }
* }
*/
Job.prototype.cancel = function(callback) {
callback = callback || util.noop;
Expand Down Expand Up @@ -279,4 +285,70 @@ Job.prototype.getQueryResults = function(options, callback) {
return this.bigQuery.query(options, callback);
};

/**
* Begin listening for events on the job. This method keeps track of how many
* "complete" listeners are registered and removed, making sure polling is
* handled automatically.
*
* As long as there is one active "complete" listener, the connection is open.
* When there are no more listeners, the polling stops.
*
* @private
*/
Job.prototype.listenForEvents_ = function() {
var self = this;

this.on('newListener', function(event) {
if (event === 'complete') {
self.completeListeners++;

if (!self.hasActiveListeners) {
self.hasActiveListeners = true;
self.startPolling_();
}
}
});

this.on('removeListener', function(event) {
if (event === 'complete' && --self.completeListeners === 0) {
self.hasActiveListeners = false;
}
});
};

/**
* Poll `getMetadata` to check the operation's status. This runs a loop to ping
* the API on an interval.
*
* Note: This method is automatically called once a "complete" event handler is
* registered on the operation.
*
* @private
*/
Job.prototype.startPolling_ = function() {
var self = this;

if (!this.hasActiveListeners) {
return;
}

this.getMetadata(function(err, metadata, apiResponse) {
if (apiResponse.status && apiResponse.status.errors) {
err = util.ApiError(apiResponse.status);
}

if (err) {
self.emit('error', err);
return;
}

if (metadata.status.state !== 'DONE') {
setTimeout(self.startPolling_.bind(self), 500);
return;
}

self.emit('complete', metadata);
});
};

module.exports = Job;
22 changes: 14 additions & 8 deletions lib/bigquery/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ Table.mergeSchemaWithRows_ = function(schema, rows) {
*
* @example
* var yourTable = dataset.table('your-table');
* table.copy(yourTable, function(err, job, apiResponse) {});
* table.copy(yourTable, function(err, job, apiResponse) {
* // `job` is a Job object that can be used to check the status of the
* // request.
* });
*
* //-
* // See the [`configuration.copy`](http://goo.gl/dKWIyS) object for all
Expand Down Expand Up @@ -387,11 +390,8 @@ Table.prototype.createReadStream = function() {
* request.get(csvUrl)
* .pipe(table.createWriteStream(metadata))
* .on('complete', function(job) {
* // job is a Job object, which you can use to check the status of the load
* // operation.
* job.getMetadata(function(err, metadata) {
* // metadata.status
* });
* // `job` is a Job object that can be used to check the status of the
* // request.
* });
*
* //-
Expand Down Expand Up @@ -499,7 +499,10 @@ Table.prototype.createWriteStream = function(metadata) {
* // If you wish to override this, or provide an array of destination files,
* // you must provide an `options` object.
* //-
* table.export(exportedFile, function(err, job, apiResponse) {});
* table.export(exportedFile, function(err, job, apiResponse) {
* // `job` is a Job object that can be used to check the status of the
* // request.
* });
*
* //-
* // If you need more customization, pass an `options` object.
Expand Down Expand Up @@ -737,7 +740,10 @@ Table.prototype.getRows = function(options, callback) {
* //-
* // Load data from a local file.
* //-
* table.import('./institutions.csv', function(err, job, apiResponse) {});
* table.import('./institutions.csv', function(err, job, apiResponse) {
* // `job` is a Job object that can be used to check the status of the
* // request.
* });
*
* //-
* // You may also pass in metadata in the format of a Jobs resource. See
Expand Down
50 changes: 23 additions & 27 deletions system-test/bigquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,38 +203,20 @@ describe('BigQuery', function() {
});

it('should cancel a job', function(done) {
var query = 'SELECT * FROM [publicdata:samples.github_nested]';
var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 10';

bigquery.startQuery(query, function(err, job) {
assert.ifError(err);

job.cancel(function(err) {
assert.ifError(err);
onJobComplete(done);
});

function onJobComplete(callback) {
// Start a loop to check the status of the operation.
checkJobStatus();

function checkJobStatus() {
job.getMetadata(function(err, apiResponse) {
if (err) {
callback(err);
return;
}

if (apiResponse.status.state !== 'DONE') {
// Job has not completed yet. Check again in 3 seconds.
setTimeout(checkJobStatus, 3000);
return;
}

// Job completed sucessfully.
callback();
job
.on('error', done)
.on('complete', function() {
done();
});
}
}
});
});
});

Expand Down Expand Up @@ -346,8 +328,12 @@ describe('BigQuery', function() {
it('should import data from a file in your bucket', function(done) {
table.import(file, function(err, job) {
assert.ifError(err);
assert(job instanceof Job);
done();

job
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -395,7 +381,17 @@ describe('BigQuery', function() {
});

it('should export data to a file in your bucket', function(done) {
table.export(bucket.file('kitten-test-data-backup.json'), done);
var file = bucket.file('kitten-test-data-backup.json');

table.export(file, function(err, job) {
assert.ifError(err);

job
.on('error', done)
.on('complete', function() {
done();
});
});
});
});
});
Expand Down
Loading