Skip to content

Commit 5ee96a4

Browse files
committed
fix: interceptor back-pressure
Refs: #3368 Refs: #3370
1 parent dd98299 commit 5ee96a4

11 files changed

+75
-44
lines changed

lib/dispatcher/agent.js

+3-9
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,24 @@ class Agent extends DispatcherBase {
7979
return ret
8080
}
8181

82-
[kDispatch] (opts, handler) {
82+
[kDispatch] (opts, handler, onDrain) {
8383
let key
8484
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
8585
key = String(opts.origin)
8686
} else {
8787
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
8888
}
8989

90-
let dispatcher = this[kClients].get(key)
90+
const dispatcher = this[kClients].get(key)
9191

9292
if (!dispatcher) {
93-
dispatcher = this[kFactory](opts.origin, this[kOptions])
94-
.on('drain', this[kOnDrain])
95-
.on('connect', this[kOnConnect])
96-
.on('disconnect', this[kOnDisconnect])
97-
.on('connectionError', this[kOnConnectionError])
98-
9993
// This introduces a tiny memory leak, as dispatchers are never removed from the map.
10094
// TODO(mcollina): remove te timer when the client/pool do not have any more
10195
// active connections.
10296
this[kClients].set(key, dispatcher)
10397
}
10498

105-
return dispatcher.dispatch(opts, handler)
99+
return dispatcher.dispatch(opts, handler, onDrain)
106100
}
107101

108102
async [kClose] () {

lib/dispatcher/client.js

+24-10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js')
6262
let deprecatedInterceptorWarned = false
6363

6464
const kClosedResolve = Symbol('kClosedResolve')
65+
const kDrainQueue = Symbol('kDrainQueue')
6566

6667
function getPipelining (client) {
6768
return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
@@ -243,6 +244,14 @@ class Client extends DispatcherBase {
243244
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
244245
this[kHTTPContext] = null
245246

247+
// TODO (fix): What if destroyed?
248+
this[kDrainQueue] = []
249+
this.on('drain', () => {
250+
for (const callback of this[kDrainQueue].splice(0)) {
251+
callback(null)
252+
}
253+
})
254+
246255
// kQueue is built up of 3 sections separated by
247256
// the kRunningIdx and kPendingIdx indices.
248257
// | complete | running | pending |
@@ -299,26 +308,31 @@ class Client extends DispatcherBase {
299308
this.once('connect', cb)
300309
}
301310

302-
[kDispatch] (opts, handler) {
311+
[kDispatch] (opts, handler, onDrain) {
303312
const origin = opts.origin || this[kUrl].origin
304313
const request = new Request(origin, opts, handler)
305314

306-
this[kQueue].push(request)
307-
if (this[kResuming]) {
308-
// Do nothing.
309-
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
310-
// Wait a tick in case stream/iterator is ended in the same tick.
311-
this[kResuming] = 1
312-
queueMicrotask(() => resume(this))
315+
if (this[kBusy] && onDrain) {
316+
this[kDrainQueue].push(onDrain)
317+
return false
313318
} else {
314-
this[kResume](true)
319+
this[kQueue].push(request)
320+
if (this[kResuming]) {
321+
// Do nothing.
322+
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
323+
// Wait a tick in case stream/iterator is ended in the same tick.
324+
this[kResuming] = 1
325+
queueMicrotask(() => resume(this))
326+
} else {
327+
this[kResume](true)
328+
}
315329
}
316330

317331
if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
318332
this[kNeedDrain] = 2
319333
}
320334

321-
return this[kNeedDrain] < 2
335+
return onDrain ? true : this[kNeedDrain] < 2
322336
}
323337

324338
async [kClose] () {

lib/dispatcher/dispatcher-base.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,21 @@ class DispatcherBase extends Dispatcher {
142142
})
143143
}
144144

145-
[kInterceptedDispatch] (opts, handler) {
145+
[kInterceptedDispatch] (opts, handler, onDrain) {
146146
if (!this[kInterceptors] || this[kInterceptors].length === 0) {
147147
this[kInterceptedDispatch] = this[kDispatch]
148-
return this[kDispatch](opts, handler)
148+
return this[kDispatch](opts, handler, onDrain)
149149
}
150150

151151
let dispatch = this[kDispatch].bind(this)
152152
for (let i = this[kInterceptors].length - 1; i >= 0; i--) {
153153
dispatch = this[kInterceptors][i](dispatch)
154154
}
155155
this[kInterceptedDispatch] = dispatch
156-
return dispatch(opts, handler)
156+
return dispatch(opts, handler, onDrain)
157157
}
158158

159-
dispatch (opts, handler) {
159+
dispatch (opts, handler, onDrain) {
160160
if (!handler || typeof handler !== 'object') {
161161
throw new InvalidArgumentError('handler must be an object')
162162
}
@@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher {
174174
throw new ClientClosedError()
175175
}
176176

177-
return this[kInterceptedDispatch](opts, handler)
177+
return this[kInterceptedDispatch](opts, handler, onDrain)
178178
} catch (err) {
179179
if (typeof handler.onError !== 'function') {
180180
throw new InvalidArgumentError('invalid onError method')

lib/dispatcher/dispatcher.js

+10-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,16 @@ class ComposedDispatcher extends Dispatcher {
4949
this.#dispatch = dispatch
5050
}
5151

52-
dispatch (...args) {
53-
this.#dispatch(...args)
52+
dispatch (opts, handler, onDrain) {
53+
onDrain ??= (err) => {
54+
if (err) {
55+
handler.onError(err)
56+
} else {
57+
this.dispatch(opts, handler, onDrain)
58+
}
59+
}
60+
61+
return this.#dispatch(opts, handler, onDrain)
5462
}
5563

5664
close (...args) {

lib/dispatcher/pool-base.js

+15-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const kGetDispatcher = Symbol('get dispatcher')
1717
const kAddClient = Symbol('add client')
1818
const kRemoveClient = Symbol('remove client')
1919
const kStats = Symbol('stats')
20+
const kDrainQueue = Symbol('kDrainQueue')
2021

2122
class PoolBase extends DispatcherBase {
2223
constructor () {
@@ -69,6 +70,14 @@ class PoolBase extends DispatcherBase {
6970
}
7071

7172
this[kStats] = new PoolStats(this)
73+
74+
// TODO (fix): What if destroyed?
75+
this[kDrainQueue] = []
76+
this.on('drain', () => {
77+
for (const callback of this[kDrainQueue].splice(0)) {
78+
callback(null)
79+
}
80+
})
7281
}
7382

7483
get [kBusy] () {
@@ -133,10 +142,13 @@ class PoolBase extends DispatcherBase {
133142
return Promise.all(this[kClients].map(c => c.destroy(err)))
134143
}
135144

136-
[kDispatch] (opts, handler) {
145+
[kDispatch] (opts, handler, onDrain) {
137146
const dispatcher = this[kGetDispatcher]()
138147

139-
if (!dispatcher) {
148+
if (!dispatcher && onDrain) {
149+
this[kDrainQueue].push(onDrain)
150+
return false
151+
} else if (!dispatcher) {
140152
this[kNeedDrain] = true
141153
this[kQueue].push({ opts, handler })
142154
this[kQueued]++
@@ -145,7 +157,7 @@ class PoolBase extends DispatcherBase {
145157
this[kNeedDrain] = !this[kGetDispatcher]()
146158
}
147159

148-
return !this[kNeedDrain]
160+
return onDrain ? true : !this[kNeedDrain]
149161
}
150162

151163
[kAddClient] (client) {

lib/dispatcher/proxy-agent.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase {
107107
})
108108
}
109109

110-
dispatch (opts, handler) {
110+
dispatch (opts, handler, onDrain) {
111111
const headers = buildHeaders(opts.headers)
112112
throwIfProxyAuthIsSent(headers)
113113

@@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase {
121121
...opts,
122122
headers
123123
},
124-
handler
124+
handler,
125+
onDrain
125126
)
126127
}
127128

lib/dispatcher/retry-agent.js

+5-4
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,24 @@ const Dispatcher = require('./dispatcher')
44
const RetryHandler = require('../handler/retry-handler')
55

66
class RetryAgent extends Dispatcher {
7-
#agent = null
8-
#options = null
7+
#agent
8+
#options
9+
910
constructor (agent, options = {}) {
1011
super(options)
1112
this.#agent = agent
1213
this.#options = options
1314
}
1415

15-
dispatch (opts, handler) {
16+
dispatch (opts, handler, onDrain) {
1617
const retry = new RetryHandler({
1718
...opts,
1819
retryOptions: this.#options
1920
}, {
2021
dispatch: this.#agent.dispatch.bind(this.#agent),
2122
handler
2223
})
23-
return this.#agent.dispatch(opts, retry)
24+
return this.#agent.dispatch(opts, retry, onDrain)
2425
}
2526

2627
close () {

lib/interceptor/dump.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ function createDumpInterceptor (
106106
}
107107
) {
108108
return dispatch => {
109-
return function Intercept (opts, handler) {
109+
return function Intercept (opts, handler, onDrain) {
110110
const { dumpMaxSize = defaultMaxSize } =
111111
opts
112112

@@ -115,7 +115,7 @@ function createDumpInterceptor (
115115
handler
116116
)
117117

118-
return dispatch(opts, dumpHandler)
118+
return dispatch(opts, dumpHandler, onDrain)
119119
}
120120
}
121121
}

lib/interceptor/redirect-interceptor.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ const RedirectHandler = require('../handler/redirect-handler')
44

55
function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) {
66
return (dispatch) => {
7-
return function Intercept (opts, handler) {
7+
return function Intercept (opts, handler, onDrain) {
88
const { maxRedirections = defaultMaxRedirections } = opts
99

1010
if (!maxRedirections) {
11-
return dispatch(opts, handler)
11+
return dispatch(opts, handler, onDrain)
1212
}
1313

1414
const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler)
1515
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
16-
return dispatch(opts, redirectHandler)
16+
return dispatch(opts, redirectHandler, onDrain)
1717
}
1818
}
1919
}

lib/interceptor/redirect.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const RedirectHandler = require('../handler/redirect-handler')
44
module.exports = opts => {
55
const globalMaxRedirections = opts?.maxRedirections
66
return dispatch => {
7-
return function redirectInterceptor (opts, handler) {
7+
return function redirectInterceptor (opts, handler, onDrain) {
88
const { maxRedirections = globalMaxRedirections, ...baseOpts } = opts
99

1010
if (!maxRedirections) {
@@ -18,7 +18,7 @@ module.exports = opts => {
1818
handler
1919
)
2020

21-
return dispatch(baseOpts, redirectHandler)
21+
return dispatch(baseOpts, redirectHandler, onDrain)
2222
}
2323
}
2424
}

lib/interceptor/retry.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ const RetryHandler = require('../handler/retry-handler')
33

44
module.exports = globalOpts => {
55
return dispatch => {
6-
return function retryInterceptor (opts, handler) {
6+
return function retryInterceptor (opts, handler, onDrain) {
77
return dispatch(
88
opts,
99
new RetryHandler(
@@ -12,7 +12,8 @@ module.exports = globalOpts => {
1212
handler,
1313
dispatch
1414
}
15-
)
15+
),
16+
onDrain
1617
)
1718
}
1819
}

0 commit comments

Comments
 (0)