Skip to content

Commit 69afa37

Browse files
committed
fix(edge): run after() if request is cancelled mid-streaming (#76013)
- Fixes an issue where `after()` in an edge page would not run if the request was cancelled/aborted - Unskips the `runs callbacks if redirect() was called` test which was disabled to not block other things This was initially hit in #75882, during which the `runs callbacks if redirect() was called` test started failing when using experimental react + turbo in dev mode. Turns out that this happenws because something got slower and we weren't finishing the redirect response in time, i.e. before the browser disconnected and started loading the page it got redirected to. It's relevant that the response didn't finish streaming, because in `edge`, we use that as the trigger to start running `after()` callbacks. In particular, we instrument the response stream using `trackStreamConsumed`. The problem was that this function didn't handle the stream being cancelled, which is what happens when a request is aborted mid-streaming, so `after()` never ended up executing. This PR fixes that and adds some tests for cancellation and interrupted streaming.
1 parent d4bcfc3 commit 69afa37

File tree

9 files changed

+233
-6
lines changed

9 files changed

+233
-6
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { DetachedPromise } from '../../lib/detached-promise'
2+
import { trackStreamConsumed } from './web-on-close'
3+
4+
describe('trackStreamConsumed', () => {
5+
it('calls onEnd when the stream finishes', async () => {
6+
const endPromise = new DetachedPromise<void>()
7+
const onEnd = jest.fn(endPromise.resolve)
8+
9+
const { stream: inputStream, controller } =
10+
readableStreamWithController<string>()
11+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
12+
13+
const reader = trackedStream.getReader()
14+
controller.enqueue('one')
15+
controller.enqueue('two')
16+
await reader.read()
17+
await reader.read()
18+
expect(onEnd).not.toHaveBeenCalled()
19+
20+
controller.close()
21+
22+
await expect(reader.read()).resolves.toEqual({
23+
done: true,
24+
value: undefined,
25+
})
26+
27+
await endPromise.promise
28+
expect(onEnd).toHaveBeenCalledTimes(1)
29+
})
30+
31+
it('calls onEnd when the stream errors', async () => {
32+
const endPromise = new DetachedPromise<void>()
33+
const onEnd = jest.fn(endPromise.resolve)
34+
35+
const { stream: inputStream, controller } =
36+
readableStreamWithController<string>()
37+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
38+
39+
const reader = trackedStream.getReader()
40+
controller.enqueue('one')
41+
controller.enqueue('two')
42+
await reader.read()
43+
await reader.read()
44+
expect(onEnd).not.toHaveBeenCalled()
45+
46+
const error = new Error('kaboom')
47+
controller.error(error)
48+
49+
// if the underlying stream errors, we should error as well
50+
await expect(reader.read()).rejects.toThrow(error)
51+
52+
await endPromise.promise
53+
expect(onEnd).toHaveBeenCalledTimes(1)
54+
})
55+
56+
it('calls onEnd when the stream is cancelled', async () => {
57+
const endPromise = new DetachedPromise<void>()
58+
const onEnd = jest.fn(endPromise.resolve)
59+
60+
const cancelledPromise = new DetachedPromise<unknown>()
61+
const onCancel = jest.fn(cancelledPromise.resolve)
62+
63+
const { stream: inputStream, controller } =
64+
readableStreamWithController<string>(onCancel)
65+
const trackedStream = trackStreamConsumed(inputStream, onEnd)
66+
67+
const reader = trackedStream.getReader()
68+
controller.enqueue('one')
69+
controller.enqueue('two')
70+
await reader.read()
71+
await reader.read()
72+
expect(onEnd).not.toHaveBeenCalled()
73+
74+
const cancellationReason = new Error('cancelled')
75+
await reader.cancel(cancellationReason)
76+
77+
// from a reader's perspective, a cancelled stream behaves like it's done
78+
// (which is a bit weird honestly?)
79+
await expect(reader.read()).resolves.toEqual({
80+
done: true,
81+
value: undefined,
82+
})
83+
84+
await endPromise.promise
85+
expect(onEnd).toHaveBeenCalledTimes(1)
86+
87+
// the cancellation should propagate to back to the underlying stream
88+
await cancelledPromise.promise
89+
expect(onCancel).toHaveBeenCalledWith(cancellationReason)
90+
})
91+
})
92+
93+
function readableStreamWithController<TChunk>(
94+
onCancel?: (reason: unknown) => void
95+
) {
96+
let controller: ReadableStreamDefaultController<TChunk> = undefined!
97+
const stream = new ReadableStream<TChunk>({
98+
start(_controller) {
99+
controller = _controller
100+
},
101+
cancel(reason) {
102+
onCancel?.(reason)
103+
},
104+
})
105+
return { controller, stream }
106+
}

packages/next/src/server/web/web-on-close.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ export function trackStreamConsumed<TChunk>(
2222
stream: ReadableStream<TChunk>,
2323
onEnd: () => void
2424
): ReadableStream<TChunk> {
25-
const closePassThrough = new TransformStream<TChunk, TChunk>({
26-
flush: () => {
27-
return onEnd()
28-
},
29-
})
30-
return stream.pipeThrough(closePassThrough)
25+
// NOTE: This function must handle `stream` being aborted or cancelled,
26+
// so it can't just be this:
27+
//
28+
// return stream.pipeThrough(new TransformStream({ flush() { onEnd() } }))
29+
//
30+
// because that doesn't handle cancellations.
31+
// (and cancellation handling via `Transformer.cancel` is only available in node >20)
32+
const dest = new TransformStream()
33+
const runOnEnd = () => onEnd()
34+
stream.pipeTo(dest.writable).then(runOnEnd, runOnEnd)
35+
return dest.readable
3136
}
3237

3338
export class CloseController {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/end/page'
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/hang/page'
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { default } from '../../../../nodejs/interrupted/incomplete-stream/start/page'
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export default function Page() {
2+
return (
3+
<main>
4+
<h1>End</h1>
5+
</main>
6+
)
7+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { connection, after } from 'next/server'
2+
import { Suspense } from 'react'
3+
import { cliLog } from '../../../../../utils/log'
4+
5+
export default async function Page() {
6+
await connection()
7+
after(() => {
8+
cliLog({
9+
source: '[page] /interrupted/incomplete-stream/hang',
10+
})
11+
})
12+
return (
13+
<main>
14+
<h1>Hanging forever</h1>
15+
<Suspense
16+
fallback={
17+
<div id="loading-fallback">
18+
{
19+
// we're going to look for this string in the streamed response,
20+
// make sure it doesn't show up in the literal form someplace else
21+
'Loading' + (Math.random() > 1 ? 'impossible' : '') + '...'
22+
}
23+
</div>
24+
}
25+
>
26+
<HangForever />
27+
</Suspense>
28+
</main>
29+
)
30+
}
31+
32+
async function HangForever() {
33+
await new Promise((_resolve) => {})
34+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use client'
2+
3+
import Link from 'next/link'
4+
import { usePathname } from 'next/navigation'
5+
6+
export default function Page() {
7+
const pathname = usePathname()
8+
return (
9+
<main>
10+
<h1>Start</h1>
11+
<Link href={pathname.replace('/start', '/hang')}>hang</Link>
12+
</main>
13+
)
14+
}

test/e2e/app-dir/next-after-app/index.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,64 @@ describe.each(runtimes)('after() in %s runtime', (runtimeValue) => {
124124
source: '[page] /interrupted/throws-error',
125125
})
126126
})
127+
128+
it('runs callbacks if a request is aborted before the page finishes streaming', async () => {
129+
const abortController = new AbortController()
130+
const res = await next.fetch(
131+
pathPrefix + '/interrupted/incomplete-stream/hang',
132+
{ signal: abortController.signal }
133+
)
134+
expect(res.status).toBe(200)
135+
136+
const textDecoder = new TextDecoder()
137+
for await (const rawChunk of res.body) {
138+
const chunk =
139+
typeof rawChunk === 'string' ? rawChunk : textDecoder.decode(rawChunk)
140+
// we found the loading fallback for the part that hangs forever, so we know we won't progress any further
141+
if (chunk.includes('Loading...')) {
142+
break
143+
}
144+
}
145+
abortController.abort()
146+
147+
await retry(() => {
148+
expect(getLogs()).toContainEqual({
149+
source: '[page] /interrupted/incomplete-stream/hang',
150+
})
151+
})
152+
})
153+
154+
it('runs callbacks if the browser disconnects before the page finishes streaming', async () => {
155+
// `next.browser()` always waits for the `load` event, which we don't want here.
156+
// (because the page hangs forever while streaming and will thus never fire `load`)
157+
// but we can't easily bypass that, so go to a dummy page first
158+
const browser = await next.browser(
159+
pathPrefix + '/interrupted/incomplete-stream/start'
160+
)
161+
expect(await browser.elementByCss('h1').text()).toEqual('Start')
162+
163+
// navigate to a page that hangs forever while streaming...
164+
// NOTE: this needs to be a soft navigation (using Link), playwright seems to hang otherwise
165+
await browser.elementByCss('a').click()
166+
await retry(async () => {
167+
expect(await browser.hasElementByCssSelector('#loading-fallback')).toBe(
168+
true
169+
)
170+
})
171+
172+
// ...but navigate away before streaming is finished (it hangs forever, so it will never finish)
173+
await browser.get(
174+
new URL(pathPrefix + '/interrupted/incomplete-stream/end', next.url)
175+
.href
176+
)
177+
expect(await browser.elementByCss('h1').text()).toEqual('End')
178+
179+
await retry(async () => {
180+
expect(getLogs()).toContainEqual({
181+
source: '[page] /interrupted/incomplete-stream/hang',
182+
})
183+
})
184+
})
127185
})
128186

129187
it('runs in middleware', async () => {

0 commit comments

Comments
 (0)