Skip to content

Commit 74f923c

Browse files
datastore: runQuery as a stream. fixes #289
1 parent b494f59 commit 74f923c

File tree

3 files changed

+124
-26
lines changed

3 files changed

+124
-26
lines changed

lib/datastore/request.js

+47-12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
'use strict';
2222

2323
var https = require('https');
24+
var streamEvents = require('stream-events');
25+
var through = require('through2');
2426

2527
/**
2628
* @type module:datastore/entity
@@ -315,7 +317,8 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
315317
* });
316318
*/
317319
DatastoreRequest.prototype.runQuery = function(q, callback) {
318-
callback = callback || util.noop;
320+
var that = this;
321+
var stream;
319322

320323
var req = {
321324
read_options: {},
@@ -328,19 +331,51 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
328331
};
329332
}
330333

331-
this.makeReq_('runQuery', req, function(err, resp) {
332-
if (err || !resp.batch || !resp.batch.entity_result) {
333-
callback(err);
334-
return;
335-
}
334+
if (!util.is(callback, 'function')) {
335+
stream = streamEvents(through.obj());
336+
stream.once('reading', runQuery);
337+
return stream;
338+
} else {
339+
callback = callback || util.noop;
340+
runQuery();
341+
}
336342

337-
var cursor = '';
338-
if (resp.batch.end_cursor) {
339-
cursor = resp.batch.end_cursor.toBase64();
340-
}
343+
function runQuery() {
344+
that.makeReq_('runQuery', req, function(err, resp) {
345+
if (err) {
346+
if (stream) {
347+
stream.emit('error', err);
348+
stream.end();
349+
} else {
350+
callback(err);
351+
}
352+
return;
353+
}
341354

342-
callback(null, entity.formatArray(resp.batch.entity_result), cursor);
343-
});
355+
var entities = entity.formatArray(resp.batch.entity_result);
356+
357+
var cursor = '';
358+
if (resp.batch.end_cursor) {
359+
cursor = resp.batch.end_cursor.toBase64();
360+
}
361+
362+
if (stream) {
363+
if (cursor && entities.length > 0) {
364+
entities.forEach(function (entity) {
365+
stream.push(entity);
366+
});
367+
368+
req.query = entity.queryToQueryProto(q.start(cursor).offset(0));
369+
370+
runQuery();
371+
} else {
372+
stream.end();
373+
}
374+
} else {
375+
callback(null, entities, cursor);
376+
}
377+
});
378+
}
344379
};
345380

346381
/**

regression/datastore.js

+15
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,21 @@ describe('datastore', function() {
248248
});
249249
});
250250

251+
it('should run a query as a stream', function(done) {
252+
var q = ds.createQuery('Character').hasAncestor(ancestor)
253+
.limit(5);
254+
255+
var resultsReturned = 0;
256+
257+
ds.runQuery(q)
258+
.on('error', done)
259+
.on('data', function() { resultsReturned++; })
260+
.on('end', function() {
261+
assert.equal(resultsReturned, characters.length);
262+
done();
263+
});
264+
});
265+
251266
it('should filter queries with simple indexes', function(done) {
252267
var q = ds.createQuery('Character').hasAncestor(ancestor)
253268
.filter('appearances >=', 20);

test/datastore/request.js

+62-14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ var https = require('https');
2727
var mockRespGet = require('../testdata/response_get.json');
2828
var pb = require('../../lib/datastore/pb.js');
2929
var Query = require('../../lib/datastore/query.js');
30+
var Stream = require('stream');
3031
var util = require('../../lib/common/util.js');
3132

3233
var httpsRequestOverride = util.noop;
@@ -278,8 +279,7 @@ describe('Request', function() {
278279
entity_result: mockRespGet.found,
279280
end_cursor: new ByteBuffer().writeIString('cursor').flip()
280281
}
281-
},
282-
withoutResults: mockRespGet
282+
}
283283
};
284284

285285
beforeEach(function() {
@@ -298,18 +298,6 @@ describe('Request', function() {
298298
assert.equal(err, error);
299299
});
300300
});
301-
302-
it('should handle missing results error', function() {
303-
request.makeReq_ = function(method, req, callback) {
304-
assert.equal(method, 'runQuery');
305-
callback(null, mockResponse.withoutResults);
306-
};
307-
308-
request.runQuery(query, function(err, entities) {
309-
assert.strictEqual(err, null);
310-
assert.strictEqual(entities, undefined);
311-
});
312-
});
313301
});
314302

315303
it('should execute callback with results', function() {
@@ -354,6 +342,66 @@ describe('Request', function() {
354342
done();
355343
});
356344
});
345+
346+
describe('streams', function() {
347+
it('should be a stream if a callback is omitted', function() {
348+
assert(request.runQuery(query) instanceof Stream);
349+
});
350+
351+
it('should run the query after being read from', function(done) {
352+
request.makeReq_ = function() {
353+
done();
354+
};
355+
356+
request.runQuery(query).emit('reading');
357+
});
358+
359+
it('should continuosly run until there are no results', function(done) {
360+
var run = 0;
361+
var timesToRun = 2;
362+
363+
request.makeReq_ = function(method, req, callback) {
364+
run++;
365+
366+
if (run < timesToRun) {
367+
callback(null, mockResponse.withResultsAndEndCursor);
368+
} else {
369+
var lastEndCursor =
370+
mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64();
371+
lastEndCursor = new Buffer(lastEndCursor, 'base64').toString();
372+
373+
assert.equal(String(req.query.start_cursor), lastEndCursor);
374+
assert.strictEqual(req.query.offset, undefined);
375+
376+
callback(null, mockResponse.withResults);
377+
}
378+
};
379+
380+
var resultsReturned = 0;
381+
382+
request.runQuery(query)
383+
.on('data', function() { resultsReturned++; })
384+
.on('end', function() {
385+
assert.equal(resultsReturned, mockRespGet.found.length);
386+
done();
387+
});
388+
});
389+
390+
it('should emit an error', function(done) {
391+
var error = new Error('Error.');
392+
393+
request.makeReq_ = function(method, req, callback) {
394+
callback(error);
395+
};
396+
397+
request.runQuery(query)
398+
.on('error', function(err) {
399+
assert.equal(err, error);
400+
done();
401+
})
402+
.emit('reading');
403+
});
404+
});
357405
});
358406

359407
describe('allocateIds', function() {

0 commit comments

Comments
 (0)