@@ -107,41 +107,10 @@ type connectionInfoCache interface {
107
107
ConnectionInfo (context.Context ) (cloudsql.ConnectionInfo , error )
108
108
UpdateRefresh (* bool )
109
109
ForceRefresh ()
110
+ UseIAMAuthN () bool
110
111
io.Closer
111
112
}
112
113
113
- // monitoredCache is a wrapper around a connectionInfoCache that tracks the
114
- // number of connections to the associated instance.
115
- type monitoredCache struct {
116
- openConnsCount * uint64
117
-
118
- connectionInfoCache
119
- }
120
-
121
- func (c * monitoredCache ) Close () error {
122
- return c .connectionInfoCache .Close ()
123
- }
124
-
125
- func (c * monitoredCache ) ForceRefresh () {
126
- if c == nil || c .connectionInfoCache == nil {
127
- return
128
- }
129
- c .connectionInfoCache .ForceRefresh ()
130
- }
131
-
132
- func (c * monitoredCache ) UpdateRefresh (b * bool ) {
133
- if c == nil || c .connectionInfoCache == nil {
134
- return
135
- }
136
- c .connectionInfoCache .UpdateRefresh (b )
137
- }
138
- func (c * monitoredCache ) ConnectionInfo (ctx context.Context ) (cloudsql.ConnectionInfo , error ) {
139
- if c == nil || c .connectionInfoCache == nil {
140
- return cloudsql.ConnectionInfo {}, nil
141
- }
142
- return c .connectionInfoCache .ConnectionInfo (ctx )
143
- }
144
-
145
114
// A Dialer is used to create connections to Cloud SQL instances.
146
115
//
147
116
// Use NewDialer to initialize a Dialer.
@@ -178,7 +147,8 @@ type Dialer struct {
178
147
iamTokenSource oauth2.TokenSource
179
148
180
149
// resolver converts instance names into DNS names.
181
- resolver instance.ConnectionNameResolver
150
+ resolver instance.ConnectionNameResolver
151
+ failoverPeriod time.Duration
182
152
}
183
153
184
154
var (
@@ -202,6 +172,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
202
172
logger : nullLogger {},
203
173
useragents : []string {userAgent },
204
174
serviceUniverse : "googleapis.com" ,
175
+ failoverPeriod : cloudsql .FailoverPeriod ,
205
176
}
206
177
for _ , opt := range opts {
207
178
opt (cfg )
@@ -215,6 +186,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
215
186
if cfg .setIAMAuthNTokenSource && ! cfg .useIAMAuthN {
216
187
return nil , errUseTokenSource
217
188
}
189
+
218
190
// Add this to the end to make sure it's not overridden
219
191
cfg .sqladminOpts = append (cfg .sqladminOpts , option .WithUserAgent (strings .Join (cfg .useragents , " " )))
220
192
@@ -297,7 +269,9 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
297
269
iamTokenSource : cfg .iamLoginTokenSource ,
298
270
dialFunc : cfg .dialFunc ,
299
271
resolver : r ,
272
+ failoverPeriod : cfg .failoverPeriod ,
300
273
}
274
+
301
275
return d , nil
302
276
}
303
277
@@ -413,6 +387,13 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
413
387
trace .RecordOpenConnections (context .Background (), int64 (n ), d .dialerID , cn .String ())
414
388
}, d .dialerID , cn .String ())
415
389
390
+ // If this connection was opened using a Domain Name, then store it for later
391
+ // in case it needs to be forcibly closed.
392
+ if cn .DomainName () != "" {
393
+ c .mu .Lock ()
394
+ c .openConns = append (c .openConns , iConn )
395
+ c .mu .Unlock ()
396
+ }
416
397
return iConn , nil
417
398
}
418
399
@@ -514,6 +495,7 @@ func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, connName str
514
495
type instrumentedConn struct {
515
496
net.Conn
516
497
closeFunc func ()
498
+ mu sync.RWMutex
517
499
closed bool
518
500
dialerID string
519
501
connName string
@@ -539,9 +521,18 @@ func (i *instrumentedConn) Write(b []byte) (int, error) {
539
521
return bytesWritten , err
540
522
}
541
523
524
+ // isClosed returns true if this connection is closing or is already closed.
525
+ func (i * instrumentedConn ) isClosed () bool {
526
+ i .mu .RLock ()
527
+ defer i .mu .RUnlock ()
528
+ return i .closed
529
+ }
530
+
542
531
// Close delegates to the underlying net.Conn interface and reports the close
543
532
// to the provided closeFunc only when Close returns no error.
544
533
func (i * instrumentedConn ) Close () error {
534
+ i .mu .Lock ()
535
+ defer i .mu .Unlock ()
545
536
i .closed = true
546
537
err := i .Conn .Close ()
547
538
if err != nil {
@@ -582,6 +573,8 @@ func (d *Dialer) connectionInfoCache(
582
573
})
583
574
584
575
if old != nil {
576
+ // ensure that the old cache entry is closed.
577
+ // monitoredCache.Close() may be called safely, even if old is closed.
585
578
old .Close ()
586
579
}
587
580
@@ -621,10 +614,7 @@ func (d *Dialer) createConnectionInfoCache(
621
614
d .dialerID , useIAMAuthNDial ,
622
615
)
623
616
}
624
- c := & monitoredCache {
625
- openConnsCount : new (uint64 ),
626
- connectionInfoCache : cache ,
627
- }
617
+ c := newMonitoredCache (ctx , cache , cn , d .failoverPeriod , d .resolver , d .logger )
628
618
629
619
c .UpdateRefresh (useIAMAuthN )
630
620
0 commit comments