Skip to content

Commit 4b49101

Browse files
committed
fix: request abort signal
1 parent 08363f0 commit 4b49101

File tree

2 files changed

+97
-8
lines changed

2 files changed

+97
-8
lines changed

lib/api/api-request.js

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const { InvalidArgumentError } = require('../core/errors')
66
const util = require('../core/util')
77
const { getResolveErrorBodyCallback } = require('./util')
88
const { AsyncResource } = require('node:async_hooks')
9-
const { addSignal, removeSignal } = require('./abort-signal')
109

1110
class RequestHandler extends AsyncResource {
1211
constructor (opts, callback) {
@@ -56,19 +55,28 @@ class RequestHandler extends AsyncResource {
5655
this.onInfo = onInfo || null
5756
this.throwOnError = throwOnError
5857
this.highWaterMark = highWaterMark
58+
this.signal = signal
5959

6060
if (util.isStream(body)) {
6161
body.on('error', (err) => {
6262
this.onError(err)
6363
})
6464
}
6565

66-
addSignal(this, signal)
66+
if (this.signal) {
67+
this.removeAbortListener = util.addAbortListener(this.signal, () => {
68+
if (this.res) {
69+
this.res.destroy(this.signal.reason)
70+
} else {
71+
this.abort(this.signal.reason)
72+
}
73+
})
74+
}
6775
}
6876

6977
onConnect (abort, context) {
70-
if (this.reason) {
71-
abort(this.reason)
78+
if (this.signal && this.signal.aborted) {
79+
abort(this.signal.reason)
7280
return
7381
}
7482

@@ -95,6 +103,13 @@ class RequestHandler extends AsyncResource {
95103
const contentLength = parsedHeaders['content-length']
96104
const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark })
97105

106+
if (this.removeAbortListener) {
107+
// TODO (fix): 'close' is sufficient but breaks tests.
108+
body
109+
.on('end', this.removeAbortListener)
110+
.on('error', this.removeAbortListener)
111+
}
112+
98113
this.callback = null
99114
this.res = body
100115
if (callback !== null) {
@@ -123,8 +138,6 @@ class RequestHandler extends AsyncResource {
123138
onComplete (trailers) {
124139
const { res } = this
125140

126-
removeSignal(this)
127-
128141
util.parseHeaders(trailers, this.trailers)
129142

130143
res.push(null)
@@ -133,8 +146,6 @@ class RequestHandler extends AsyncResource {
133146
onError (err) {
134147
const { res, callback, body, opaque } = this
135148

136-
removeSignal(this)
137-
138149
if (callback) {
139150
// TODO: Does this need queueMicrotask?
140151
this.callback = null
@@ -149,6 +160,8 @@ class RequestHandler extends AsyncResource {
149160
queueMicrotask(() => {
150161
util.destroy(res, err)
151162
})
163+
} else if (this.removeAbortListener) {
164+
this.removeAbortListener()
152165
}
153166

154167
if (body) {

test/request-signal.js

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
'use strict'
2+
3+
const { createServer } = require('node:http')
4+
const { test, after } = require('node:test')
5+
const { tspl } = require('@matteo.collina/tspl')
6+
const { request } = require('..')
7+
8+
test('pre abort signal w/ reason', async (t) => {
9+
t = tspl(t, { plan: 1 })
10+
11+
const server = createServer((req, res) => {
12+
res.end('asd')
13+
})
14+
after(() => server.close())
15+
16+
server.listen(0, async () => {
17+
const ac = new AbortController()
18+
const _err = new Error()
19+
ac.abort(_err)
20+
try {
21+
await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
22+
} catch (err) {
23+
t.equal(err, _err)
24+
}
25+
})
26+
await t.completed
27+
})
28+
29+
test('post abort signal', async (t) => {
30+
t = tspl(t, { plan: 1 })
31+
32+
const server = createServer((req, res) => {
33+
res.end('asd')
34+
})
35+
after(() => server.close())
36+
37+
server.listen(0, async () => {
38+
const ac = new AbortController()
39+
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
40+
ac.abort()
41+
try {
42+
/* eslint-disable-next-line no-unused-vars */
43+
for await (const chunk of ures.body) {
44+
// Do nothing...
45+
}
46+
} catch (err) {
47+
t.equal(err.name, 'AbortError')
48+
}
49+
})
50+
await t.completed
51+
})
52+
53+
test('post abort signal w/ reason', async (t) => {
54+
t = tspl(t, { plan: 1 })
55+
56+
const server = createServer((req, res) => {
57+
res.end('asd')
58+
})
59+
after(() => server.close())
60+
61+
server.listen(0, async () => {
62+
const ac = new AbortController()
63+
const _err = new Error()
64+
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
65+
ac.abort(_err)
66+
try {
67+
/* eslint-disable-next-line no-unused-vars */
68+
for await (const chunk of ures.body) {
69+
// Do nothing...
70+
}
71+
} catch (err) {
72+
t.equal(err, _err)
73+
}
74+
})
75+
await t.completed
76+
})

0 commit comments

Comments
 (0)