Skip to content

Commit 38681df

Browse files
authored
perf(ext/http): optimize for zero or one-packet response streams (denoland#18834)
Improve `deno_reactdom_ssr_flash.jsx` by optimizing for zero/one-packet response streams.
1 parent 1b45001 commit 38681df

File tree

2 files changed

+141
-95
lines changed

2 files changed

+141
-95
lines changed

cli/tests/unit/serve_test.ts

Lines changed: 60 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -532,21 +532,43 @@ Deno.test(
532532
},
533533
);
534534

535-
Deno.test(
536-
{ permissions: { net: true } },
537-
async function httpServerStreamResponse() {
538-
const stream = new TransformStream();
539-
const writer = stream.writable.getWriter();
540-
writer.write(new TextEncoder().encode("hello "));
541-
writer.write(new TextEncoder().encode("world"));
542-
writer.close();
535+
function createStreamTest(count: number, delay: number, action: string) {
536+
function doAction(controller: ReadableStreamDefaultController, i: number) {
537+
if (i == count) {
538+
if (action == "Throw") {
539+
controller.error(new Error("Expected error!"));
540+
} else {
541+
controller.close();
542+
}
543+
} else {
544+
controller.enqueue(`a${i}`);
543545

544-
const listeningPromise = deferred();
546+
if (delay == 0) {
547+
doAction(controller, i + 1);
548+
} else {
549+
setTimeout(() => doAction(controller, i + 1), delay);
550+
}
551+
}
552+
}
553+
554+
function makeStream(count: number, delay: number): ReadableStream {
555+
return new ReadableStream({
556+
start(controller) {
557+
if (delay == 0) {
558+
doAction(controller, 0);
559+
} else {
560+
setTimeout(() => doAction(controller, 0), delay);
561+
}
562+
},
563+
}).pipeThrough(new TextEncoderStream());
564+
}
565+
566+
Deno.test(`httpServerStreamCount${count}Delay${delay}${action}`, async () => {
545567
const ac = new AbortController();
568+
const listeningPromise = deferred();
546569
const server = Deno.serve({
547-
handler: (request) => {
548-
assert(!request.body);
549-
return new Response(stream.readable);
570+
handler: async (request) => {
571+
return new Response(makeStream(count, delay));
550572
},
551573
port: 4501,
552574
signal: ac.signal,
@@ -556,12 +578,34 @@ Deno.test(
556578

557579
await listeningPromise;
558580
const resp = await fetch("http://127.0.0.1:4501/");
559-
const respBody = await resp.text();
560-
assertEquals("hello world", respBody);
581+
const text = await resp.text();
582+
561583
ac.abort();
562584
await server;
563-
},
564-
);
585+
let expected = "";
586+
if (action == "Throw" && count < 2 && delay < 1000) {
587+
// NOTE: This is specific to the current implementation. In some cases where a stream errors, we
588+
// don't send the first packet.
589+
expected = "";
590+
} else {
591+
for (let i = 0; i < count; i++) {
592+
expected += `a${i}`;
593+
}
594+
}
595+
596+
assertEquals(text, expected);
597+
});
598+
}
599+
600+
for (let count of [0, 1, 2, 3]) {
601+
for (let delay of [0, 1, 1000]) {
602+
// Creating a stream that errors in start will throw
603+
if (delay > 0) {
604+
createStreamTest(count, delay, "Throw");
605+
}
606+
createStreamTest(count, delay, "Close");
607+
}
608+
}
565609

566610
Deno.test(
567611
{ permissions: { net: true } },
@@ -1690,78 +1734,6 @@ createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
16901734
expects_con_len: false,
16911735
});
16921736

1693-
Deno.test(
1694-
{ permissions: { net: true } },
1695-
async function httpServerGetChunkedResponseWithKa() {
1696-
const promises = [deferred(), deferred()];
1697-
let reqCount = 0;
1698-
const listeningPromise = deferred();
1699-
const ac = new AbortController();
1700-
1701-
const server = Deno.serve({
1702-
handler: async (request) => {
1703-
assertEquals(request.method, "GET");
1704-
promises[reqCount].resolve();
1705-
reqCount++;
1706-
return new Response(reqCount <= 1 ? stream("foo bar baz") : "zar quux");
1707-
},
1708-
port: 4503,
1709-
signal: ac.signal,
1710-
onListen: onListen(listeningPromise),
1711-
onError: createOnErrorCb(ac),
1712-
});
1713-
1714-
await listeningPromise;
1715-
const conn = await Deno.connect({ port: 4503 });
1716-
const encoder = new TextEncoder();
1717-
{
1718-
const body =
1719-
`GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: keep-alive\r\n\r\n`;
1720-
const writeResult = await conn.write(encoder.encode(body));
1721-
assertEquals(body.length, writeResult);
1722-
await promises[0];
1723-
}
1724-
1725-
const decoder = new TextDecoder();
1726-
{
1727-
let msg = "";
1728-
while (true) {
1729-
try {
1730-
const buf = new Uint8Array(1024);
1731-
const readResult = await conn.read(buf);
1732-
assert(readResult);
1733-
msg += decoder.decode(buf.subarray(0, readResult));
1734-
assert(msg.endsWith("\r\nfoo bar baz\r\n0\r\n\r\n"));
1735-
break;
1736-
} catch {
1737-
continue;
1738-
}
1739-
}
1740-
}
1741-
1742-
// once more!
1743-
{
1744-
const body =
1745-
`GET /quux HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`;
1746-
const writeResult = await conn.write(encoder.encode(body));
1747-
assertEquals(body.length, writeResult);
1748-
await promises[1];
1749-
}
1750-
{
1751-
const buf = new Uint8Array(1024);
1752-
const readResult = await conn.read(buf);
1753-
assert(readResult);
1754-
const msg = decoder.decode(buf.subarray(0, readResult));
1755-
assert(msg.endsWith("zar quux"));
1756-
}
1757-
1758-
conn.close();
1759-
1760-
ac.abort();
1761-
await server;
1762-
},
1763-
);
1764-
17651737
Deno.test(
17661738
{ permissions: { net: true } },
17671739
async function httpServerPostWithContentLengthBody() {

ext/http/00_serve.js

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
import {
2929
Deferred,
3030
getReadableStreamResourceBacking,
31+
readableStreamClose,
3132
readableStreamForRid,
3233
ReadableStreamPrototype,
3334
} from "ext:deno_web/06_streams.js";
@@ -331,24 +332,97 @@ function fastSyncResponseOrStream(req, respBody) {
331332
}
332333

333334
async function asyncResponse(responseBodies, req, status, stream) {
334-
const responseRid = core.ops.op_set_response_body_stream(req);
335-
SetPrototypeAdd(responseBodies, responseRid);
336335
const reader = stream.getReader();
337-
core.ops.op_set_promise_complete(req, status);
336+
let responseRid;
337+
let closed = false;
338+
let timeout;
339+
338340
try {
341+
// IMPORTANT: We get a performance boost from this optimization, but V8 is very
342+
// sensitive to the order and structure. Benchmark any changes to this code.
343+
344+
// Optimize for streams that are done in zero or one packets. We will not
345+
// have to allocate a resource in this case.
346+
const { value: value1, done: done1 } = await reader.read();
347+
if (done1) {
348+
closed = true;
349+
// Exit 1: no response body at all, extreme fast path
350+
// Reader will be closed by finally block
351+
return;
352+
}
353+
354+
// The second value cannot block indefinitely, as someone may be waiting on a response
355+
// of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
356+
// and we race it.
357+
let timeoutPromise;
358+
timeout = setTimeout(() => {
359+
responseRid = core.ops.op_set_response_body_stream(req);
360+
SetPrototypeAdd(responseBodies, responseRid);
361+
core.ops.op_set_promise_complete(req, status);
362+
timeoutPromise = core.writeAll(responseRid, value1);
363+
}, 250);
364+
const { value: value2, done: done2 } = await reader.read();
365+
366+
if (timeoutPromise) {
367+
await timeoutPromise;
368+
if (done2) {
369+
closed = true;
370+
// Exit 2(a): read 2 is EOS, and timeout resolved.
371+
// Reader will be closed by finally block
372+
// Response stream will be closed by finally block.
373+
return;
374+
}
375+
376+
// Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
377+
} else {
378+
clearTimeout(timeout);
379+
timeout = undefined;
380+
381+
if (done2) {
382+
// Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
383+
// Reader will be closed by finally block
384+
// No response stream
385+
closed = true;
386+
core.ops.op_set_response_body_bytes(req, value1);
387+
return;
388+
}
389+
390+
responseRid = core.ops.op_set_response_body_stream(req);
391+
SetPrototypeAdd(responseBodies, responseRid);
392+
core.ops.op_set_promise_complete(req, status);
393+
// Write our first packet
394+
await core.writeAll(responseRid, value1);
395+
}
396+
397+
await core.writeAll(responseRid, value2);
339398
while (true) {
340399
const { value, done } = await reader.read();
341400
if (done) {
401+
closed = true;
342402
break;
343403
}
344404
await core.writeAll(responseRid, value);
345405
}
346406
} catch (error) {
347-
await reader.cancel(error);
407+
closed = true;
408+
try {
409+
await reader.cancel(error);
410+
} catch {
411+
// Pass
412+
}
348413
} finally {
349-
core.tryClose(responseRid);
350-
SetPrototypeDelete(responseBodies, responseRid);
351-
reader.releaseLock();
414+
if (!closed) {
415+
readableStreamClose(reader);
416+
}
417+
if (timeout !== undefined) {
418+
clearTimeout(timeout);
419+
}
420+
if (responseRid) {
421+
core.tryClose(responseRid);
422+
SetPrototypeDelete(responseBodies, responseRid);
423+
} else {
424+
core.ops.op_set_promise_complete(req, status);
425+
}
352426
}
353427
}
354428

0 commit comments

Comments
 (0)