Skip to content

Commit 85f00ae

Browse files
datastore: get runs as a stream
1 parent f872342 commit 85f00ae

File tree

2 files changed

+85
-70
lines changed

2 files changed

+85
-70
lines changed

lib/datastore/request.js

+48-49
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020

2121
'use strict';
2222

23-
var extend = require('extend');
2423
var request = require('request').defaults({
2524
pool: {
2625
maxSockets: Infinity
2726
}
2827
});
2928

29+
var through = require('through2');
30+
3031
/**
3132
* @type {module:datastore/entity}
3233
* @private
@@ -117,11 +118,7 @@ function DatastoreRequest() {}
117118
* //-
118119
* var key = dataset.key(['Company', 123]);
119120
*
120-
* transaction.get(key, function(err, entities) {
121-
* if (!err) {
122-
* var entity = entities[0];
123-
* }
124-
* });
121+
* transaction.get(key, function(err, entity) {});
125122
*
126123
* //-
127124
* // Get multiple entities at once.
@@ -132,33 +129,6 @@ function DatastoreRequest() {}
132129
* ], function(err, entities) {});
133130
*
134131
* //-
135-
* // To control how many API requests are made and page through the results
136-
* // manually, set `autoPaginate` to `false`.
137-
* //-
138-
* function onApiResponse(err, entities, nextQuery, apiResponse) {
139-
* if (err) {
140-
* console.error(err);
141-
* return;
142-
* }
143-
*
144-
* // `entities` is an array of results.
145-
*
146-
* if (nextQuery) {
147-
* transaction.get(nextQuery, onApiResponse);
148-
* }
149-
* }
150-
*
151-
* var options = {
152-
* keys: [
153-
* dataset.key(['Company', 123]),
154-
* // ...
155-
* ],
156-
* autoPaginate: false
157-
* };
158-
*
159-
* transaction.get(options, onApiResponse);
160-
*
161-
* //-
162132
* // Get the entities as a readable object stream.
163133
* //-
164134
* var keys = [
@@ -175,39 +145,68 @@ function DatastoreRequest() {}
175145
* // All entities retrieved.
176146
* });
177147
*/
178-
DatastoreRequest.prototype.get = function(options, callback) {
179-
if (options instanceof entity.Key || util.is(options, 'array')) {
180-
options = {
181-
keys: util.arrayize(options)
182-
};
148+
DatastoreRequest.prototype.get = function(keys, callback) {
149+
var self = this;
150+
151+
var isStreamMode = !callback;
152+
var stream;
153+
154+
if (isStreamMode) {
155+
stream = through.obj();
183156
}
184157

185-
if (!options.keys) {
158+
var isSingleLookup = keys instanceof entity.Key;
159+
keys = util.arrayize(keys).map(entity.keyToKeyProto);
160+
161+
if (keys.length === 0) {
186162
throw new Error('At least one Key object is required.');
187163
}
188164

189165
var request = {
190-
key: options.keys.map(entity.keyToKeyProto)
166+
key: keys
191167
};
192168

193-
this.makeReq_('lookup', request, function(err, resp) {
169+
var entities = [];
170+
this.makeReq_('lookup', request, onApiResponse);
171+
172+
function onApiResponse(err, resp) {
194173
if (err) {
195-
callback(err, null, null, resp);
174+
if (isStreamMode) {
175+
stream.emit('errror', err, resp);
176+
} else {
177+
callback(err, null, resp);
178+
}
196179
return;
197180
}
198181

199-
var entities = entity.formatArray(resp.found);
182+
var results = entity.formatArray(resp.found);
183+
184+
if (isStreamMode) {
185+
results.forEach(function(entity) {
186+
stream.push(entity);
187+
});
188+
} else {
189+
entities = entities.concat(results);
190+
}
200191

201-
var nextQuery = null;
202192
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);
203193

204194
if (nextKeys.length > 0) {
205-
nextQuery = extend(true, {}, options);
206-
nextQuery.keys = nextQuery.keys.concat(nextKeys);
195+
self.get(nextKeys, onApiResponse);
196+
return;
207197
}
208198

209-
callback(null, entities, nextQuery, resp);
210-
});
199+
if (isStreamMode) {
200+
stream.push(null);
201+
stream.end();
202+
} else {
203+
callback(null, isSingleLookup ? entities[0] : entities, resp);
204+
}
205+
}
206+
207+
if (isStreamMode) {
208+
return stream;
209+
}
211210
};
212211

213212
/**
@@ -819,6 +818,6 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
819818
* These methods can be used with either a callback or as a readable object
820819
* stream. `streamRouter` is used to add this dual behavior.
821820
*/
822-
streamRouter.extend(DatastoreRequest, ['get', 'runQuery']);
821+
streamRouter.extend(DatastoreRequest, 'runQuery');
823822

824823
module.exports = DatastoreRequest;

system-test/datastore.js

+37-21
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,9 @@ describe('datastore', function() {
5252
ds.save({ key: postKey, data: post }, function(err) {
5353
assert.ifError(err);
5454

55-
ds.get(postKey, function(err, entities) {
55+
ds.get(postKey, function(err, entity) {
5656
assert.ifError(err);
5757

58-
var entity = entities[0];
5958
assert.deepEqual(entity.data, post);
6059

6160
ds.delete(postKey, done);
@@ -69,10 +68,9 @@ describe('datastore', function() {
6968
ds.save({ key: postKey, data: post }, function(err) {
7069
assert.ifError(err);
7170

72-
ds.get(postKey, function(err, entities) {
71+
ds.get(postKey, function(err, entity) {
7372
assert.ifError(err);
7473

75-
var entity = entities[0];
7674
assert.deepEqual(entity.data, post);
7775

7876
ds.delete(postKey, done);
@@ -92,10 +90,9 @@ describe('datastore', function() {
9290
var assignedId = postKey.path[1];
9391
assert(assignedId);
9492

95-
ds.get(postKey, function(err, entities) {
93+
ds.get(postKey, function(err, entity) {
9694
assert.ifError(err);
9795

98-
var entity = entities[0];
9996
assert.deepEqual(entity.data, data);
10097

10198
ds.delete(ds.key(['Post', assignedId]), done);
@@ -112,10 +109,9 @@ describe('datastore', function() {
112109
// The key's path should now be complete.
113110
assert(postKey.path[1]);
114111

115-
ds.get(postKey, function(err, entities) {
112+
ds.get(postKey, function(err, entity) {
116113
assert.ifError(err);
117114

118-
var entity = entities[0];
119115
assert.deepEqual(entity.data, post);
120116

121117
ds.delete(postKey, done);
@@ -135,10 +131,9 @@ describe('datastore', function() {
135131
ds.save({ key: postKey, method: 'insert', data: post }, function(err) {
136132
assert.notEqual(err, null); // should fail insert
137133

138-
ds.get(postKey, function(err, entities) {
134+
ds.get(postKey, function(err, entity) {
139135
assert.ifError(err);
140136

141-
var entity = entities[0];
142137
assert.deepEqual(entity.data, post);
143138

144139
ds.delete(postKey, done);
@@ -188,6 +183,33 @@ describe('datastore', function() {
188183
});
189184
});
190185

186+
it('should get multiple entities in a stream', function(done) {
187+
var key1 = ds.key('Post');
188+
var key2 = ds.key('Post');
189+
190+
ds.save([
191+
{ key: key1, data: post },
192+
{ key: key2, data: post }
193+
], function(err) {
194+
assert.ifError(err);
195+
196+
var firstKey = ds.key(['Post', key1.path[1]]);
197+
var secondKey = ds.key(['Post', key2.path[1]]);
198+
199+
var numEntitiesEmitted = 0;
200+
201+
ds.get([firstKey, secondKey])
202+
.on('error', done)
203+
.on('data', function() {
204+
numEntitiesEmitted++;
205+
})
206+
.on('end', function() {
207+
assert.strictEqual(numEntitiesEmitted, 2);
208+
209+
ds.delete([firstKey, secondKey], done);
210+
});
211+
});
212+
});
191213
});
192214

193215
it('should save keys as a part of entity and query by key', function(done) {
@@ -510,10 +532,9 @@ describe('datastore', function() {
510532
}, function(err) {
511533
assert.ifError(err);
512534

513-
ds.get(key, function(err, entities) {
535+
ds.get(key, function(err, entity) {
514536
assert.ifError(err);
515537

516-
var entity = entities[0];
517538
assert.deepEqual(entity.data, obj);
518539

519540
ds.delete(key, done);
@@ -550,22 +571,17 @@ describe('datastore', function() {
550571
async.parallel([
551572
// The key queued for deletion should have been deleted.
552573
function(done) {
553-
ds.get(deleteKey, function(err, entities) {
574+
ds.get(deleteKey, function(err, entity) {
554575
assert.ifError(err);
555-
556-
var entity = entities[0];
557576
assert.equal(typeof entity, 'undefined');
558-
559577
done();
560578
});
561579
},
562580

563581
// Data should have been updated on the key.
564582
function(done) {
565-
ds.get(key, function(err, entities) {
583+
ds.get(key, function(err, entity) {
566584
assert.ifError(err);
567-
568-
var entity = entities[0];
569585
assert.equal(entity.data.rating, 10);
570586
done();
571587
});
@@ -597,9 +613,9 @@ describe('datastore', function() {
597613
assert.ifError(err);
598614

599615
// Should not return a result.
600-
ds.get(key, function(err, entities) {
616+
ds.get(key, function(err, entity) {
601617
assert.ifError(err);
602-
assert.strictEqual(entities.length, 0);
618+
assert.strictEqual(entity, undefined);
603619

604620
// Incomplete key should have been given an id.
605621
assert.equal(incompleteKey.path.length, 2);

0 commit comments

Comments
 (0)