Skip to content

Commit 32c08f4

Browse files
swaldmanndanjoa
andauthored
feat: opt-in replacement for generic-pool (#815)
Drop-in replacement for `generic-pool`. For now, we'll keep this opt-in – activated via `cds.requires.db.pool.builtin = true`. --------- Co-authored-by: Daniel Hutzel <[email protected]>
1 parent e6d3d1b commit 32c08f4

File tree

2 files changed

+278
-11
lines changed

2 files changed

+278
-11
lines changed

db-service/lib/common/generic-pool.js

Lines changed: 260 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
const { createPool } = require('generic-pool')
1+
const cds = require('@sap/cds')
2+
const LOG = cds.log('db')
3+
4+
const createPool = (factory, config) => {
5+
if (cds.requires.db?.pool?.builtin) return new Pool(factory, config)
6+
return require('generic-pool').createPool(factory, config)
7+
}
28

39
function ConnectionPool (factory, tenant) {
410
let bound_factory = { __proto__: factory, create: factory.create.bind(null, tenant) }
@@ -29,4 +35,256 @@ function TrackedConnectionPool (factory, tenant) {
2935
}
3036

3137
const DEBUG = /\bpool\b/.test(process.env.DEBUG)
32-
module.exports = DEBUG ? TrackedConnectionPool : ConnectionPool
38+
module.exports = DEBUG && !cds.requires.db?.pool?.builtin ? TrackedConnectionPool : ConnectionPool
39+
40+
// Drop-in replacement for https://github.com/coopernurse/node-pool
41+
// TODO: fifo: true? relevant for our use case?
42+
43+
const { EventEmitter } = require('events')
44+
45+
const ResourceState = Object.freeze({
46+
ALLOCATED: 'allocated',
47+
IDLE: 'idle',
48+
INVALID: 'invalid',
49+
VALIDATION: 'validation'
50+
})
51+
52+
const RequestState = Object.freeze({
53+
PENDING: 'pending',
54+
RESOLVED: 'resolved',
55+
REJECTED: 'rejected'
56+
})
57+
58+
class Request {
59+
constructor (ttl) {
60+
this.state = 'pending'
61+
this.promise = new Promise((resolve, reject) => {
62+
this._resolve = value => {
63+
clearTimeout(this._timeout)
64+
this.state = RequestState.RESOLVED
65+
resolve(value)
66+
}
67+
this._reject = reason => {
68+
clearTimeout(this._timeout)
69+
this.state = RequestState.REJECTED
70+
reject(reason)
71+
}
72+
if (typeof ttl === 'number' && ttl >= 0) {
73+
const err = new Error(`Pool resource could not be acquired within ${ttl / 1000}s`)
74+
this._timeout = setTimeout(() => this._reject(err), ttl)
75+
}
76+
})
77+
}
78+
resolve (v) { if (this.state === 'pending') this._resolve(v) }
79+
reject (e) { if (this.state === 'pending') this._reject(e) }
80+
}
81+
82+
class PooledResource {
83+
constructor(resource) {
84+
this.obj = resource
85+
this.creationTime = Date.now()
86+
this.idle()
87+
}
88+
89+
update(newState) {
90+
this.state = newState
91+
}
92+
idle() {
93+
this.state = ResourceState.IDLE
94+
this.lastIdleTime = Date.now()
95+
}
96+
}
97+
98+
class Pool extends EventEmitter {
99+
100+
constructor (factory, options = {}) {
101+
super()
102+
this.factory = factory
103+
104+
this.options = Object.assign({
105+
testOnBorrow: false,
106+
evictionRunIntervalMillis: 100000,
107+
numTestsPerEvictionRun: 3,
108+
softIdleTimeoutMillis: -1,
109+
idleTimeoutMillis: 30000,
110+
acquireTimeoutMillis: null,
111+
destroyTimeoutMillis: null,
112+
fifo: false,
113+
min: 0,
114+
max: 10
115+
}, options)
116+
117+
/** @type {boolean} */
118+
this._draining = false
119+
120+
/** @type {Set<PooledResource>} */
121+
this._available = new Set()
122+
123+
/** @type {Map<any, { pooledResource: PooledResource }>} */
124+
this._loans = new Map()
125+
126+
/** @type {Set<PooledResource>} */
127+
this._all = new Set()
128+
129+
/** @type {Set<Promise<void>>} */
130+
this._creates = new Set()
131+
132+
/** @type {Request[]} */
133+
this._queue = []
134+
135+
this.#scheduleEviction()
136+
for (let i = 0; i < this.options.min - this.size; i++) this.#createResource()
137+
}
138+
139+
async acquire() {
140+
if (this._draining) throw new Error('Pool is draining and cannot accept new requests')
141+
const request = new Request(this.options.acquireTimeoutMillis)
142+
this._queue.push(request)
143+
this.#dispense()
144+
return request.promise
145+
}
146+
147+
async release(resource) {
148+
const loan = this._loans.get(resource)
149+
if (!loan) throw new Error('Resource not currently part of this pool')
150+
this._loans.delete(resource)
151+
const pooledResource = loan.pooledResource
152+
pooledResource.idle()
153+
this._available.add(pooledResource)
154+
this.#dispense()
155+
}
156+
157+
async destroy(resource) {
158+
const loan = this._loans.get(resource)
159+
if (!loan) throw new Error('Resource not currently part of this pool')
160+
this._loans.delete(resource)
161+
const pooledResource = loan.pooledResource
162+
await this.#destroy(pooledResource)
163+
this.#dispense()
164+
}
165+
166+
async drain() {
167+
this._draining = true
168+
for (const request of this._queue.splice(0)) {
169+
if (request.state === RequestState.PENDING) request.reject(new Error('Pool is draining and cannot fulfil request'))
170+
}
171+
clearTimeout(this._scheduledEviction)
172+
}
173+
174+
async clear() {
175+
await Promise.allSettled(Array.from(this._creates))
176+
await Promise.allSettled(Array.from(this._available).map(resource => this.#destroy(resource)))
177+
}
178+
179+
async #createResource() {
180+
const createPromise = (async () => {
181+
try {
182+
const resource = new PooledResource(await this.factory.create())
183+
this._all.add(resource)
184+
this._available.add(resource)
185+
} catch (err) {
186+
this._queue.shift()?.reject(err)
187+
}
188+
})()
189+
this._creates.add(createPromise)
190+
createPromise.finally(() => {
191+
this._creates.delete(createPromise)
192+
this.#dispense()
193+
})
194+
return createPromise
195+
}
196+
197+
async #dispense() {
198+
const waiting = this._queue.length
199+
if (waiting === 0) return
200+
const capacity = this._available.size + this._creates.size
201+
const shortfall = waiting - capacity
202+
if (shortfall > 0 && this.size < this.options.max) {
203+
const needed = Math.min(shortfall, this.options.max - this.size)
204+
for (let i = 0; i < needed; i++) this.#createResource()
205+
}
206+
const dispense = async resource => {
207+
const request = this._queue.shift()
208+
if (!request) {
209+
resource.idle()
210+
this._available.add(resource)
211+
return false
212+
}
213+
if (request.state !== RequestState.PENDING) {
214+
this.#dispense()
215+
return false
216+
}
217+
this._loans.set(resource.obj, { pooledResource: resource })
218+
resource.update(ResourceState.ALLOCATED)
219+
request.resolve(resource.obj)
220+
return true
221+
}
222+
223+
const _dispenses = []
224+
for (let i = 0; i < Math.min(this._available.size, waiting); i++) {
225+
const resource = this._available.values().next().value
226+
this._available.delete(resource)
227+
if (this.options.testOnBorrow) {
228+
const validationPromise = (async () => {
229+
resource.update(ResourceState.VALIDATION)
230+
try {
231+
const isValid = await this.factory.validate(resource.obj)
232+
if (isValid) return dispense(resource)
233+
} catch {/* marked as invalid below */}
234+
resource.update(ResourceState.INVALID)
235+
await this.#destroy(resource)
236+
this.#dispense()
237+
return false
238+
})()
239+
_dispenses.push(validationPromise)
240+
} else {
241+
_dispenses.push(dispense(resource))
242+
}
243+
}
244+
await Promise.all(_dispenses)
245+
}
246+
247+
async #destroy(resource) {
248+
resource.update(ResourceState.INVALID)
249+
this._all.delete(resource)
250+
this._available.delete(resource)
251+
this._loans.delete(resource.obj)
252+
try {
253+
await this.factory.destroy(resource.obj)
254+
} catch (e) {
255+
LOG.error(e)
256+
/* FIXME: We have to ignore errors here due to a TypeError in hdb */
257+
/* This was also a problem with the old (generic-pool) implementation */
258+
/* Root cause in hdb needs to be fixed */
259+
} finally {
260+
if (!this._draining && this.size < this.options.min) {
261+
await this.#createResource()
262+
}
263+
}
264+
}
265+
266+
#scheduleEviction() {
267+
const { evictionRunIntervalMillis, numTestsPerEvictionRun, softIdleTimeoutMillis, min, idleTimeoutMillis } = this.options
268+
if (evictionRunIntervalMillis <= 0) return
269+
this._scheduledEviction = setTimeout(async () => {
270+
try {
271+
const resourcesToEvict = Array.from(this._available)
272+
.slice(0, numTestsPerEvictionRun)
273+
.filter(resource => {
274+
const idleTime = Date.now() - resource.lastIdleTime
275+
const softEvict = softIdleTimeoutMillis > 0 && softIdleTimeoutMillis < idleTime && min < this._available.size
276+
return softEvict || idleTimeoutMillis < idleTime
277+
})
278+
await Promise.all(resourcesToEvict.map(resource => this.#destroy(resource)))
279+
} finally {
280+
this.#scheduleEviction()
281+
}
282+
}, evictionRunIntervalMillis).unref()
283+
}
284+
285+
get size() { return this._all.size + this._creates.size }
286+
get available() { return this._available.size }
287+
get borrowed() { return this._loans.size }
288+
get pending() { return this._queue.length }
289+
get tenant() { return this.options.tenant }
290+
}

hana/lib/HANAService.js

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class HANAService extends SQLService {
5757
testOnBorrow: true,
5858
fifo: false
5959
},
60-
create: async function (tenant) {
60+
create: async function create(tenant, start = Date.now()) {
6161
try {
6262
const { credentials } = isMultitenant
6363
? await require('@sap/cds-mtxs/lib').xt.serviceManager.get(tenant, { disableCache: false })
@@ -68,15 +68,24 @@ class HANAService extends SQLService {
6868
return dbc
6969
} catch (err) {
7070
if (isMultitenant) {
71-
// REVISIT: throw the error and break retry loop
72-
// Stop trying when the tenant does not exist or is rate limited
73-
if (err.status == 404 || err.status == 429)
74-
return new Promise(function (_, reject) {
75-
setTimeout(() => reject(err), acquireTimeoutMillis)
76-
})
71+
if (cds.requires.db?.pool?.builtin) {
72+
if (err.status === 404 || err.status === 429) {
73+
throw new Error(`Pool failed connecting to '${tenant}'`, { cause: err })
74+
}
75+
await require('@sap/cds-mtxs/lib').xt.serviceManager.get(tenant, { disableCache: true })
76+
if (Date.now() - start < acquireTimeoutMillis) return create(tenant, start)
77+
else throw new Error(`Pool failed connecting to '${tenant}' within ${acquireTimeoutMillis}ms`, { cause: err })
78+
} else {
79+
// Stop trying when the tenant does not exist or is rate limited
80+
if (err.status == 404 || err.status == 429) {
81+
return new Promise(function (_, reject) { // break retry loop for generic-pool
82+
setTimeout(() => reject(err), acquireTimeoutMillis)
83+
})
84+
}
85+
await require('@sap/cds-mtxs/lib').xt.serviceManager.get(tenant, { disableCache: true })
86+
throw new Error(`Pool failed connecting to'${tenant}'`, { cause: err }) // generic-pool will retry on errors
87+
}
7788
} else if (err.code !== 10) throw err
78-
await require('@sap/cds-mtxs/lib').xt.serviceManager.get(tenant, { disableCache: true })
79-
return this.create(tenant)
8089
}
8190
},
8291
error: (err /*, tenant*/) => {

0 commit comments

Comments
 (0)