Skip to content

Commit 31d2b72

Browse files
authored
Reimplement stream cancellation (#52281)
### What? This reimplements our stream cancellation code for a few more cases: 1. Adds support in all stream-returning APIs 2. Fixes cancellation detection in node 16 3. Implements out-of-band detection, so can cancel in the middle of a read It also (finally) adds tests for all the cases I'm aware of. ### Why? To allow disconnecting from an AI service when a client disconnects. $$$ ### How? 1. Reuses a single pipe function in all paths to push data from the dev's `ReadableStream` into our `ServerResponse` 2. Uses `ServerResponse` to detect disconnect, instead of the `IncomingMessage` (request) - The `close` event fire once all incoming body data is read - The request `abort` event will not fire after the incoming body data has been fully read 3. Using `on('close')` on the writable destination allows us to detect close - Checking for `res.destroyed` in the body of the loop meant we had to wait for the `await stream.read()` to complete before we could possibly cancel the stream - - - #52157 (and #51594) had an issue with Node 16, because I was using `res.closed` to detect when the server response was closed by the client disconnecting. But, `closed` wasn't [added](nodejs/node#45672) until [v18.13.0](https://nodejs.org/en/blog/release/v18.13.0#:~:text=%5Bcbd710bbf4%5D%20%2D%20http%3A%20make%20OutgoingMessage%20more%20streamlike%20(Robert%20Nagy)%20%2345672). This fixes it by using `res.destroyed`. Reverts #52277 Relands #52157 Fixes #52809 ---------
1 parent 39fd917 commit 31d2b72

File tree

29 files changed

+667
-176
lines changed

29 files changed

+667
-176
lines changed

packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,27 @@ async function render(request: NextRequest, event: NextFetchEvent) {
8080
response.headers.append('Vary', RSC_VARY_HEADER)
8181

8282
const writer = tranform.writable.getWriter()
83-
result.pipe({
83+
84+
let innerClose: undefined | (() => void)
85+
const target = {
8486
write: (chunk: Uint8Array) => writer.write(chunk),
8587
end: () => writer.close(),
86-
destroy: (reason?: Error) => writer.abort(reason),
87-
})
88+
89+
on(_event: 'close', cb: () => void) {
90+
innerClose = cb
91+
},
92+
off(_event: 'close', _cb: () => void) {
93+
innerClose = undefined
94+
},
95+
}
96+
const onClose = () => {
97+
innerClose?.()
98+
}
99+
// No, this cannot be replaced with `finally`, because early cancelling
100+
// the stream will create a rejected promise, and finally will create an
101+
// unhandled rejection.
102+
writer.closed.then(onClose, onClose)
103+
result.pipe(target)
88104

89105
return response
90106
}

packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import {
1111
NodeNextResponse,
1212
} from 'next/dist/server/base-http/node'
1313
import { sendResponse } from 'next/dist/server/send-response'
14-
import { NextRequestAdapter } from 'next/dist/server/web/spec-extension/adapters/next-request'
14+
import {
15+
NextRequestAdapter,
16+
signalFromNodeResponse,
17+
} from 'next/dist/server/web/spec-extension/adapters/next-request'
1518
import { RouteHandlerManagerContext } from 'next/dist/server/future/route-handler-managers/route-handler-manager'
1619

1720
import { attachRequestMeta } from './next-request-helpers'
@@ -43,7 +46,10 @@ export default (routeModule: RouteModule) => {
4346
}
4447

4548
const routeResponse = await routeModule.handle(
46-
NextRequestAdapter.fromNodeNextRequest(req),
49+
NextRequestAdapter.fromNodeNextRequest(
50+
req,
51+
signalFromNodeResponse(response)
52+
),
4753
context
4854
)
4955

packages/next/src/export/worker.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ import { NodeNextRequest } from '../server/base-http/node'
4545
import { isAppRouteRoute } from '../lib/is-app-route-route'
4646
import { toNodeOutgoingHttpHeaders } from '../server/web/utils'
4747
import { RouteModuleLoader } from '../server/future/helpers/module-loader/route-module-loader'
48-
import { NextRequestAdapter } from '../server/web/spec-extension/adapters/next-request'
48+
import {
49+
NextRequestAdapter,
50+
signalFromNodeResponse,
51+
} from '../server/web/spec-extension/adapters/next-request'
4952
import * as ciEnvironment from '../telemetry/ci-info'
5053

5154
const envConfig = require('../shared/lib/runtime-config')
@@ -388,7 +391,8 @@ export default async function exportPage({
388391
// Ensure that the url for the page is absolute.
389392
req.url = `http://localhost:3000${req.url}`
390393
const request = NextRequestAdapter.fromNodeNextRequest(
391-
new NodeNextRequest(req)
394+
new NodeNextRequest(req),
395+
signalFromNodeResponse(res)
392396
)
393397

394398
// Create the context for the handler. This contains the params from

packages/next/src/server/base-server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ import {
116116
type RouteMatch,
117117
} from './future/route-matches/route-match'
118118
import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path'
119+
import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request'
119120

120121
export type FindComponentsResult = {
121122
components: LoadComponentsReturnType
@@ -1837,7 +1838,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {
18371838

18381839
try {
18391840
// Handle the match and collect the response if it's a static response.
1840-
const response = await this.handlers.handle(match, req, context)
1841+
const response = await this.handlers.handle(
1842+
match,
1843+
req,
1844+
context,
1845+
signalFromNodeResponse((res as NodeNextResponse).originalResponse)
1846+
)
18411847

18421848
;(req as any).fetchMetrics = (
18431849
context.staticGenerationContext as any

packages/next/src/server/future/route-handler-managers/route-handler-manager.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ export class RouteHandlerManager {
2424
public async handle(
2525
match: AppRouteRouteMatch,
2626
req: BaseNextRequest,
27-
context: RouteHandlerManagerContext
27+
context: RouteHandlerManagerContext,
28+
signal: AbortSignal
2829
): Promise<Response> {
2930
// The module supports minimal mode, load the minimal module.
3031
const module = await RouteModuleLoader.load<RouteModule>(
@@ -33,7 +34,7 @@ export class RouteHandlerManager {
3334
)
3435

3536
// Convert the BaseNextRequest to a NextRequest.
36-
const request = NextRequestAdapter.fromBaseNextRequest(req)
37+
const request = NextRequestAdapter.fromBaseNextRequest(req, signal)
3738

3839
// Get the response from the handler and send it back.
3940
return await module.handle(request, context)

packages/next/src/server/lib/route-resolver.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import { proxyRequest } from './router-utils/proxy-request'
1717
import { getResolveRoutes } from './router-utils/resolve-routes'
1818
import { PERMANENT_REDIRECT_STATUS } from '../../shared/lib/constants'
1919
import { splitCookiesString, toNodeOutgoingHttpHeaders } from '../web/utils'
20-
import { signalFromNodeRequest } from '../web/spec-extension/adapters/next-request'
20+
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
2121
import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher'
22-
import { pipeReadable } from './server-ipc/invoke-request'
22+
import { pipeReadable } from '../pipe-readable'
2323

2424
type RouteResult =
2525
| {
@@ -132,7 +132,7 @@ export async function makeResolver(
132132
serverAddr.port || 3000
133133
}${req.url}`,
134134
body: cloneableBody,
135-
signal: signalFromNodeRequest(req),
135+
signal: signalFromNodeResponse(res),
136136
},
137137
useCache: true,
138138
onWarning: console.warn,
@@ -160,11 +160,11 @@ export async function makeResolver(
160160
}
161161
res.statusCode = result.response.status
162162

163-
for await (const chunk of result.response.body || ([] as any)) {
164-
if (res.closed) break
165-
res.write(chunk)
163+
if (result.response.body) {
164+
await pipeReadable(result.response.body, res)
165+
} else {
166+
res.end()
166167
}
167-
res.end()
168168
} catch (err) {
169169
console.error(err)
170170
res.statusCode = 500
@@ -218,7 +218,12 @@ export async function makeResolver(
218218
req: IncomingMessage,
219219
res: ServerResponse
220220
): Promise<RouteResult | void> {
221-
const routeResult = await resolveRoutes(req, new Set(), false)
221+
const routeResult = await resolveRoutes(
222+
req,
223+
new Set(),
224+
false,
225+
signalFromNodeResponse(res)
226+
)
222227
const {
223228
matchedOutput,
224229
bodyStream,

packages/next/src/server/lib/router-server.ts

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import { filterReqHeaders } from './server-ipc/utils'
1515
import { findPagesDir } from '../../lib/find-pages-dir'
1616
import { setupFsCheck } from './router-utils/filesystem'
1717
import { proxyRequest } from './router-utils/proxy-request'
18-
import { invokeRequest, pipeReadable } from './server-ipc/invoke-request'
18+
import { invokeRequest } from './server-ipc/invoke-request'
19+
import { isAbortError, pipeReadable } from '../pipe-readable'
1920
import { createRequestResponseMocks } from './mock-request'
2021
import { createIpcServer, createWorker } from './server-ipc'
2122
import { UnwrapPromise } from '../../lib/coalesced-function'
@@ -29,6 +30,7 @@ import {
2930
PHASE_DEVELOPMENT_SERVER,
3031
PERMANENT_REDIRECT_STATUS,
3132
} from '../../shared/lib/constants'
33+
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
3234

3335
let initializeResult:
3436
| undefined
@@ -331,14 +333,26 @@ export async function initialize(opts: {
331333

332334
debug('invokeRender', renderUrl, invokeHeaders)
333335

334-
const invokeRes = await invokeRequest(
335-
renderUrl,
336-
{
337-
headers: invokeHeaders,
338-
method: req.method,
339-
},
340-
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
341-
)
336+
let invokeRes
337+
try {
338+
invokeRes = await invokeRequest(
339+
renderUrl,
340+
{
341+
headers: invokeHeaders,
342+
method: req.method,
343+
signal: signalFromNodeResponse(res),
344+
},
345+
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
346+
)
347+
} catch (e) {
348+
// If the client aborts before we can receive a response object (when
349+
// the headers are flushed), then we can early exit without further
350+
// processing.
351+
if (isAbortError(e)) {
352+
return
353+
}
354+
throw e
355+
}
342356

343357
debug('invokeRender res', invokeRes.status, invokeRes.headers)
344358

@@ -419,7 +433,12 @@ export async function initialize(opts: {
419433
resHeaders,
420434
bodyStream,
421435
matchedOutput,
422-
} = await resolveRoutes(req, matchedDynamicRoutes, false)
436+
} = await resolveRoutes(
437+
req,
438+
matchedDynamicRoutes,
439+
false,
440+
signalFromNodeResponse(res)
441+
)
423442

424443
if (devInstance && matchedOutput?.type === 'devVirtualFsItem') {
425444
const origUrl = req.url || '/'
@@ -687,7 +706,8 @@ export async function initialize(opts: {
687706
const { matchedOutput, parsedUrl } = await resolveRoutes(
688707
req,
689708
new Set(),
690-
true
709+
true,
710+
signalFromNodeResponse(socket)
691711
)
692712

693713
// TODO: allow upgrade requests to pages/app paths?

packages/next/src/server/lib/router-utils/proxy-request.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ export async function proxyRequest(
3434
await new Promise((proxyResolve, proxyReject) => {
3535
let finished = false
3636

37+
// http-proxy does not properly detect a client disconnect in newer
38+
// versions of Node.js. This is caused because it only listens for the
39+
// `aborted` event on the our request object, but it also fully reads
40+
// and closes the request object. Node **will not** fire `aborted` when
41+
// the request is already closed. Listening for `close` on our response
42+
// object will detect the disconnect, and we can abort the proxy's
43+
// connection.
44+
proxy.on('proxyReq', (proxyReq) => {
45+
res.on('close', () => proxyReq.destroy())
46+
})
47+
proxy.on('proxyRes', (proxyRes) => {
48+
if (res.destroyed) {
49+
proxyRes.destroy()
50+
} else {
51+
res.on('close', () => proxyRes.destroy())
52+
}
53+
})
54+
3755
proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
3856
const cleanup = (err: any) => {
3957
// cleanup event listeners to allow clean garbage collection
@@ -59,7 +77,7 @@ export async function proxyRequest(
5977
finished = true
6078
proxyReject(err)
6179

62-
if (!res.closed) {
80+
if (!res.destroyed) {
6381
res.statusCode = 500
6482
res.end('Internal Server Error')
6583
}

packages/next/src/server/lib/router-utils/resolve-routes.ts

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { Header } from '../../../lib/load-custom-routes'
1313
import { stringifyQuery } from '../../server-route-utils'
1414
import { toNodeOutgoingHttpHeaders } from '../../web/utils'
1515
import { invokeRequest } from '../server-ipc/invoke-request'
16+
import { isAbortError } from '../../pipe-readable'
1617
import { getCookieParser, setLazyProp } from '../../api-utils'
1718
import { getHostname } from '../../../shared/lib/get-hostname'
1819
import { UnwrapPromise } from '../../../lib/coalesced-function'
@@ -93,7 +94,8 @@ export function getResolveRoutes(
9394
async function resolveRoutes(
9495
req: IncomingMessage,
9596
matchedDynamicRoutes: Set<string>,
96-
isUpgradeReq?: boolean
97+
isUpgradeReq: boolean,
98+
signal: AbortSignal
9799
): Promise<{
98100
finished: boolean
99101
statusCode?: number
@@ -453,14 +455,31 @@ export function getResolveRoutes(
453455

454456
debug('invoking middleware', renderUrl, invokeHeaders)
455457

456-
const middlewareRes = await invokeRequest(
457-
renderUrl,
458-
{
459-
headers: invokeHeaders,
460-
method: req.method,
461-
},
462-
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
463-
)
458+
let middlewareRes
459+
try {
460+
middlewareRes = await invokeRequest(
461+
renderUrl,
462+
{
463+
headers: invokeHeaders,
464+
method: req.method,
465+
signal,
466+
},
467+
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
468+
)
469+
} catch (e) {
470+
// If the client aborts before we can receive a response object
471+
// (when the headers are flushed), then we can early exit without
472+
// further processing.
473+
if (isAbortError(e)) {
474+
return {
475+
parsedUrl,
476+
resHeaders,
477+
finished: true,
478+
}
479+
}
480+
throw e
481+
}
482+
464483
const middlewareHeaders = toNodeOutgoingHttpHeaders(
465484
middlewareRes.headers
466485
) as Record<string, string | string[] | undefined>
Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import '../../node-polyfill-fetch'
22

33
import type { IncomingMessage } from 'http'
4-
import type { Writable, Readable } from 'stream'
4+
import type { Readable } from 'stream'
55
import { filterReqHeaders } from './utils'
66

77
export const invokeRequest = async (
88
targetUrl: string,
99
requestInit: {
1010
headers: IncomingMessage['headers']
1111
method: IncomingMessage['method']
12+
signal?: AbortSignal
1213
},
1314
readableBody?: Readable | ReadableStream
1415
) => {
@@ -22,10 +23,11 @@ export const invokeRequest = async (
2223
...requestInit.headers,
2324
}) as IncomingMessage['headers']
2425

25-
const invokeRes = await fetch(parsedTargetUrl.toString(), {
26+
return await fetch(parsedTargetUrl.toString(), {
2627
headers: invokeHeaders as any as Headers,
2728
method: requestInit.method,
2829
redirect: 'manual',
30+
signal: requestInit.signal,
2931

3032
...(requestInit.method !== 'GET' &&
3133
requestInit.method !== 'HEAD' &&
@@ -41,31 +43,4 @@ export const invokeRequest = async (
4143
internal: true,
4244
},
4345
})
44-
45-
return invokeRes
46-
}
47-
48-
export async function pipeReadable(
49-
readable: ReadableStream,
50-
writable: Writable
51-
) {
52-
const reader = readable.getReader()
53-
54-
async function doRead() {
55-
const item = await reader.read()
56-
57-
if (item?.value) {
58-
writable.write(Buffer.from(item?.value))
59-
60-
if ('flush' in writable) {
61-
;(writable as any).flush()
62-
}
63-
}
64-
65-
if (!item?.done) {
66-
return doRead()
67-
}
68-
}
69-
await doRead()
70-
writable.end()
7146
}

0 commit comments

Comments
 (0)