First draft of streaming insert sample #196


Merged: 4 commits, Aug 31, 2016
Changes from 1 commit
34 changes: 28 additions & 6 deletions bigquery/system-test/tables.test.js
@@ -22,19 +22,21 @@ var path = require('path');
function generateUuid () {
return 'nodejs_docs_samples_' + uuid.v4().replace(/-/gi, '_');
}

var rows = [
{ Name: 'foo', Age: 27, Weight: 80.3, IsMagic: true },
{ Name: 'bar', Age: 13, Weight: 54.6, IsMagic: false }
];
var options = {
projectId: process.env.GCLOUD_PROJECT,
localFilePath: path.join(__dirname, '../resources/data.csv'),
bucket: generateUuid(),
file: 'data.json',
dataset: generateUuid(),
table: generateUuid(),
schema: 'Name:string, Age:integer, Weigth:float, IsMagic:boolean'
schema: 'Name:string, Age:integer, Weight:float, IsMagic:boolean',
rows: rows
};

var file = storage.bucket(options.bucket).file(options.file);

describe('bigquery:tables', function () {
before(function (done) {
// Create bucket
@@ -122,7 +124,7 @@ describe('bigquery:tables', function () {
assert(metadata.status, 'job metadata has status');
assert.equal(metadata.status.state, 'DONE', 'job was finished');

file.exists(function (err, exists) {
storage.bucket(options.bucket).file(options.file).exists(function (err, exists) {
assert.ifError(err, 'file existence check succeeded');
assert(exists, 'export destination exists');
done();
@@ -131,8 +133,28 @@ describe('bigquery:tables', function () {
});
});

describe('insertRowsAsStream', function () {
it('should insert rows into a table', function (done) {
var table = bigquery.dataset(options.dataset).table(options.table);
table.getRows({}, function (err, startRows) {
assert.equal(err, null);

program.insertRowsAsStream(options, function (err, insertErrors) {
assert.equal(err, null);
assert.deepEqual(insertErrors, [], 'no per-row insert errors occurred');

table.getRows({}, function (err, endRows) {
Member: Wrap this in a timeout, say 2000ms.

Contributor Author: Will fix.
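
A minimal sketch of what that delay might look like, assuming a plain setTimeout around the second getRows call; the 2000 ms value comes from the review comment, and the surrounding names are reused from the test above:

// Give the streaming buffer a moment before re-reading the table.
setTimeout(function () {
  table.getRows({}, function (err, endRows) {
    assert.equal(err, null);
    assert.equal(startRows.length + 2, endRows.length, 'insertRows() added 2 rows');
    done();
  });
}, 2000);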

assert.equal(err, null);
assert.equal(startRows.length + 2, endRows.length, 'insertRows() added 2 rows');
done();
});
});
});
});
});

describe('deleteTable', function () {
it('should list tables', function (done) {
it('should delete table', function (done) {
program.deleteTable(options, function (err) {
assert.ifError(err);
assert(console.log.calledWith('Deleted table: %s', options.table));
54 changes: 52 additions & 2 deletions bigquery/tables.js
@@ -184,18 +184,40 @@ function exportTableToGCS (options, callback) {
});
}
// [END export_table_to_gcs]

// [START insert_rows_as_stream]
/**
* Insert rows (as a stream) into a BigQuery table.
* @param {object} options Configuration options.
* @param {array} options.rows An array of rows to insert into a BigQuery table.
* @param {string} options.dataset The ID of the dataset containing the target table.
* @param {string} options.table The ID of the table to insert rows into.
* @param {function} callback Callback function to receive query status.
*/
function insertRowsAsStream (options, callback) {
var table = bigquery.dataset(options.dataset).table(options.table);
table.insert(options.rows, function (err, insertErrors) {
if (err) {
return callback(err);
}
return callback(null, insertErrors);
Member: We should have a console.log above this line like all the other samples, e.g.:

if (err) {
  return callback(err);
}

console.log('Inserted %d row(s)!', options.rows.length);
return callback(null, insertErrors);

Contributor Author: Will fix.

});
}
// [END insert_rows_as_stream]
// [END all]

// The command-line program
var cli = require('yargs');
var utils = require('../utils');
var fs = require('fs');

var program = module.exports = {
createTable: createTable,
listTables: listTables,
deleteTable: deleteTable,
importFile: importFile,
exportTableToGCS: exportTableToGCS,
insertRowsAsStream: insertRowsAsStream,
main: function (args) {
// Run the command-line program
cli.help().strict().parse(args).argv;
@@ -243,6 +265,26 @@ cli
}, function (options) {
program.exportTableToGCS(utils.pick(options, ['dataset', 'table', 'bucket', 'file', 'format', 'gzip']), utils.makeHandler());
})
.command('insert <json_or_file> <dataset> <table>',
Member: Let's have dataset and table be the first two args in order to match all the other commands.
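
A sketch of the reordered signature the reviewer is asking for; this is an illustration only, with the handler body left unchanged:

.command('insert <dataset> <table> <json_or_file>',
  'Insert a JSON array (as a string or newline-delimited file) into a BigQuery table.', {},
  function (options) { /* same handler as below */ }
)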

'Insert a JSON array (as a string or newline-delimited file) into a BigQuery table.', {},
function (options) {
var content = fs.readFileSync(options.json_or_file);
Member: If the file cannot be found, this will throw an error, so it needs to be wrapped in a try/catch, and in your catch you would just do content = options.json_or_file;

Contributor Author: Will fix; adding a test for this too.
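
A hedged sketch of that fallback, assuming a thrown fs.readFileSync error means the argument is an inline JSON string rather than a path:

var content;
try {
  // Treat the argument as a file path first.
  content = fs.readFileSync(options.json_or_file);
} catch (err) {
  // Not a readable file; fall back to the argument itself as the JSON payload.
  content = options.json_or_file;
}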

if (!content) {
content = options.json_or_file;
}
var rows = null;
try {
rows = JSON.parse(content);
} catch (err) {}

if (!Array.isArray(rows)) {
throw new Error('"json_or_file" (or the file it points to) is not a valid JSON array.');
}

options.rows = rows;
program.insertRowsAsStream(utils.pick(options, ['rows', 'dataset', 'table']), utils.makeHandler());
}
)
.example(
'node $0 create my_dataset my_table',
'Create table "my_table" in "my_dataset".'
@@ -265,11 +307,19 @@ cli
)
.example(
'node $0 export my_dataset my_table my-bucket my-file',
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV'
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV.'
)
.example(
'node $0 export my_dataset my_table my-bucket my-file -f JSON --gzip',
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON'
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON.'
)
.example(
'node $0 insert json_string my_dataset my_table',
'Insert the JSON array represented by json_string into my_dataset:my_table.'
)
.example(
'node $0 insert json_file my_dataset my_table',
'Insert the JSON objects contained in json_file (one per line) into my_dataset:my_table.'
)
.wrap(100)
.recommendCommands()
120 changes: 119 additions & 1 deletion bigquery/test/tables.test.js
@@ -21,6 +21,15 @@ var dataset = 'dataset';
var table = 'table';
var format = 'JSON';
var schema = 'schema';
var jsonArray = [
{ name: 'foo', age: 27 },
{ name: 'bar', age: 13 }
];
var validJsonFile = 'validJsonFile';
var invalidJsonFile = 'invalidJsonFile';
var validJsonString = JSON.stringify(jsonArray);
var invalidJsonString = 'INVALID';
var errorList = ['error 1', 'error 2'];

function getSample () {
var tableMocks = [
@@ -44,7 +53,8 @@ function getSample () {
var tableMock = {
export: sinon.stub().yields(null, jobMock),
delete: sinon.stub().yields(null),
import: sinon.stub().yields(null, jobMock)
import: sinon.stub().yields(null, jobMock),
insert: sinon.stub().yields(null, errorList)
};
var datasetMock = {
table: sinon.stub().returns(tableMock),
@@ -57,11 +67,17 @@
};
var BigQueryMock = sinon.stub().returns(bigqueryMock);
var StorageMock = sinon.stub().returns(storageMock);
var fsMock = {
readFileSync: sinon.stub().returns(null)
};
fsMock.readFileSync.withArgs(validJsonFile).returns(validJsonString);
fsMock.readFileSync.withArgs(invalidJsonFile).returns(invalidJsonString);

return {
program: proxyquire('../tables', {
'@google-cloud/bigquery': BigQueryMock,
'@google-cloud/storage': StorageMock,
'fs': fsMock,
yargs: proxyquire('yargs', {})
}),
mocks: {
@@ -74,6 +90,7 @@
table: tableMock,
bucket: bucketMock,
dataset: datasetMock,
fs: fsMock,
tables: tableMocks
}
};
@@ -290,6 +307,45 @@ describe('bigquery:tables', function () {
});
});

describe('insertRowsAsStream', function () {
var options = {
file: file,
dataset: dataset,
table: table,
rows: jsonArray
};

it('should stream-insert rows into a table', function () {
var program = getSample().program;
var callback = sinon.stub();

program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [null, errorList]);
});

it('should handle API errors', function () {
var example = getSample();
var callback = sinon.stub();
var error = new Error('error');
example.mocks.table.insert = sinon.stub().yields(error);

example.program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [error]);
});

it('should handle (per-row) insert errors', function () {
var example = getSample();
var callback = sinon.stub();
example.mocks.table.insert = sinon.stub().yields(null, errorList);

example.program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [null, errorList]);
});
});

describe('main', function () {
it('should call createTable', function () {
var program = getSample().program;
@@ -349,6 +405,19 @@
}]);
});

it('should call insertRowsAsStream', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', validJsonFile, dataset, table]);

assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(
program.insertRowsAsStream.firstCall.args.slice(0, -1),
[{ rows: jsonArray, dataset: dataset, table: table }]
);
});

it('should recognize --gzip flag', function () {
var program = getSample().program;
program.exportTableToGCS = sinon.stub();
@@ -380,5 +449,54 @@
gzip: false
}]);
});

describe('main:insert', function () {
Member: Change main:insert to just insert. This describe clause is already nested under the main describe clause, so having main:insert is like calling this test "main main:insert".
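
A small illustration of the point, as a sketch: Mocha composes nested describe titles, so the suggested rename keeps the full test name readable.

describe('main', function () {
  describe('insert', function () {
    // Full test path reads as "main insert" rather than "main main:insert".
  });
});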

var options = {
dataset: dataset,
table: table,
rows: jsonArray
};

it('should accept valid JSON files', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', validJsonFile, dataset, table]);

assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
});

it('should reject files with invalid JSON', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

assert.throws(
function () { program.main(['insert', invalidJsonFile, dataset, table]); },
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
);
assert.equal(program.insertRowsAsStream.called, false);
});

it('should accept valid JSON strings', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', validJsonString, dataset, table]);
assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
});

it('should reject invalid JSON strings', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

assert.throws(
function () { program.main(['insert', invalidJsonString, dataset, table]); },
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
);
assert.equal(program.insertRowsAsStream.called, false);
});
});
});
});