Skip to content

Commit 28ee925

Browse files
bigtable/table/createReadStream: allow early ending
1 parent 0362106 commit 28ee925

File tree

4 files changed

+61
-41
lines changed

4 files changed

+61
-41
lines changed

packages/bigtable/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
"extend": "^3.0.0",
6060
"google-proto-files": "^0.11.0",
6161
"is": "^3.0.1",
62+
"is-stream-ended": "^0.1.2",
6263
"lodash.flatten": "^4.2.0",
6364
"node-int64": "^0.4.0",
6465
"prop-assign": "^1.0.0",

packages/bigtable/src/table.js

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ var commonGrpc = require('@google-cloud/common-grpc');
2626
var concat = require('concat-stream');
2727
var flatten = require('lodash.flatten');
2828
var is = require('is');
29+
var isStreamEnded = require('is-stream-ended');
2930
var propAssign = require('prop-assign');
3031
var pumpify = require('pumpify');
3132
var through = require('through2');
@@ -494,24 +495,27 @@ Table.prototype.createReadStream = function(options) {
494495
reqOpts.rowsLimit = options.limit;
495496
}
496497

497-
return pumpify.obj([
498+
var stream = pumpify.obj([
498499
this.requestStream(grpcOpts, reqOpts),
500+
499501
through.obj(function(data, enc, next) {
500-
var throughStream = this;
501-
var rows = Row.formatChunks_(data.chunks, {
502-
decode: options.decode
503-
});
502+
var transformStream = this;
504503

505-
rows.forEach(function(rowData) {
506-
var row = self.row(rowData.key);
504+
Row.formatChunks_(data.chunks, { decode: options.decode })
505+
.forEach(function(rowData) {
506+
var row = self.row(rowData.key);
507+
row.data = rowData.data;
507508

508-
row.data = rowData.data;
509-
throughStream.push(row);
510-
});
509+
if (!isStreamEnded(stream)) {
510+
transformStream.push(row);
511+
}
512+
});
511513

512514
next();
513515
})
514516
]);
517+
518+
return stream;
515519
};
516520

517521
/**

packages/bigtable/system-test/bigtable.js

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -379,37 +379,36 @@ describe('Bigtable', function() {
379379
});
380380

381381
describe('rows', function() {
382+
var ROWS = [{
383+
key: 'gwashington',
384+
data: {
385+
follows: {
386+
jadams: 1
387+
}
388+
}
389+
}, {
390+
key: 'tjefferson',
391+
data: {
392+
follows: {
393+
gwashington: 1,
394+
jadams: 1
395+
}
396+
}
397+
}, {
398+
key: 'jadams',
399+
data: {
400+
follows: {
401+
gwashington: 1,
402+
tjefferson: 1
403+
}
404+
}
405+
}];
382406

383-
describe('inserting data', function() {
384-
385-
it('should insert rows', function(done) {
386-
var rows = [{
387-
key: 'gwashington',
388-
data: {
389-
follows: {
390-
jadams: 1
391-
}
392-
}
393-
}, {
394-
key: 'tjefferson',
395-
data: {
396-
follows: {
397-
gwashington: 1,
398-
jadams: 1
399-
}
400-
}
401-
}, {
402-
key: 'jadams',
403-
data: {
404-
follows: {
405-
gwashington: 1,
406-
tjefferson: 1
407-
}
408-
}
409-
}];
407+
before(function(done) {
408+
TABLE.insert(ROWS, done);
409+
});
410410

411-
TABLE.insert(rows, done);
412-
});
411+
describe('inserting data', function() {
413412

414413
it('should create an individual row', function(done) {
415414
var row = TABLE.row('alincoln');
@@ -510,7 +509,7 @@ describe('Bigtable', function() {
510509
it('should get rows', function(done) {
511510
TABLE.getRows(function(err, rows) {
512511
assert.ifError(err);
513-
assert.strictEqual(rows.length, 4);
512+
assert(rows.length >= ROWS.length);
514513
assert(rows[0] instanceof Row);
515514
done();
516515
});
@@ -524,9 +523,10 @@ describe('Bigtable', function() {
524523
.on('data', function(row) {
525524
assert(row instanceof Row);
526525
rows.push(row);
526+
this.end();
527527
})
528528
.on('end', function() {
529-
assert.strictEqual(rows.length, 4);
529+
assert.strictEqual(rows.length, 1);
530530
done();
531531
});
532532
});

packages/bigtable/test/table.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,21 @@ describe('Bigtable/Table', function() {
636636
done();
637637
});
638638
});
639+
640+
it('should allow a stream to end early', function(done) {
641+
var rows = [];
642+
643+
table.createReadStream()
644+
.on('error', done)
645+
.on('data', function(row) {
646+
rows.push(row);
647+
this.end();
648+
})
649+
.on('end', function() {
650+
assert.strictEqual(rows.length, 1);
651+
done();
652+
});
653+
});
639654
});
640655

641656
describe('error', function() {

0 commit comments

Comments
 (0)