@@ -110,20 +110,12 @@ type connectionInfoCache interface {
110
110
io.Closer
111
111
}
112
112
113
- // monitoredCache is a wrapper around a connectionInfoCache that tracks the
114
- // number of connections to the associated instance.
115
- type monitoredCache struct {
116
- openConns * uint64
117
-
118
- connectionInfoCache
119
- }
120
-
121
113
// A Dialer is used to create connections to Cloud SQL instances.
122
114
//
123
115
// Use NewDialer to initialize a Dialer.
124
116
type Dialer struct {
125
117
lock sync.RWMutex
126
- cache map [instance.ConnName ]monitoredCache
118
+ cache map [instance.ConnName ]* monitoredCache
127
119
keyGenerator * keyGenerator
128
120
refreshTimeout time.Duration
129
121
// closed reports if the dialer has been closed.
@@ -155,7 +147,8 @@ type Dialer struct {
155
147
iamTokenSource oauth2.TokenSource
156
148
157
149
// resolver converts instance names into DNS names.
158
- resolver instance.ConnectionNameResolver
150
+ resolver instance.ConnectionNameResolver
151
+ failoverPeriod time.Duration
159
152
}
160
153
161
154
var (
@@ -179,6 +172,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
179
172
logger : nullLogger {},
180
173
useragents : []string {userAgent },
181
174
serviceUniverse : "googleapis.com" ,
175
+ failoverPeriod : cloudsql .FailoverPeriod ,
182
176
}
183
177
for _ , opt := range opts {
184
178
opt (cfg )
@@ -192,6 +186,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
192
186
if cfg .setIAMAuthNTokenSource && ! cfg .useIAMAuthN {
193
187
return nil , errUseTokenSource
194
188
}
189
+
195
190
// Add this to the end to make sure it's not overridden
196
191
cfg .sqladminOpts = append (cfg .sqladminOpts , option .WithUserAgent (strings .Join (cfg .useragents , " " )))
197
192
@@ -263,7 +258,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
263
258
264
259
d := & Dialer {
265
260
closed : make (chan struct {}),
266
- cache : make (map [instance.ConnName ]monitoredCache ),
261
+ cache : make (map [instance.ConnName ]* monitoredCache ),
267
262
lazyRefresh : cfg .lazyRefresh ,
268
263
keyGenerator : g ,
269
264
refreshTimeout : cfg .refreshTimeout ,
@@ -274,7 +269,9 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
274
269
iamTokenSource : cfg .iamLoginTokenSource ,
275
270
dialFunc : cfg .dialFunc ,
276
271
resolver : r ,
272
+ failoverPeriod : cfg .failoverPeriod ,
277
273
}
274
+
278
275
return d , nil
279
276
}
280
277
@@ -380,15 +377,24 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
380
377
381
378
latency := time .Since (startTime ).Milliseconds ()
382
379
go func () {
383
- n := atomic .AddUint64 (c .openConns , 1 )
380
+ n := atomic .AddUint64 (c .openConnsCount , 1 )
384
381
trace .RecordOpenConnections (ctx , int64 (n ), d .dialerID , cn .String ())
385
382
trace .RecordDialLatency (ctx , icn , d .dialerID , latency )
386
383
}()
387
384
388
- return newInstrumentedConn (tlsConn , func () {
389
- n := atomic .AddUint64 (c .openConns , ^ uint64 (0 ))
385
+ iConn := newInstrumentedConn (tlsConn , func () {
386
+ n := atomic .AddUint64 (c .openConnsCount , ^ uint64 (0 ))
390
387
trace .RecordOpenConnections (context .Background (), int64 (n ), d .dialerID , cn .String ())
391
- }, d .dialerID , cn .String ()), nil
388
+ }, d .dialerID , cn .String ())
389
+
390
+ // If this connection was opened using a Domain Name, then store it for later
391
+ // in case it needs to be forcibly closed.
392
+ if cn .DomainName () != "" {
393
+ c .mu .Lock ()
394
+ c .openConns = append (c .openConns , iConn )
395
+ c .mu .Unlock ()
396
+ }
397
+ return iConn , nil
392
398
}
393
399
394
400
// removeCached stops all background refreshes and deletes the connection
@@ -448,7 +454,7 @@ func (d *Dialer) EngineVersion(ctx context.Context, icn string) (string, error)
448
454
}
449
455
ci , err := c .ConnectionInfo (ctx )
450
456
if err != nil {
451
- d .removeCached (ctx , cn , c , err )
457
+ d .removeCached (ctx , cn , c . connectionInfoCache , err )
452
458
return "" , err
453
459
}
454
460
return ci .DBVersion , nil
@@ -472,7 +478,7 @@ func (d *Dialer) Warmup(ctx context.Context, icn string, opts ...DialOption) err
472
478
}
473
479
_ , err = c .ConnectionInfo (ctx )
474
480
if err != nil {
475
- d .removeCached (ctx , cn , c , err )
481
+ d .removeCached (ctx , cn , c . connectionInfoCache , err )
476
482
}
477
483
return err
478
484
}
@@ -493,6 +499,8 @@ func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, connName str
493
499
type instrumentedConn struct {
494
500
net.Conn
495
501
closeFunc func ()
502
+ mu sync.RWMutex
503
+ closed bool
496
504
dialerID string
497
505
connName string
498
506
}
@@ -517,9 +525,19 @@ func (i *instrumentedConn) Write(b []byte) (int, error) {
517
525
return bytesWritten , err
518
526
}
519
527
528
+ // isClosed returns true if this connection is closing or is already closed.
529
+ func (i * instrumentedConn ) isClosed () bool {
530
+ i .mu .RLock ()
531
+ defer i .mu .RUnlock ()
532
+ return i .closed
533
+ }
534
+
520
535
// Close delegates to the underlying net.Conn interface and reports the close
521
536
// to the provided closeFunc only when Close returns no error.
522
537
func (i * instrumentedConn ) Close () error {
538
+ i .mu .Lock ()
539
+ defer i .mu .Unlock ()
540
+ i .closed = true
523
541
err := i .Conn .Close ()
524
542
if err != nil {
525
543
return err
@@ -551,50 +569,104 @@ func (d *Dialer) Close() error {
551
569
// modify the existing one, or leave it unchanged as needed.
552
570
func (d * Dialer ) connectionInfoCache (
553
571
ctx context.Context , cn instance.ConnName , useIAMAuthN * bool ,
554
- ) (monitoredCache , error ) {
572
+ ) (* monitoredCache , error ) {
555
573
d .lock .RLock ()
556
574
c , ok := d .cache [cn ]
557
575
d .lock .RUnlock ()
558
- if ! ok {
559
- d .lock .Lock ()
560
- defer d .lock .Unlock ()
561
- // Recheck to ensure instance wasn't created or changed between locks
562
- c , ok = d .cache [cn ]
563
- if ! ok {
564
- var useIAMAuthNDial bool
565
- if useIAMAuthN != nil {
566
- useIAMAuthNDial = * useIAMAuthN
567
- }
568
- d .logger .Debugf (ctx , "[%v] Connection info added to cache" , cn .String ())
569
- k , err := d .keyGenerator .rsaKey ()
570
- if err != nil {
571
- return monitoredCache {}, err
572
- }
573
- var cache connectionInfoCache
574
- if d .lazyRefresh {
575
- cache = cloudsql .NewLazyRefreshCache (
576
- cn ,
577
- d .logger ,
578
- d .sqladmin , k ,
579
- d .refreshTimeout , d .iamTokenSource ,
580
- d .dialerID , useIAMAuthNDial ,
581
- )
582
- } else {
583
- cache = cloudsql .NewRefreshAheadCache (
584
- cn ,
585
- d .logger ,
586
- d .sqladmin , k ,
587
- d .refreshTimeout , d .iamTokenSource ,
588
- d .dialerID , useIAMAuthNDial ,
589
- )
590
- }
591
- var count uint64
592
- c = monitoredCache {openConns : & count , connectionInfoCache : cache }
593
- d .cache [cn ] = c
594
- }
576
+
577
+ // recheck the domain name, this may close the cache.
578
+ if ok {
579
+ c .checkDomainName (ctx )
580
+ }
581
+
582
+ if ok && ! c .isClosed () {
583
+ c .UpdateRefresh (useIAMAuthN )
584
+ return c , nil
595
585
}
596
586
597
- c .UpdateRefresh (useIAMAuthN )
587
+ d .lock .Lock ()
588
+ defer d .lock .Unlock ()
589
+
590
+ // Recheck to ensure instance wasn't created or changed between locks
591
+ c , ok = d .cache [cn ]
592
+
593
+ // c exists and is not closed
594
+ if ok && ! c .isClosed () {
595
+ c .UpdateRefresh (useIAMAuthN )
596
+ return c , nil
597
+ }
598
+
599
+ // c exists and is closed, remove it from the cache
600
+ if ok {
601
+ // remove it.
602
+ _ = c .Close ()
603
+ delete (d .cache , cn )
604
+ }
605
+
606
+ // c does not exist, check for matching domain and close it
607
+ oldCn , old , ok := d .findByDn (cn )
608
+ if ok {
609
+ _ = old .Close ()
610
+ delete (d .cache , oldCn )
611
+ }
612
+
613
+ // Create a new instance of monitoredCache
614
+ var useIAMAuthNDial bool
615
+ if useIAMAuthN != nil {
616
+ useIAMAuthNDial = * useIAMAuthN
617
+ }
618
+ d .logger .Debugf (ctx , "[%v] Connection info added to cache" , cn .String ())
619
+ k , err := d .keyGenerator .rsaKey ()
620
+ if err != nil {
621
+ return nil , err
622
+ }
623
+ var cache connectionInfoCache
624
+ if d .lazyRefresh {
625
+ cache = cloudsql .NewLazyRefreshCache (
626
+ cn ,
627
+ d .logger ,
628
+ d .sqladmin , k ,
629
+ d .refreshTimeout , d .iamTokenSource ,
630
+ d .dialerID , useIAMAuthNDial ,
631
+ )
632
+ } else {
633
+ cache = cloudsql .NewRefreshAheadCache (
634
+ cn ,
635
+ d .logger ,
636
+ d .sqladmin , k ,
637
+ d .refreshTimeout , d .iamTokenSource ,
638
+ d .dialerID , useIAMAuthNDial ,
639
+ )
640
+ }
641
+ c = newMonitoredCache (ctx , cache , cn , d .failoverPeriod , d .resolver , d .logger )
642
+ d .cache [cn ] = c
598
643
599
644
return c , nil
600
645
}
646
+
647
+ // getOrAdd returns the cache entry, creating it if necessary. This will also
648
+ // take care to remove entries with the same domain name.
649
+ //
650
+ // cn - the connection name to getOrAdd
651
+ //
652
+ // returns:
653
+ //
654
+ // monitoredCache - the cached entry
655
+ // bool ok - the instance exists
656
+ // instance.ConnName - the key to the old entry with the same domain name
657
+ //
658
+ // This method does not manage locks.
659
+ func (d * Dialer ) findByDn (cn instance.ConnName ) (instance.ConnName , * monitoredCache , bool ) {
660
+
661
+ // Try to get an instance with the same domain name but different instance
662
+ // Remove this instance from the cache, it will be replaced.
663
+ if cn .HasDomainName () {
664
+ for oldCn , oc := range d .cache {
665
+ if oldCn .DomainName () == cn .DomainName () && oldCn != cn {
666
+ return oldCn , oc , true
667
+ }
668
+ }
669
+ }
670
+
671
+ return instance.ConnName {}, nil , false
672
+ }
0 commit comments