-
Notifications
You must be signed in to change notification settings - Fork 2k
First draft of streaming insert sample #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -184,18 +184,41 @@ function exportTableToGCS (options, callback) { | |
}); | ||
} | ||
// [END export_table_to_gcs] | ||
|
||
// [START insert_rows_as_stream] | ||
/**
 * Insert rows (as a stream) into a BigQuery table.
 *
 * @param {object} options Configuration options.
 * @param {array} options.rows An array of rows to insert into a BigQuery table.
 * @param {string} options.dataset The ID of the dataset containing the target table.
 * @param {string} options.table The ID of the table to insert rows into.
 * @param {function} callback Called with `(err)` when the insert request itself
 *     fails, otherwise with `(null, insertErrors)`, where `insertErrors` is the
 *     per-row failure list passed back by `table.insert` (may be empty or absent).
 */
function insertRowsAsStream (options, callback) {
  var table = bigquery.dataset(options.dataset).table(options.table);
  table.insert(options.rows, function (err, insertErrors) {
    if (err) {
      return callback(err);
    }

    // Per-row failures arrive in `insertErrors` rather than `err`, so they must
    // be subtracted from the total; otherwise the log claims rows were inserted
    // that the API actually rejected.
    var failedCount = Array.isArray(insertErrors) ? insertErrors.length : 0;
    console.log('Inserted %d rows!', options.rows.length - failedCount);
    if (failedCount > 0) {
      console.error('Failed to insert %d rows.', failedCount);
    }

    // The raw per-row errors are still handed to the caller unchanged.
    return callback(null, insertErrors);
  });
}
// [END insert_rows_as_stream] | ||
// [END all] | ||
|
||
// The command-line program | ||
var cli = require('yargs'); | ||
var utils = require('../utils'); | ||
var fs = require('fs'); | ||
|
||
var program = module.exports = { | ||
createTable: createTable, | ||
listTables: listTables, | ||
deleteTable: deleteTable, | ||
importFile: importFile, | ||
exportTableToGCS: exportTableToGCS, | ||
insertRowsAsStream: insertRowsAsStream, | ||
main: function (args) { | ||
// Run the command-line program | ||
cli.help().strict().parse(args).argv; | ||
|
@@ -243,6 +266,29 @@ cli | |
}, function (options) { | ||
program.exportTableToGCS(utils.pick(options, ['dataset', 'table', 'bucket', 'file', 'format', 'gzip']), utils.makeHandler()); | ||
}) | ||
.command('insert <json_or_file> <dataset> <table>', | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's have |
||
'Insert a JSON array (as a string or newline-delimited file) into a BigQuery table.', {}, | ||
function (options) { | ||
var content; | ||
try { | ||
content = fs.readFileSync(options.json_or_file); | ||
} catch (err) { | ||
content = options.json_or_file; | ||
} | ||
|
||
var rows = null; | ||
try { | ||
rows = JSON.parse(content); | ||
} catch (err) {} | ||
|
||
if (!Array.isArray(rows)) { | ||
throw new Error('"json_or_file" (or the file it points to) is not a valid JSON array.'); | ||
} | ||
|
||
options.rows = rows; | ||
program.insertRowsAsStream(utils.pick(options, ['rows', 'dataset', 'table']), utils.makeHandler()); | ||
} | ||
) | ||
.example( | ||
'node $0 create my_dataset my_table', | ||
'Create table "my_table" in "my_dataset".' | ||
|
@@ -265,11 +311,19 @@ cli | |
) | ||
.example( | ||
'node $0 export my_dataset my_table my-bucket my-file', | ||
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV' | ||
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV.' | ||
) | ||
.example( | ||
'node $0 export my_dataset my_table my-bucket my-file -f JSON --gzip', | ||
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON' | ||
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON.' | ||
) | ||
.example( | ||
'node $0 insert json_string my_dataset my_table', | ||
'Insert the JSON array represented by json_string into my_dataset:my_table.' | ||
) | ||
.example( | ||
'node $0 insert json_file my_dataset my_table', | ||
'Insert the JSON objects contained in json_file (one per line) into my_dataset:my_table.' | ||
) | ||
.wrap(100) | ||
.recommendCommands() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,15 @@ var dataset = 'dataset'; | |
var table = 'table'; | ||
var format = 'JSON'; | ||
var schema = 'schema'; | ||
var jsonArray = [ | ||
{ name: 'foo', age: 27 }, | ||
{ name: 'bar', age: 13 } | ||
]; | ||
var validJsonFile = 'validJsonFile'; | ||
var invalidJsonFile = 'invalidJsonFile'; | ||
var validJsonString = JSON.stringify(jsonArray); | ||
var invalidJsonString = 'INVALID'; | ||
var errorList = ['error 1', 'error 2']; | ||
|
||
function getSample () { | ||
var tableMocks = [ | ||
|
@@ -44,7 +53,8 @@ function getSample () { | |
var tableMock = { | ||
export: sinon.stub().yields(null, jobMock), | ||
delete: sinon.stub().yields(null), | ||
import: sinon.stub().yields(null, jobMock) | ||
import: sinon.stub().yields(null, jobMock), | ||
insert: sinon.stub().yields(null, errorList) | ||
}; | ||
var datasetMock = { | ||
table: sinon.stub().returns(tableMock), | ||
|
@@ -57,11 +67,17 @@ function getSample () { | |
}; | ||
var BigQueryMock = sinon.stub().returns(bigqueryMock); | ||
var StorageMock = sinon.stub().returns(storageMock); | ||
var fsMock = { | ||
readFileSync: sinon.stub().throws(new Error('Invalid file.')) | ||
}; | ||
fsMock.readFileSync.withArgs(validJsonFile).returns(validJsonString); | ||
fsMock.readFileSync.withArgs(invalidJsonFile).returns(invalidJsonString); | ||
|
||
return { | ||
program: proxyquire('../tables', { | ||
'@google-cloud/bigquery': BigQueryMock, | ||
'@google-cloud/storage': StorageMock, | ||
'fs': fsMock, | ||
yargs: proxyquire('yargs', {}) | ||
}), | ||
mocks: { | ||
|
@@ -74,6 +90,7 @@ function getSample () { | |
table: tableMock, | ||
bucket: bucketMock, | ||
dataset: datasetMock, | ||
fs: fsMock, | ||
tables: tableMocks | ||
} | ||
}; | ||
|
@@ -290,6 +307,45 @@ describe('bigquery:tables', function () { | |
}); | ||
}); | ||
|
||
describe('insertRowsAsStream', function () { | ||
var options = { | ||
file: file, | ||
dataset: dataset, | ||
table: table, | ||
rows: jsonArray | ||
}; | ||
|
||
it('should stream-insert rows into a table', function () { | ||
var program = getSample().program; | ||
var callback = sinon.stub(); | ||
|
||
program.insertRowsAsStream(options, callback); | ||
assert.equal(callback.calledOnce, true); | ||
assert.deepEqual(callback.firstCall.args, [null, errorList]); | ||
}); | ||
|
||
it('should handle API errors', function () { | ||
var example = getSample(); | ||
var callback = sinon.stub(); | ||
var error = new Error('error'); | ||
example.mocks.table.insert = sinon.stub().yields(error); | ||
|
||
example.program.insertRowsAsStream(options, callback); | ||
assert.equal(callback.calledOnce, true); | ||
assert.deepEqual(callback.firstCall.args, [error]); | ||
}); | ||
|
||
it('should handle (per-row) insert errors', function () { | ||
var example = getSample(); | ||
var callback = sinon.stub(); | ||
example.mocks.table.insert = sinon.stub().yields(null, errorList); | ||
|
||
example.program.insertRowsAsStream(options, callback); | ||
assert.equal(callback.calledOnce, true); | ||
assert.deepEqual(callback.firstCall.args, [null, errorList]); | ||
}); | ||
}); | ||
|
||
describe('main', function () { | ||
it('should call createTable', function () { | ||
var program = getSample().program; | ||
|
@@ -349,6 +405,19 @@ describe('bigquery:tables', function () { | |
}]); | ||
}); | ||
|
||
it('should call insertRowsAsStream', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
program.main(['insert', validJsonFile, dataset, table]); | ||
|
||
assert.equal(program.insertRowsAsStream.calledOnce, true); | ||
assert.deepEqual( | ||
program.insertRowsAsStream.firstCall.args.slice(0, -1), | ||
[{ rows: jsonArray, dataset: dataset, table: table }] | ||
); | ||
}); | ||
|
||
it('should recognize --gzip flag', function () { | ||
var program = getSample().program; | ||
program.exportTableToGCS = sinon.stub(); | ||
|
@@ -380,5 +449,65 @@ describe('bigquery:tables', function () { | |
gzip: false | ||
}]); | ||
}); | ||
|
||
describe('main:insert', function () { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Change |
||
var options = { | ||
dataset: dataset, | ||
table: table, | ||
rows: jsonArray | ||
}; | ||
|
||
it('should accept valid JSON files', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
program.main(['insert', validJsonFile, dataset, table]); | ||
|
||
assert.equal(program.insertRowsAsStream.calledOnce, true); | ||
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]); | ||
}); | ||
|
||
it('should reject files with invalid JSON', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
assert.throws( | ||
function () { program.main(['insert', invalidJsonFile, dataset, table]); }, | ||
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./ | ||
); | ||
assert.equal(program.insertRowsAsStream.called, false); | ||
}); | ||
|
||
it('should reject invalid file names', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
assert.throws( | ||
function () { program.main(['insert', '', dataset, table]); }, | ||
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./ | ||
); | ||
assert.equal(program.insertRowsAsStream.called, false); | ||
}); | ||
|
||
it('should accept valid JSON strings', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
program.main(['insert', validJsonString, dataset, table]); | ||
assert.equal(program.insertRowsAsStream.calledOnce, true); | ||
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]); | ||
}); | ||
|
||
it('should reject invalid JSON strings', function () { | ||
var program = getSample().program; | ||
program.insertRowsAsStream = sinon.stub(); | ||
|
||
assert.throws( | ||
function () { program.main(['insert', invalidJsonString, dataset, table]); }, | ||
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./ | ||
); | ||
assert.equal(program.insertRowsAsStream.called, false); | ||
}); | ||
}); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a `console.log` above this line, like all the other samples, e.g.:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix.