Skip to content

Commit 947d748

Browse files
Add pool generation numbers to allow a pool to be closed instantly.
Every connection that is opened by the pool is tracked with the pool's generation number. This allows us to close the pool immediately when `.Close()` is called. * For connections that are idle, we close them directly when lowering the capacity. * For connections that are returned to the pool, we check whether the generation numbers match and if they don't, we close the returned connection directly. Signed-off-by: Arthur Schreiber <[email protected]>
1 parent 7dd3c6b commit 947d748

File tree

4 files changed

+172
-82
lines changed

4 files changed

+172
-82
lines changed

go/pools/smartconnpool/connection.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Pooled[C Connection] struct {
3636
timeUsed timestamp
3737
pool *ConnPool[C]
3838

39+
generation int64 // generation of the pool when this connection was created
40+
3941
Conn C
4042
}
4143

@@ -48,16 +50,16 @@ func (dbc *Pooled[C]) Recycle() {
4850
case dbc.pool == nil:
4951
dbc.Conn.Close()
5052
case dbc.Conn.IsClosed():
51-
dbc.pool.put(nil)
53+
dbc.pool.put(nil, dbc.generation)
5254
default:
53-
dbc.pool.put(dbc)
55+
dbc.pool.put(dbc, dbc.generation)
5456
}
5557
}
5658

5759
func (dbc *Pooled[C]) Taint() {
5860
if dbc.pool == nil {
5961
return
6062
}
61-
dbc.pool.put(nil)
63+
dbc.pool.put(nil, dbc.generation)
6264
dbc.pool = nil
6365
}

go/pools/smartconnpool/pool.go

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ type ConnPool[C Connection] struct {
131131
close chan struct{}
132132
capacityMu sync.Mutex
133133

134+
// generation of the pool, this is incremented every time the pool is closed
135+
generation atomic.Int64
136+
134137
config struct {
135138
// connect is the callback to create a new connection for the pool
136139
connect Connector[C]
@@ -395,7 +398,17 @@ func (pool *ConnPool[C]) Get(ctx context.Context, setting *Setting) (*Pooled[C],
395398

396399
// put returns a connection to the pool. This is a private API.
397400
// Return connections to the pool by calling Pooled.Recycle
398-
func (pool *ConnPool[C]) put(conn *Pooled[C]) {
401+
func (pool *ConnPool[C]) put(conn *Pooled[C], generation int64) {
402+
// If the generations don't match, just close the connection and return.
403+
// Don't update any metrics here.
404+
if generation != pool.generation.Load() {
405+
if conn != nil {
406+
conn.Close()
407+
}
408+
409+
return
410+
}
411+
399412
pool.borrowed.Add(-1)
400413

401414
if conn == nil {
@@ -524,8 +537,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
524537
return nil, err
525538
}
526539
pooled := &Pooled[C]{
527-
pool: pool,
528-
Conn: conn,
540+
pool: pool,
541+
generation: pool.generation.Load(),
542+
Conn: conn,
529543
}
530544
now := monotonicNow()
531545
pooled.timeUsed.set(now)
@@ -720,32 +734,46 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
720734
// wait for connections to be returned to the pool if we're reducing the capacity.
721735
defer pool.setIdleCount()
722736

723-
const delay = 10 * time.Millisecond
737+
if newcap == 0 {
738+
pool.generation.Add(1)
739+
pool.active.Store(0)
740+
pool.borrowed.Store(0)
724741

725-
// close connections until we're under capacity
726-
for pool.active.Load() > newcap {
727-
if err := ctx.Err(); err != nil {
728-
return vterrors.Errorf(vtrpcpb.Code_ABORTED,
729-
"timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
730-
pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
731-
}
732-
// if we're closing down the pool, make sure there's no clients waiting
733-
// for connections because they won't be returned in the future
734-
if newcap == 0 {
735-
pool.wait.expire(true)
736-
}
742+
for {
743+
conn := pool.getFromSettingsStack(nil)
744+
if conn == nil {
745+
conn = pool.pop(&pool.clean)
746+
}
747+
if conn == nil {
748+
break
749+
}
737750

738-
// try closing from connections which are currently idle in the stacks
739-
conn := pool.getFromSettingsStack(nil)
740-
if conn == nil {
741-
conn = pool.pop(&pool.clean)
751+
// Close the connection but don't update the metrics as we've already updated them
752+
conn.Close()
742753
}
743-
if conn == nil {
744-
time.Sleep(delay)
745-
continue
754+
755+
pool.wait.expire(true)
756+
} else {
757+
const delay = 10 * time.Millisecond
758+
759+
// close connections until we're under capacity
760+
for pool.active.Load() > newcap {
761+
// try closing from connections which are currently idle in the stacks
762+
conn := pool.getFromSettingsStack(nil)
763+
if conn == nil {
764+
conn = pool.pop(&pool.clean)
765+
}
766+
767+
// Stop the loop if we don't have any more connections to close.
768+
// Connections that were borrowed by clients and are over capacity
769+
// will be closed when they are put back into the pool.
770+
if conn == nil {
771+
break
772+
}
773+
774+
conn.Close()
775+
pool.closedConn()
746776
}
747-
conn.Close()
748-
pool.closedConn()
749777
}
750778

751779
return nil

0 commit comments

Comments
 (0)