Skip to content

Commit 0287e10

Browse files
authored
fix(edge): run after() if request is cancelled mid-streaming (#76013)
### What - Fixes an issue where `after()` in an edge page would not run if the request was cancelled/aborted - Unskips the `runs callbacks if redirect() was called` test which was disabled to not block other things ### Background This was initially hit in #75882, during which the `runs callbacks if redirect() was called` test started failing when using experimental react + turbo in dev mode. Turns out that this happenws because something got slower and we weren't finishing the redirect response in time, i.e. before the browser disconnected and started loading the page it got redirected to. It's relevant that the response didn't finish streaming, because in `edge`, we use that as the trigger to start running `after()` callbacks. In particular, we instrument the response stream using `trackStreamConsumed`. The problem was that this function didn't handle the stream being cancelled, which is what happens when a request is aborted mid-streaming, so `after()` never ended up executing. This PR fixes that and adds some tests for cancellation and interrupted streaming.
1 parent fef9c5c commit 0287e10

File tree

9 files changed

+244
-20
lines changed

9 files changed

+244
-20
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { DetachedPromise } from '../../lib/detached-promise'
2+
import { trackStreamConsumed } from './web-on-close'
3+
4+
describe('trackStreamConsumed', () => {
5+
it('calls onEnd when the stream finishes', async () => {
6+
const endPromise = new DetachedPromise<void>()
7+
const onEnd = jest.fn(endPromise.resolve)
8+
9+
const { stream: inputStream, controller } =
10+
readableStreamWithController<string>()
11+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
12+
13+
const reader = trackedStream.getReader()
14+
controller.enqueue('one')
15+
controller.enqueue('two')
16+
await reader.read()
17+
await reader.read()
18+
expect(onEnd).not.toHaveBeenCalled()
19+
20+
controller.close()
21+
22+
await expect(reader.read()).resolves.toEqual({
23+
done: true,
24+
value: undefined,
25+
})
26+
27+
await endPromise.promise
28+
expect(onEnd).toHaveBeenCalledTimes(1)
29+
})
30+
31+
it('calls onEnd when the stream errors', async () => {
32+
const endPromise = new DetachedPromise<void>()
33+
const onEnd = jest.fn(endPromise.resolve)
34+
35+
const { stream: inputStream, controller } =
36+
readableStreamWithController<string>()
37+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
38+
39+
const reader = trackedStream.getReader()
40+
controller.enqueue('one')
41+
controller.enqueue('two')
42+
await reader.read()
43+
await reader.read()
44+
expect(onEnd).not.toHaveBeenCalled()
45+
46+
const error = new Error('kaboom')
47+
controller.error(error)
48+
49+
// if the underlying stream errors, we should error as well
50+
await expect(reader.read()).rejects.toThrow(error)
51+
52+
await endPromise.promise
53+
expect(onEnd).toHaveBeenCalledTimes(1)
54+
})
55+
56+
it('calls onEnd when the stream is cancelled', async () => {
57+
const endPromise = new DetachedPromise<void>()
58+
const onEnd = jest.fn(endPromise.resolve)
59+
60+
const cancelledPromise = new DetachedPromise<unknown>()
61+
const onCancel = jest.fn(cancelledPromise.resolve)
62+
63+
const { stream: inputStream, controller } =
64+
readableStreamWithController<string>(onCancel)
65+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
66+
67+
const reader = trackedStream.getReader()
68+
controller.enqueue('one')
69+
controller.enqueue('two')
70+
await reader.read()
71+
await reader.read()
72+
expect(onEnd).not.toHaveBeenCalled()
73+
74+
const cancellationReason = new Error('cancelled')
75+
await reader.cancel(cancellationReason)
76+
77+
// from a reader's perspective, a cancelled stream behaves like it's done
78+
// (which is a bit weird honestly?)
79+
await expect(reader.read()).resolves.toEqual({
80+
done: true,
81+
value: undefined,
82+
})
83+
84+
await endPromise.promise
85+
expect(onEnd).toHaveBeenCalledTimes(1)
86+
87+
// the cancellation should propagate to back to the underlying stream
88+
await cancelledPromise.promise
89+
expect(onCancel).toHaveBeenCalledWith(cancellationReason)
90+
})
91+
})
92+
93+
function readableStreamWithController<TChunk>(
94+
onCancel?: (reason: unknown) => void
95+
) {
96+
let controller: ReadableStreamDefaultController<TChunk> = undefined!
97+
const stream = new ReadableStream<TChunk>({
98+
start(_controller) {
99+
controller = _controller
100+
},
101+
cancel(reason) {
102+
onCancel?.(reason)
103+
},
104+
})
105+
return { controller, stream }
106+
}

packages/next/src/server/web/web-on-close.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ export function trackStreamConsumed<TChunk>(
2222
stream: ReadableStream<TChunk>,
2323
onEnd: () => void
2424
): ReadableStream<TChunk> {
25-
const closePassThrough = new TransformStream<TChunk, TChunk>({
26-
flush: () => {
27-
return onEnd()
28-
},
29-
})
30-
return stream.pipeThrough(closePassThrough)
25+
// NOTE: This function must handle `stream` being aborted or cancelled,
26+
// so it can't just be this:
27+
//
28+
// return stream.pipeThrough(new TransformStream({ flush() { onEnd() } }))
29+
//
30+
// because that doesn't handle cancellations.
31+
// (and cancellation handling via `Transformer.cancel` is only available in node >20)
32+
const dest = new TransformStream()
33+
const runOnEnd = () => onEnd()
34+
stream.pipeTo(dest.writable).then(runOnEnd, runOnEnd)
35+
return dest.readable
3136
}
3237

3338
export class CloseController {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/end/page'
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/hang/page'
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/start/page'
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export default function Page() {
2+
return (
3+
<main>
4+
<h1>End</h1>
5+
</main>
6+
)
7+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { connection, after } from 'next/server'
2+
import { Suspense } from 'react'
3+
import { cliLog } from '../../../../../utils/log'
4+
5+
export default async function Page() {
6+
await connection()
7+
after(() => {
8+
cliLog({
9+
source: '[page] /interrupted/incomplete-stream/hang',
10+
})
11+
})
12+
return (
13+
<main>
14+
<h1>Hanging forever</h1>
15+
<Suspense
16+
fallback={
17+
<div id="loading-fallback">
18+
{
19+
// we're going to look for this string in the streamed response,
20+
// make sure it doesn't show up in the literal form someplace else
21+
'Loading' + (Math.random() > 1 ? 'impossible' : '') + '...'
22+
}
23+
</div>
24+
}
25+
>
26+
<HangForever />
27+
</Suspense>
28+
</main>
29+
)
30+
}
31+
32+
async function HangForever() {
33+
await new Promise((_resolve) => {})
34+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use client'
2+
3+
import Link from 'next/link'
4+
import { usePathname } from 'next/navigation'
5+
6+
export default function Page() {
7+
const pathname = usePathname()
8+
return (
9+
<main>
10+
<h1>Start</h1>
11+
<Link href={pathname.replace('/start', '/hang')}>hang</Link>
12+
</main>
13+
)
14+
}

test/e2e/app-dir/next-after-app/index.test.ts

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import * as Log from './utils/log'
88
const runtimes = ['nodejs', 'edge']
99

1010
describe.each(runtimes)('after() in %s runtime', (runtimeValue) => {
11-
const { next, isNextDeploy, skipped, isTurbopack } = nextTestSetup({
11+
const { next, isNextDeploy, skipped } = nextTestSetup({
1212
files: __dirname,
1313
// `patchFile` and reading runtime logs are not supported in a deployed environment
1414
skipDeployment: true,
@@ -100,21 +100,18 @@ describe.each(runtimes)('after() in %s runtime', (runtimeValue) => {
100100
// This is currently broken with Turbopack.
101101
// https://github.com/vercel/next.js/pull/75989
102102

103-
;(isTurbopack ? it.skip : it)(
104-
'runs callbacks if redirect() was called',
105-
async () => {
106-
await next.browser(pathPrefix + '/interrupted/calls-redirect')
103+
it('runs callbacks if redirect() was called', async () => {
104+
await next.browser(pathPrefix + '/interrupted/calls-redirect')
107105

108-
await retry(() => {
109-
expect(getLogs()).toContainEqual({
110-
source: '[page] /interrupted/calls-redirect',
111-
})
112-
expect(getLogs()).toContainEqual({
113-
source: '[page] /interrupted/redirect-target',
114-
})
106+
await retry(() => {
107+
expect(getLogs()).toContainEqual({
108+
source: '[page] /interrupted/calls-redirect',
115109
})
116-
}
117-
)
110+
expect(getLogs()).toContainEqual({
111+
source: '[page] /interrupted/redirect-target',
112+
})
113+
})
114+
})
118115

119116
it('runs callbacks if notFound() was called', async () => {
120117
await next.browser(pathPrefix + '/interrupted/calls-not-found')
@@ -129,6 +126,64 @@ describe.each(runtimes)('after() in %s runtime', (runtimeValue) => {
129126
source: '[page] /interrupted/throws-error',
130127
})
131128
})
129+
130+
it('runs callbacks if a request is aborted before the page finishes streaming', async () => {
131+
const abortController = new AbortController()
132+
const res = await next.fetch(
133+
pathPrefix + '/interrupted/incomplete-stream/hang',
134+
{ signal: abortController.signal }
135+
)
136+
expect(res.status).toBe(200)
137+
138+
const textDecoder = new TextDecoder()
139+
for await (const rawChunk of res.body) {
140+
const chunk =
141+
typeof rawChunk === 'string' ? rawChunk : textDecoder.decode(rawChunk)
142+
// we found the loading fallback for the part that hangs forever, so we know we won't progress any further
143+
if (chunk.includes('Loading...')) {
144+
break
145+
}
146+
}
147+
abortController.abort()
148+
149+
await retry(() => {
150+
expect(getLogs()).toContainEqual({
151+
source: '[page] /interrupted/incomplete-stream/hang',
152+
})
153+
})
154+
})
155+
156+
it('runs callbacks if the browser disconnects before the page finishes streaming', async () => {
157+
// `next.browser()` always waits for the `load` event, which we don't want here.
158+
// (because the page hangs forever while streaming and will thus never fire `load`)
159+
// but we can't easily bypass that, so go to a dummy page first
160+
const browser = await next.browser(
161+
pathPrefix + '/interrupted/incomplete-stream/start'
162+
)
163+
expect(await browser.elementByCss('h1').text()).toEqual('Start')
164+
165+
// navigate to a page that hangs forever while streaming...
166+
// NOTE: this needs to be a soft navigation (using Link), playwright seems to hang otherwise
167+
await browser.elementByCss('a').click()
168+
await retry(async () => {
169+
expect(await browser.hasElementByCssSelector('#loading-fallback')).toBe(
170+
true
171+
)
172+
})
173+
174+
// ...but navigate away before streaming is finished (it hangs forever, so it will never finish)
175+
await browser.get(
176+
new URL(pathPrefix + '/interrupted/incomplete-stream/end', next.url)
177+
.href
178+
)
179+
expect(await browser.elementByCss('h1').text()).toEqual('End')
180+
181+
await retry(async () => {
182+
expect(getLogs()).toContainEqual({
183+
source: '[page] /interrupted/incomplete-stream/hang',
184+
})
185+
})
186+
})
132187
})
133188

134189
it('runs in middleware', async () => {

0 commit comments

Comments
 (0)