Skip to content

handling Objects for streaming bodies #1405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion lib/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,34 @@ function respond(ctx) {
// responses
if (Buffer.isBuffer(body)) return res.end(body);
if ('string' == typeof body) return res.end(body);
if (body instanceof Stream) return body.pipe(res);
if (body instanceof Stream) {
// check if it's an objectMode stream
// readableObjectMode is available since Node 12.3
if (body.readableObjectMode ||
(body._readableState &&
body._readableState.objectMode)) {
let first = true;
res.write('[');
body = body
.pipe(new Stream.Transform({
writableObjectMode: true,
readableObjectMode: false,
transform(data, encoding, callback) {
if (!first) this.push(',');
else first = false;

this.push(JSON.stringify(data));
return callback();
},
flush(callback) {
this.push(']');
this.push(null);
return callback();
}
}));
}
return body.pipe(res);
}

// body: json
body = JSON.stringify(body);
Expand Down
11 changes: 9 additions & 2 deletions lib/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,21 @@ module.exports = {
}

// stream
if ('function' == typeof val.pipe) {
if (val instanceof Stream) {
onFinish(this.res, destroy.bind(null, val));
ensureErrorHandler(val, err => this.ctx.onerror(err));

// overwriting
if (null != original && original != val) this.remove('Content-Length');

if (setType) this.type = 'bin';
if (setType) {
// check if it's an objectMode stream
// readableObjectMode is available since Node 12.3
this.type = (val.readableObjectMode ||
(val._readableState &&
val._readableState.objectMode))
? 'json' : 'bin';
}
return;
}

Expand Down
41 changes: 41 additions & 0 deletions test/application/respond.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const statuses = require('statuses');
const assert = require('assert');
const Koa = require('../..');
const fs = require('fs');
const { PassThrough } = require('stream');

describe('app.respond', () => {
describe('when ctx.respond === false', () => {
Expand Down Expand Up @@ -618,6 +619,46 @@ describe('app.respond', () => {
assert.deepEqual(res.body, pkg);
});

it('should serve object mode streams as JSON', async() => {
const app = new Koa();

app.use(ctx => {
const stream = new PassThrough({ objectMode: true, autoDestroy: false });
ctx.body = stream;
setImmediate(() => {
stream.write({ foo: 1 });
stream.write({ boo: [true] });
stream.end({ finish: true });
});
});

const server = app.listen();

const res = await request(server)
.get('/')
.expect('Content-Type', 'application/json; charset=utf-8')
.expect('[{"foo":1},{"boo":[true]},{"finish":true}]');
assert.strictEqual(res.headers.hasOwnProperty('content-length'), false);
});

it('empty object mode stream should result in empty array', async() => {
const app = new Koa();

app.use(ctx => {
const stream = new PassThrough({ objectMode: true, autoDestroy: false });
ctx.body = stream;
setImmediate(() => stream.end());
});

const server = app.listen();

const res = await request(server)
.get('/')
.expect('Content-Type', 'application/json; charset=utf-8')
.expect('[]');
assert.strictEqual(res.headers.hasOwnProperty('content-length'), false);
});

it('should handle errors', done => {
const app = new Koa();

Expand Down
8 changes: 8 additions & 0 deletions test/response/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const response = require('../helpers/context').response;
const assert = require('assert');
const fs = require('fs');
const { PassThrough } = require('stream');

describe('res.body=', () => {
describe('when Content-Type is set', () => {
Expand Down Expand Up @@ -108,6 +109,13 @@ describe('res.body=', () => {
res.body = fs.createReadStream('LICENSE');
assert.equal('application/octet-stream', res.header['content-type']);
});

it('object mode stream', () => {
const res = response();
const stream = new PassThrough({ objectMode: true });
res.body = stream;
assert.equal('application/json; charset=utf-8', res.header['content-type']);
});
});

describe('when a buffer is given', () => {
Expand Down