Skip to content

Commit 2f9b34f

Browse files
author
Ace Nassri
authored
First draft of streaming insert sample (#196)
* First draft of streaming insert sample
* Fix flaky test + address comments
* Fix comments
* Fix flaky test
1 parent 241d28f commit 2f9b34f

File tree

3 files changed

+219
-10
lines changed

3 files changed

+219
-10
lines changed

bigquery/system-test/tables.test.js

+33-7
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,21 @@ var path = require('path');
2222
// Builds a unique, BigQuery-safe resource name. Hyphens from the UUID are
// replaced with underscores because BigQuery dataset/table IDs may not
// contain dashes.
function generateUuid () {
  var suffix = uuid.v4().replace(/-/gi, '_');
  return 'nodejs_docs_samples_' + suffix;
}
25-
25+
// Sample rows matching the schema below; used by the streaming-insert test.
var rows = [
  { Name: 'foo', Age: 27, Weight: 80.3, IsMagic: true },
  { Name: 'bar', Age: 13, Weight: 54.6, IsMagic: false }
];

// Shared configuration for every test in this file. The bucket, dataset and
// table names are randomized per run so concurrent/repeated test runs do not
// collide on shared GCP resources.
var options = {
  projectId: process.env.GCLOUD_PROJECT,
  localFilePath: path.join(__dirname, '../resources/data.csv'),
  bucket: generateUuid(),
  file: 'data.json',
  dataset: generateUuid(),
  table: generateUuid(),
  schema: 'Name:string, Age:integer, Weight:float, IsMagic:boolean',
  rows: rows
};
3539

36-
var file = storage.bucket(options.bucket).file(options.file);
37-
3840
describe('bigquery:tables', function () {
3941
before(function (done) {
4042
// Create bucket
@@ -62,7 +64,9 @@ describe('bigquery:tables', function () {
6264
return done(err);
6365
}
6466
// Delete bucket
65-
storage.bucket(options.bucket).delete(done);
67+
setTimeout(function () {
68+
storage.bucket(options.bucket).delete(done);
69+
}, 2000);
6670
});
6771
});
6872
});
@@ -122,7 +126,7 @@ describe('bigquery:tables', function () {
122126
assert(metadata.status, 'job metadata has status');
123127
assert.equal(metadata.status.state, 'DONE', 'job was finished');
124128

125-
file.exists(function (err, exists) {
129+
storage.bucket(options.bucket).file(options.file).exists(function (err, exists) {
126130
assert.ifError(err, 'file existence check succeeded');
127131
assert(exists, 'export destination exists');
128132
done();
@@ -131,8 +135,30 @@ describe('bigquery:tables', function () {
131135
});
132136
});
133137

138+
describe('insertRowsAsStream', function () {
139+
it('should insert rows into a table', function (done) {
140+
var table = bigquery.dataset(options.dataset).table(options.table);
141+
table.getRows({}, function (err, startRows) {
142+
assert.equal(err, null);
143+
144+
program.insertRowsAsStream(options, function (err, insertErrors) {
145+
assert.equal(err, null);
146+
assert.deepEqual(insertErrors, [], 'no per-row insert errors occurred');
147+
148+
setTimeout(function () {
149+
table.getRows({}, function (err, endRows) {
150+
assert.equal(err, null);
151+
assert.equal(startRows.length + 2, endRows.length, 'insertRows() added 2 rows');
152+
done();
153+
});
154+
}, 2000);
155+
});
156+
});
157+
});
158+
});
159+
134160
describe('deleteTable', function () {
135-
it('should list tables', function (done) {
161+
it('should delete table', function (done) {
136162
program.deleteTable(options, function (err) {
137163
assert.ifError(err);
138164
assert(console.log.calledWith('Deleted table: %s', options.table));

bigquery/tables.js

+56-2
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,41 @@ function exportTableToGCS (options, callback) {
184184
});
185185
}
186186
// [END export_table_to_gcs]
187+
188+
// [START insert_rows_as_stream]
/**
 * Insert rows (as a stream) into a BigQuery table.
 *
 * @param {object} options Configuration options.
 * @param {array} options.rows An array of rows to insert into a BigQuery table.
 * @param {string} options.dataset The ID of the dataset containing the target table.
 * @param {string} options.table The ID of the table to insert rows into.
 * @param {function} callback Callback function to receive query status.
 */
function insertRowsAsStream (options, callback) {
  var dataset = bigquery.dataset(options.dataset);
  var table = dataset.table(options.table);

  table.insert(options.rows, function (err, insertErrors) {
    if (err) {
      // API-level failure (e.g. auth, missing table): report it and stop.
      return callback(err);
    }
    console.log('Inserted %d rows!', options.rows.length);
    // insertErrors carries any per-row failures reported by BigQuery even
    // when the request itself succeeded; pass them through to the caller.
    return callback(null, insertErrors);
  });
}
// [END insert_rows_as_stream]
187208
// [END all]
188209

189210
// The command-line program
190211
var cli = require('yargs');
191212
var utils = require('../utils');
213+
var fs = require('fs');
192214

193215
var program = module.exports = {
194216
createTable: createTable,
195217
listTables: listTables,
196218
deleteTable: deleteTable,
197219
importFile: importFile,
198220
exportTableToGCS: exportTableToGCS,
221+
insertRowsAsStream: insertRowsAsStream,
199222
main: function (args) {
200223
// Run the command-line program
201224
cli.help().strict().parse(args).argv;
@@ -243,6 +266,29 @@ cli
243266
}, function (options) {
244267
program.exportTableToGCS(utils.pick(options, ['dataset', 'table', 'bucket', 'file', 'format', 'gzip']), utils.makeHandler());
245268
})
269+
.command('insert <dataset> <table> <json_or_file>',
270+
'Insert a JSON array (as a string or newline-delimited file) into a BigQuery table.', {},
271+
function (options) {
272+
var content;
273+
try {
274+
content = fs.readFileSync(options.json_or_file);
275+
} catch (err) {
276+
content = options.json_or_file;
277+
}
278+
279+
var rows = null;
280+
try {
281+
rows = JSON.parse(content);
282+
} catch (err) {}
283+
284+
if (!Array.isArray(rows)) {
285+
throw new Error('"json_or_file" (or the file it points to) is not a valid JSON array.');
286+
}
287+
288+
options.rows = rows;
289+
program.insertRowsAsStream(utils.pick(options, ['rows', 'dataset', 'table']), utils.makeHandler());
290+
}
291+
)
246292
.example(
247293
'node $0 create my_dataset my_table',
248294
'Create table "my_table" in "my_dataset".'
@@ -265,11 +311,19 @@ cli
265311
)
266312
.example(
267313
'node $0 export my_dataset my_table my-bucket my-file',
268-
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV'
314+
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV.'
269315
)
270316
.example(
271317
'node $0 export my_dataset my_table my-bucket my-file -f JSON --gzip',
272-
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON'
318+
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON.'
319+
)
320+
.example(
321+
'node $0 insert my_dataset my_table json_string',
322+
'Insert the JSON array represented by json_string into my_dataset:my_table.'
323+
)
324+
.example(
325+
'node $0 insert my_dataset my_table json_file',
326+
'Insert the JSON objects contained in json_file (one per line) into my_dataset:my_table.'
273327
)
274328
.wrap(100)
275329
.recommendCommands()

bigquery/test/tables.test.js

+130-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ var dataset = 'dataset';
2121
var table = 'table';
2222
var format = 'JSON';
2323
var schema = 'schema';
24+
var jsonArray = [
25+
{ name: 'foo', age: 27 },
26+
{ name: 'bar', age: 13 }
27+
];
28+
var validJsonFile = 'validJsonFile';
29+
var invalidJsonFile = 'invalidJsonFile';
30+
var validJsonString = JSON.stringify(jsonArray);
31+
var invalidJsonString = 'INVALID';
32+
var errorList = ['error 1', 'error 2'];
2433

2534
function getSample () {
2635
var tableMocks = [
@@ -44,7 +53,8 @@ function getSample () {
4453
var tableMock = {
4554
export: sinon.stub().yields(null, jobMock),
4655
delete: sinon.stub().yields(null),
47-
import: sinon.stub().yields(null, jobMock)
56+
import: sinon.stub().yields(null, jobMock),
57+
insert: sinon.stub().yields(null, errorList)
4858
};
4959
var datasetMock = {
5060
table: sinon.stub().returns(tableMock),
@@ -57,11 +67,17 @@ function getSample () {
5767
};
5868
var BigQueryMock = sinon.stub().returns(bigqueryMock);
5969
var StorageMock = sinon.stub().returns(storageMock);
70+
var fsMock = {
71+
readFileSync: sinon.stub().throws(new Error('Invalid file.'))
72+
};
73+
fsMock.readFileSync.withArgs(validJsonFile).returns(validJsonString);
74+
fsMock.readFileSync.withArgs(invalidJsonFile).returns(invalidJsonString);
6075

6176
return {
6277
program: proxyquire('../tables', {
6378
'@google-cloud/bigquery': BigQueryMock,
6479
'@google-cloud/storage': StorageMock,
80+
'fs': fsMock,
6581
yargs: proxyquire('yargs', {})
6682
}),
6783
mocks: {
@@ -74,6 +90,7 @@ function getSample () {
7490
table: tableMock,
7591
bucket: bucketMock,
7692
dataset: datasetMock,
93+
fs: fsMock,
7794
tables: tableMocks
7895
}
7996
};
@@ -290,6 +307,45 @@ describe('bigquery:tables', function () {
290307
});
291308
});
292309

310+
describe('insertRowsAsStream', function () {
311+
var options = {
312+
file: file,
313+
dataset: dataset,
314+
table: table,
315+
rows: jsonArray
316+
};
317+
318+
it('should stream-insert rows into a table', function () {
319+
var program = getSample().program;
320+
var callback = sinon.stub();
321+
322+
program.insertRowsAsStream(options, callback);
323+
assert.equal(callback.calledOnce, true);
324+
assert.deepEqual(callback.firstCall.args, [null, errorList]);
325+
});
326+
327+
it('should handle API errors', function () {
328+
var example = getSample();
329+
var callback = sinon.stub();
330+
var error = new Error('error');
331+
example.mocks.table.insert = sinon.stub().yields(error);
332+
333+
example.program.insertRowsAsStream(options, callback);
334+
assert.equal(callback.calledOnce, true);
335+
assert.deepEqual(callback.firstCall.args, [error]);
336+
});
337+
338+
it('should handle (per-row) insert errors', function () {
339+
var example = getSample();
340+
var callback = sinon.stub();
341+
example.mocks.table.insert = sinon.stub().yields(null, errorList);
342+
343+
example.program.insertRowsAsStream(options, callback);
344+
assert.equal(callback.calledOnce, true);
345+
assert.deepEqual(callback.firstCall.args, [null, errorList]);
346+
});
347+
});
348+
293349
describe('main', function () {
294350
it('should call createTable', function () {
295351
var program = getSample().program;
@@ -349,6 +405,19 @@ describe('bigquery:tables', function () {
349405
}]);
350406
});
351407

408+
it('should call insertRowsAsStream', function () {
409+
var program = getSample().program;
410+
program.insertRowsAsStream = sinon.stub();
411+
412+
program.main(['insert', dataset, table, validJsonFile]);
413+
414+
assert.equal(program.insertRowsAsStream.calledOnce, true);
415+
assert.deepEqual(
416+
program.insertRowsAsStream.firstCall.args.slice(0, -1),
417+
[{ rows: jsonArray, dataset: dataset, table: table }]
418+
);
419+
});
420+
352421
it('should recognize --gzip flag', function () {
353422
var program = getSample().program;
354423
program.exportTableToGCS = sinon.stub();
@@ -380,5 +449,65 @@ describe('bigquery:tables', function () {
380449
gzip: false
381450
}]);
382451
});
452+
453+
describe('insert', function () {
454+
var options = {
455+
dataset: dataset,
456+
table: table,
457+
rows: jsonArray
458+
};
459+
460+
it('should accept valid JSON files', function () {
461+
var program = getSample().program;
462+
program.insertRowsAsStream = sinon.stub();
463+
464+
program.main(['insert', dataset, table, validJsonFile]);
465+
466+
assert.equal(program.insertRowsAsStream.calledOnce, true);
467+
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
468+
});
469+
470+
it('should reject files with invalid JSON', function () {
471+
var program = getSample().program;
472+
program.insertRowsAsStream = sinon.stub();
473+
474+
assert.throws(
475+
function () { program.main(['insert', dataset, table, invalidJsonFile]); },
476+
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
477+
);
478+
assert.equal(program.insertRowsAsStream.called, false);
479+
});
480+
481+
it('should reject invalid file names', function () {
482+
var program = getSample().program;
483+
program.insertRowsAsStream = sinon.stub();
484+
485+
assert.throws(
486+
function () { program.main(['insert', dataset, table, '']); },
487+
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
488+
);
489+
assert.equal(program.insertRowsAsStream.called, false);
490+
});
491+
492+
it('should accept valid JSON strings', function () {
493+
var program = getSample().program;
494+
program.insertRowsAsStream = sinon.stub();
495+
496+
program.main(['insert', dataset, table, validJsonString]);
497+
assert.equal(program.insertRowsAsStream.calledOnce, true);
498+
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
499+
});
500+
501+
it('should reject invalid JSON strings', function () {
502+
var program = getSample().program;
503+
program.insertRowsAsStream = sinon.stub();
504+
505+
assert.throws(
506+
function () { program.main(['insert', dataset, table, invalidJsonString]); },
507+
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
508+
);
509+
assert.equal(program.insertRowsAsStream.called, false);
510+
});
511+
});
383512
});
384513
});

0 commit comments

Comments
 (0)