Skip to content

Commit 464ff6c

Browse files
stephenplusplus authored and callmehiphop committed
bigquery: allow AVRO format (#1380)
1 parent 7b01258 commit 464ff6c

File tree

2 files changed

+145
-57
lines changed

2 files changed

+145
-57
lines changed

lib/bigquery/table.js

+53-49
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,18 @@ var streamRouter = require('../common/stream-router.js');
5454
*/
5555
var util = require('../common/util.js');
5656

57+
/**
58+
* The file formats accepted by BigQuery.
59+
*
60+
* @type {object}
61+
* @private
62+
*/
63+
var FORMATS = {
64+
avro: 'AVRO',
65+
csv: 'CSV',
66+
json: 'NEWLINE_DELIMITED_JSON'
67+
};
68+
5769
/*! Developer Documentation
5870
*
5971
* @param {module:bigquery/dataset} dataset - Dataset instance.
@@ -293,7 +305,7 @@ Table.mergeSchemaWithRows_ = function(schema, rows) {
293305
* table.copy(yourTable, metadata, function(err, job, apiResponse) {});
294306
*/
295307
Table.prototype.copy = function(destination, metadata, callback) {
296-
var that = this;
308+
var self = this;
297309

298310
if (!(destination instanceof Table)) {
299311
throw new Error('Destination must be a Table object.');
@@ -331,7 +343,7 @@ Table.prototype.copy = function(destination, metadata, callback) {
331343
return;
332344
}
333345

334-
var job = that.bigQuery.job(resp.jobReference.jobId);
346+
var job = self.bigQuery.job(resp.jobReference.jobId);
335347
job.metadata = resp;
336348

337349
callback(null, job, resp);
@@ -361,8 +373,8 @@ Table.prototype.createReadStream = function() {
361373
};
362374

363375
/**
364-
* Load data into your table from a readable stream of JSON or CSV-formatted
365-
* data.
376+
* Load data into your table from a readable stream of JSON, CSV, or
377+
* AVRO data.
366378
*
367379
* @resource [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
368380
*
@@ -404,21 +416,17 @@ Table.prototype.createReadStream = function() {
404416
* .on('complete', function(job) {});
405417
*/
406418
Table.prototype.createWriteStream = function(metadata) {
407-
var that = this;
419+
var self = this;
408420

409421
metadata = metadata || {};
410422

411-
var fileTypeMap = {
412-
csv: 'CSV',
413-
json: 'NEWLINE_DELIMITED_JSON'
414-
};
415-
var fileTypes = Object.keys(fileTypeMap).map(function(key) {
416-
return fileTypeMap[key];
423+
var fileTypes = Object.keys(FORMATS).map(function(key) {
424+
return FORMATS[key];
417425
});
418426

419427
if (is.string(metadata)) {
420428
metadata = {
421-
sourceFormat: fileTypeMap[metadata.toLowerCase()]
429+
sourceFormat: FORMATS[metadata.toLowerCase()]
422430
};
423431
}
424432

@@ -428,9 +436,9 @@ Table.prototype.createWriteStream = function(metadata) {
428436

429437
extend(true, metadata, {
430438
destinationTable: {
431-
projectId: that.bigQuery.projectId,
432-
datasetId: that.dataset.id,
433-
tableId: that.id
439+
projectId: self.bigQuery.projectId,
440+
datasetId: self.dataset.id,
441+
tableId: self.id
434442
}
435443
});
436444

@@ -443,7 +451,7 @@ Table.prototype.createWriteStream = function(metadata) {
443451

444452
dup.once('writing', function() {
445453
util.makeWritableStream(dup, {
446-
makeAuthenticatedRequest: that.bigQuery.makeAuthenticatedRequest,
454+
makeAuthenticatedRequest: self.bigQuery.makeAuthenticatedRequest,
447455
metadata: {
448456
configuration: {
449457
load: metadata
@@ -452,11 +460,11 @@ Table.prototype.createWriteStream = function(metadata) {
452460
request: {
453461
uri: format('{base}/{projectId}/jobs', {
454462
base: 'https://www.googleapis.com/upload/bigquery/v2/projects',
455-
projectId: that.bigQuery.projectId
463+
projectId: self.bigQuery.projectId
456464
})
457465
}
458466
}, function(data) {
459-
var job = that.bigQuery.job(data.jobReference.jobId);
467+
var job = self.bigQuery.job(data.jobReference.jobId);
460468
job.metadata = data;
461469

462470
dup.emit('complete', job);
@@ -523,19 +531,13 @@ Table.prototype.createWriteStream = function(metadata) {
523531
* ], options, function(err, job, apiResponse) {});
524532
*/
525533
Table.prototype.export = function(destination, options, callback) {
526-
var that = this;
534+
var self = this;
527535

528536
if (is.fn(options)) {
529537
callback = options;
530538
options = {};
531539
}
532540

533-
var formats = {
534-
avro: 'AVRO',
535-
csv: 'CSV',
536-
json: 'NEWLINE_DELIMITED_JSON'
537-
};
538-
539541
extend(true, options, {
540542
destinationUris: arrify(destination).map(function(dest) {
541543
if (!(dest instanceof File)) {
@@ -545,8 +547,8 @@ Table.prototype.export = function(destination, options, callback) {
545547
// If no explicit format was provided, attempt to find a match from the
546548
// file's extension. If no match, don't set, and default upstream to CSV.
547549
var format = path.extname(dest.name).substr(1).toLowerCase();
548-
if (!options.destinationFormat && !options.format && formats[format]) {
549-
options.destinationFormat = formats[format];
550+
if (!options.destinationFormat && !options.format && FORMATS[format]) {
551+
options.destinationFormat = FORMATS[format];
550552
}
551553

552554
return 'gs://' + dest.bucket.name + '/' + dest.name;
@@ -556,8 +558,8 @@ Table.prototype.export = function(destination, options, callback) {
556558
if (options.format) {
557559
options.format = options.format.toLowerCase();
558560

559-
if (formats[options.format]) {
560-
options.destinationFormat = formats[options.format];
561+
if (FORMATS[options.format]) {
562+
options.destinationFormat = FORMATS[options.format];
561563
delete options.format;
562564
} else {
563565
throw new Error('Destination format not recognized: ' + options.format);
@@ -591,7 +593,7 @@ Table.prototype.export = function(destination, options, callback) {
591593
return;
592594
}
593595

594-
var job = that.bigQuery.job(resp.jobReference.jobId);
596+
var job = self.bigQuery.job(resp.jobReference.jobId);
595597
job.metadata = resp;
596598

597599
callback(null, job, resp);
@@ -658,7 +660,7 @@ Table.prototype.export = function(destination, options, callback) {
658660
* });
659661
*/
660662
Table.prototype.getRows = function(options, callback) {
661-
var that = this;
663+
var self = this;
662664

663665
if (is.fn(options)) {
664666
callback = options;
@@ -684,9 +686,9 @@ Table.prototype.getRows = function(options, callback) {
684686
});
685687
}
686688

687-
if (resp.rows && resp.rows.length > 0 && !that.metadata.schema) {
689+
if (resp.rows && resp.rows.length > 0 && !self.metadata.schema) {
688690
// We don't know the schema for this table yet. Do a quick stat.
689-
that.getMetadata(function(err, metadata, apiResponse) {
691+
self.getMetadata(function(err, metadata, apiResponse) {
690692
if (err) {
691693
onComplete(err, null, null, apiResponse);
692694
return;
@@ -707,7 +709,7 @@ Table.prototype.getRows = function(options, callback) {
707709
return;
708710
}
709711

710-
rows = Table.mergeSchemaWithRows_(that.metadata.schema, rows || []);
712+
rows = Table.mergeSchemaWithRows_(self.metadata.schema, rows || []);
711713

712714
callback(null, rows, nextQuery, resp);
713715
}
@@ -720,16 +722,17 @@ Table.prototype.getRows = function(options, callback) {
720722
* asynchronously. If you would like instantaneous access to your data, insert
721723
* it using {module:bigquery/table#insert}.
722724
*
723-
* Note: Only JSON and CSV source files are supported. The file type will be
724-
* inferred by the given file's extension. If you wish to override this, you
725-
* must provide a `metadata` object.
725+
* Note: The file type will be inferred by the given file's extension. If you
726+
* wish to override this, you must provide `metadata.format`.
726727
*
727728
* @resource [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
728729
*
729730
* @param {string|module:storage/file} source - The source file to import.
730731
* @param {object=} metadata - Metadata to set with the load operation. The
731732
* metadata object should be in the format of the
732733
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs resource.
734+
* @param {string} metadata.format - The format the data being imported is in.
735+
* Allowed options are "CSV", "JSON", or "AVRO".
733736
* @param {function} callback - The callback function.
734737
* @param {?error} callback.err - An error returned while making this request
735738
* @param {module:bigquery/job} callback.job - The job used to import your data.
@@ -772,7 +775,7 @@ Table.prototype.getRows = function(options, callback) {
772775
* ], function(err, job, apiResponse) {});
773776
*/
774777
Table.prototype.import = function(source, metadata, callback) {
775-
var that = this;
778+
var self = this;
776779

777780
if (is.fn(metadata)) {
778781
callback = metadata;
@@ -782,17 +785,18 @@ Table.prototype.import = function(source, metadata, callback) {
782785
callback = callback || util.noop;
783786
metadata = metadata || {};
784787

785-
var formats = {
786-
csv: 'CSV',
787-
json: 'NEWLINE_DELIMITED_JSON'
788-
};
788+
var format = metadata.sourceFormat || metadata.format;
789+
if (format) {
790+
metadata.sourceFormat = FORMATS[format.toLowerCase()];
791+
delete metadata.format;
792+
}
789793

790794
if (is.string(source)) {
791795
// A path to a file was given. If a sourceFormat wasn't specified, try to
792796
// find a match from the file's extension.
793-
var format = formats[path.extname(source).substr(1).toLowerCase()];
794-
if (!metadata.sourceFormat && format) {
795-
metadata.sourceFormat = format;
797+
var detectedFormat = FORMATS[path.extname(source).substr(1).toLowerCase()];
798+
if (!metadata.sourceFormat && detectedFormat) {
799+
metadata.sourceFormat = detectedFormat;
796800
}
797801

798802
// Read the file into a new write stream.
@@ -826,7 +830,7 @@ Table.prototype.import = function(source, metadata, callback) {
826830
// If no explicit format was provided, attempt to find a match from
827831
// the file's extension. If no match, don't set, and default upstream
828832
// to CSV.
829-
var format = formats[path.extname(src.name).substr(1).toLowerCase()];
833+
var format = FORMATS[path.extname(src.name).substr(1).toLowerCase()];
830834
if (!metadata.sourceFormat && format) {
831835
body.configuration.load.sourceFormat = format;
832836
}
@@ -845,7 +849,7 @@ Table.prototype.import = function(source, metadata, callback) {
845849
return;
846850
}
847851

848-
var job = that.bigQuery.job(resp.jobReference.jobId);
852+
var job = self.bigQuery.job(resp.jobReference.jobId);
849853
job.metadata = resp;
850854

851855
callback(null, job, resp);
@@ -1028,7 +1032,7 @@ Table.prototype.query = function(query, callback) {
10281032
* table.setMetadata(metadata, function(err, metadata, apiResponse) {});
10291033
*/
10301034
Table.prototype.setMetadata = function(metadata, callback) {
1031-
var that = this;
1035+
var self = this;
10321036

10331037
if (metadata.name) {
10341038
metadata.friendlyName = metadata.name;
@@ -1049,7 +1053,7 @@ Table.prototype.setMetadata = function(metadata, callback) {
10491053
return;
10501054
}
10511055

1052-
that.metadata = resp;
1056+
self.metadata = resp;
10531057

10541058
callback(null, resp);
10551059
});

0 commit comments

Comments (0)