Skip to content

Commit 2538c40

Browse files
committed
Simplify custom type autoloading with pgxpool
Provide some backwards-compatible configuration options for pgxpool which streamlines the use of the bulk loading of custom types: - AutoLoadTypes: a list of type (or class) names to automatically load for each connection, automatically also loading any other types these depend on. - ReuseTypeMaps: if enabled, pgxpool will cache the typemap information, avoiding the need to perform any further queries as new connections are created. ReuseTypeMaps is disabled by default as in some situations, a connection string might resolve to a pool of servers which do not share the same type name -> OID mapping.
1 parent 4660f1d commit 2538c40

File tree

2 files changed

+92
-9
lines changed

2 files changed

+92
-9
lines changed

pgxpool/pool.go

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import (
1212

1313
"github.com/jackc/pgx/v5"
1414
"github.com/jackc/pgx/v5/pgconn"
15+
"github.com/jackc/pgx/v5/pgtype"
1516
"github.com/jackc/puddle/v2"
1617
)
1718

18-
var defaultMaxConns = int32(4)
19-
var defaultMinConns = int32(0)
20-
var defaultMaxConnLifetime = time.Hour
21-
var defaultMaxConnIdleTime = time.Minute * 30
22-
var defaultHealthCheckPeriod = time.Minute
19+
var (
20+
defaultMaxConns = int32(4)
21+
defaultMinConns = int32(0)
22+
defaultMaxConnLifetime = time.Hour
23+
defaultMaxConnIdleTime = time.Minute * 30
24+
defaultHealthCheckPeriod = time.Minute
25+
)
2326

2427
type connResource struct {
2528
conn *pgx.Conn
@@ -100,6 +103,11 @@ type Pool struct {
100103

101104
closeOnce sync.Once
102105
closeChan chan struct{}
106+
107+
autoLoadTypeNames []string
108+
reuseTypeMap bool
109+
autoLoadMutex *sync.Mutex
110+
autoLoadTypes []*pgtype.Type
103111
}
104112

105113
// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
@@ -147,6 +155,23 @@ type Config struct {
147155
// HealthCheckPeriod is the duration between checks of the health of idle connections.
148156
HealthCheckPeriod time.Duration
149157

158+
// AutoLoadTypes is a list of user-defined types which should automatically be loaded
159+
// as each new connection is created. This will also load any derived types, directly
160+
// or indirectly required to handle these types.
161+
// This is equivalent to manually calling pgx.LoadTypes()
162+
// followed by conn.TypeMap().RegisterTypes()
163+
// This will occur after the AfterConnect hook. If manual type
164+
// registrating is performed during AfterConnect, the autoloading
165+
// will be aware of those registrations.
166+
AutoLoadTypes []string
167+
168+
// ReuseTypeMaps, if enabled, will reuse the typemap information being used by AutoLoadTypes.
169+
// This removes the need to query the database each time a new connection is created;
170+
// only RegisterDerivedTypes will need to be called for each new connection.
171+
// In some situations, where OID mapping can differ between pg servers in the pool, perhaps due
172+
// to certain replication strategies, this should be left disabled.
173+
ReuseTypeMaps bool
174+
150175
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
151176
}
152177

@@ -185,6 +210,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
185210
config: config,
186211
beforeConnect: config.BeforeConnect,
187212
afterConnect: config.AfterConnect,
213+
autoLoadTypeNames: config.AutoLoadTypes,
214+
reuseTypeMap: config.ReuseTypeMaps,
188215
beforeAcquire: config.BeforeAcquire,
189216
afterRelease: config.AfterRelease,
190217
beforeClose: config.BeforeClose,
@@ -196,6 +223,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
196223
healthCheckPeriod: config.HealthCheckPeriod,
197224
healthCheckChan: make(chan struct{}, 1),
198225
closeChan: make(chan struct{}),
226+
autoLoadMutex: new(sync.Mutex),
199227
}
200228

201229
if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
@@ -237,6 +265,15 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
237265
}
238266
}
239267

268+
if p.autoLoadTypeNames != nil && len(p.autoLoadTypeNames) > 0 {
269+
types, err := p.loadTypes(ctx, conn, p.autoLoadTypeNames)
270+
if err != nil {
271+
conn.Close(ctx)
272+
return nil, err
273+
}
274+
conn.TypeMap().RegisterTypes(types)
275+
}
276+
240277
jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
241278
maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
242279

@@ -388,6 +425,27 @@ func (p *Pool) Close() {
388425
})
389426
}
390427

428+
// loadTypes is used internally to autoload the custom types for a connection,
429+
// potentially reusing previously-loaded typemap information.
430+
func (p *Pool) loadTypes(ctx context.Context, conn *pgx.Conn, typeNames []string) ([]*pgtype.Type, error) {
431+
if p.reuseTypeMap {
432+
p.autoLoadMutex.Lock()
433+
defer p.autoLoadMutex.Unlock()
434+
if p.autoLoadTypes != nil {
435+
return p.autoLoadTypes, nil
436+
}
437+
types, err := pgx.LoadTypes(ctx, conn, typeNames)
438+
if err != nil {
439+
return nil, err
440+
}
441+
p.autoLoadTypes = types
442+
return types, err
443+
}
444+
// Avoid needing to acquire the mutex and allow connections to initialise in parallel
445+
// if we have chosen to not reuse the type mapping
446+
return pgx.LoadTypes(ctx, conn, typeNames)
447+
}
448+
391449
func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
392450
return time.Now().After(res.Value().maxAgeTime)
393451
}
@@ -482,7 +540,6 @@ func (p *Pool) checkMinConns() error {
482540
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
483541
ctx, cancel := context.WithCancel(parentCtx)
484542
defer cancel()
485-
486543
errs := make(chan error, targetResources)
487544

488545
for i := 0; i < targetResources; i++ {
@@ -495,7 +552,6 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
495552
errs <- err
496553
}()
497554
}
498-
499555
var firstError error
500556
for i := 0; i < targetResources; i++ {
501557
err := <-errs

pgxpool/pool_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,35 @@ func TestPoolBeforeConnect(t *testing.T) {
261261
assert.EqualValues(t, "pgx", str)
262262
}
263263

264+
func TestAutoLoadTypes(t *testing.T) {
265+
t.Parallel()
266+
267+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
268+
defer cancel()
269+
270+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
271+
require.NoError(t, err)
272+
273+
controllerConn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
274+
require.NoError(t, err)
275+
defer controllerConn.Close(ctx)
276+
pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support autoloading of uint64")
277+
db1, err := pgxpool.NewWithConfig(ctx, config)
278+
require.NoError(t, err)
279+
defer db1.Close()
280+
db1.Exec(ctx, "DROP DOMAIN IF EXISTS autoload_uint64; CREATE DOMAIN autoload_uint64 as numeric(20,0)")
281+
defer db1.Exec(ctx, "DROP DOMAIN autoload_uint64")
282+
283+
config.AutoLoadTypes = []string{"autoload_uint64"}
284+
db2, err := pgxpool.NewWithConfig(ctx, config)
285+
require.NoError(t, err)
286+
287+
var n uint64
288+
err = db2.QueryRow(ctx, "select 12::autoload_uint64").Scan(&n)
289+
require.NoError(t, err)
290+
assert.EqualValues(t, uint64(12), n)
291+
}
292+
264293
func TestPoolAfterConnect(t *testing.T) {
265294
t.Parallel()
266295

@@ -676,7 +705,6 @@ func TestPoolQuery(t *testing.T) {
676705
stats = pool.Stat()
677706
assert.EqualValues(t, 0, stats.AcquiredConns())
678707
assert.EqualValues(t, 1, stats.TotalConns())
679-
680708
}
681709

682710
func TestPoolQueryRow(t *testing.T) {
@@ -1104,7 +1132,6 @@ func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
11041132
}
11051133

11061134
t.Fatal("did not reach min pool size")
1107-
11081135
}
11091136

11101137
func TestPoolSendBatchBatchCloseTwice(t *testing.T) {

0 commit comments

Comments
 (0)