-
Notifications
You must be signed in to change notification settings - Fork 4.5k
pickfirst: Implement Happy Eyeballs #7725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
db0dda7
826bb03
4e68e58
fe69816
3022304
0a3ffd3
6697267
67f7a1a
9712ec5
6c8fb41
04a912f
0bb745f
99e2e89
592ba0d
84d6ed4
8f63d8e
09f27c6
6610516
d6bc007
19a3165
598fdd0
d3bde50
8b4b28e
6c16943
11fe515
5c4ff49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
"fmt" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"google.golang.org/grpc/balancer" | ||
"google.golang.org/grpc/balancer/pickfirst/internal" | ||
|
@@ -59,8 +60,13 @@ | |
Name = "pick_first_leaf" | ||
) | ||
|
||
// TODO: change to pick-first when this becomes the default pick_first policy. | ||
const logPrefix = "[pick-first-leaf-lb %p] " | ||
const ( | ||
// TODO: change to pick-first when this becomes the default pick_first policy. | ||
logPrefix = "[pick-first-leaf-lb %p] " | ||
// connectionDelayInterval is the time to wait for during the happy eyeballs | ||
// pass before starting the next connection attempt. | ||
connectionDelayInterval = 250 * time.Millisecond | ||
) | ||
|
||
type ipAddrFamily int | ||
|
||
|
@@ -76,11 +82,12 @@ | |
|
||
func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { | ||
b := &pickfirstBalancer{ | ||
cc: cc, | ||
addressList: addressList{}, | ||
subConns: resolver.NewAddressMap(), | ||
state: connectivity.Connecting, | ||
mu: sync.Mutex{}, | ||
cc: cc, | ||
addressList: addressList{}, | ||
subConns: resolver.NewAddressMap(), | ||
state: connectivity.Connecting, | ||
mu: sync.Mutex{}, | ||
cancelConnectionTimer: func() {}, | ||
} | ||
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) | ||
return b | ||
|
@@ -115,8 +122,9 @@ | |
subConn balancer.SubConn | ||
addr resolver.Address | ||
|
||
state connectivity.State | ||
lastErr error | ||
state connectivity.State | ||
lastErr error | ||
connectionFailed bool | ||
} | ||
|
||
func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { | ||
|
@@ -148,10 +156,11 @@ | |
mu sync.Mutex | ||
state connectivity.State | ||
// scData for active subonns mapped by address. | ||
subConns *resolver.AddressMap | ||
addressList addressList | ||
firstPass bool | ||
numTF int | ||
subConns *resolver.AddressMap | ||
addressList addressList | ||
firstPass bool | ||
numTF int | ||
cancelConnectionTimer func() | ||
} | ||
|
||
// ResolverError is called by the ClientConn when the name resolver produces | ||
|
@@ -186,6 +195,7 @@ | |
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
b.cancelConnectionTimer() | ||
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 { | ||
// Cleanup state pertaining to the previous resolver state. | ||
// Treat an empty address list like an error by calling b.ResolverError. | ||
|
@@ -239,12 +249,8 @@ | |
// Not de-duplicating would result in attempting to connect to the same | ||
// SubConn multiple times in the same pass. We don't want this. | ||
newAddrs = deDupAddresses(newAddrs) | ||
|
||
newAddrs = interleaveAddresses(newAddrs) | ||
|
||
// Since we have a new set of addresses, we are again at first pass. | ||
b.firstPass = true | ||
|
||
// If the previous ready SubConn exists in new address list, | ||
// keep this connection and don't create new SubConns. | ||
prevAddr := b.addressList.currentAddress() | ||
|
@@ -269,11 +275,11 @@ | |
ConnectivityState: connectivity.Connecting, | ||
Picker: &picker{err: balancer.ErrNoSubConnAvailable}, | ||
}) | ||
b.requestConnectionLocked() | ||
b.startFirstPassLocked() | ||
} else if b.state == connectivity.TransientFailure { | ||
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until | ||
// we're READY. See A62. | ||
b.requestConnectionLocked() | ||
b.startFirstPassLocked() | ||
} | ||
return nil | ||
} | ||
|
@@ -288,6 +294,7 @@ | |
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
b.closeSubConnsLocked() | ||
b.cancelConnectionTimer() | ||
b.state = connectivity.Shutdown | ||
} | ||
|
||
|
@@ -297,12 +304,21 @@ | |
func (b *pickfirstBalancer) ExitIdle() { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() { | ||
b.firstPass = true | ||
b.requestConnectionLocked() | ||
if b.state == connectivity.Idle { | ||
b.startFirstPassLocked() | ||
} | ||
} | ||
|
||
func (b *pickfirstBalancer) startFirstPassLocked() { | ||
b.firstPass = true | ||
b.numTF = 0 | ||
// Reset the connection attempt record for existing SubConns. | ||
for _, sd := range b.subConns.Values() { | ||
sd.(*scData).connectionFailed = false | ||
} | ||
b.requestConnectionLocked() | ||
} | ||
|
||
func (b *pickfirstBalancer) closeSubConnsLocked() { | ||
for _, sd := range b.subConns.Values() { | ||
sd.(*scData).subConn.Shutdown() | ||
|
@@ -413,6 +429,7 @@ | |
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn | ||
// becomes ready, which means that all other subConn must be shutdown. | ||
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) { | ||
b.cancelConnectionTimer() | ||
for _, v := range b.subConns.Values() { | ||
sd := v.(*scData) | ||
if sd.subConn != selected.subConn { | ||
|
@@ -456,30 +473,73 @@ | |
switch scd.state { | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
case connectivity.Idle: | ||
scd.subConn.Connect() | ||
b.scheduleNextConnectionLocked() | ||
return | ||
case connectivity.TransientFailure: | ||
// Try the next address. | ||
// The SubConn is being re-used and failed during a previous pass | ||
// over the addressList. It has not completed backoff yet. | ||
// Mark it as having failed and try the next address. | ||
scd.connectionFailed = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Consider a subchannel is being re-used after getting a resolver update because it's address is present in the new address list. The subchannel has already failed, it has
The above steps ensure that the subchannel always has a non-nil error to update the picker. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see what's happening here, thanks for the explanation. Maybe name it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed to |
||
lastErr = scd.lastErr | ||
continue | ||
case connectivity.Ready: | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Should never happen. | ||
b.logger.Errorf("Requesting a connection even though we have a READY SubConn") | ||
return | ||
case connectivity.Shutdown: | ||
// Should never happen. | ||
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map") | ||
return | ||
case connectivity.Connecting: | ||
// Wait for the SubConn to report success or failure. | ||
// Wait for the connection attempt to complete or the timer to fire | ||
// before attempting the next address. | ||
b.scheduleNextConnectionLocked() | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
return | ||
} | ||
|
||
// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the | ||
// first pass. | ||
b.endFirstPassLocked(lastErr) | ||
// first pass if possible. | ||
b.endFirstPassIfPossibleLocked(lastErr) | ||
} | ||
|
||
func (b *pickfirstBalancer) scheduleNextConnectionLocked() { | ||
b.cancelConnectionTimer() | ||
if !b.addressList.hasNext() { | ||
return | ||
} | ||
curAddr := b.addressList.currentAddress() | ||
cancelled := false // Access to this is protected by the balancer's mutex. | ||
closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
// If the scheduled task is cancelled while acquiring the mutex, return. | ||
if cancelled { | ||
return | ||
} | ||
if b.logger.V(2) { | ||
b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr) | ||
} | ||
if b.addressList.increment() { | ||
b.requestConnectionLocked() | ||
} | ||
}) | ||
// Access to the cancellation callback held by the balancer is guarded by | ||
// the balancer's mutex, so it's safe to set the boolean from the callback. | ||
b.cancelConnectionTimer = sync.OnceFunc(func() { | ||
cancelled = true | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
closeFn() | ||
}) | ||
} | ||
|
||
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
oldState := sd.state | ||
// Record a connection attempt when exiting CONNECTING. | ||
if newState.ConnectivityState == connectivity.TransientFailure { | ||
sd.connectionFailed = true | ||
} | ||
sd.state = newState.ConnectivityState | ||
// Previously relevant SubConns can still callback with state updates. | ||
// To prevent pickers from returning these obsolete SubConns, this logic | ||
|
@@ -545,17 +605,20 @@ | |
sd.lastErr = newState.ConnectionError | ||
// Since we're re-using common SubConns while handling resolver | ||
// updates, we could receive an out of turn TRANSIENT_FAILURE from | ||
// a pass over the previous address list. We ignore such updates. | ||
|
||
if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) { | ||
return | ||
} | ||
if b.addressList.increment() { | ||
b.requestConnectionLocked() | ||
return | ||
// a pass over the previous address list. Happy Eyeballs will also | ||
// cause out of order updates to arrive. | ||
|
||
if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) { | ||
b.cancelConnectionTimer() | ||
if b.addressList.increment() { | ||
b.requestConnectionLocked() | ||
return | ||
} | ||
} | ||
// End of the first pass. | ||
b.endFirstPassLocked(newState.ConnectionError) | ||
|
||
// End the first pass if we've seen a TRANSIENT_FAILURE from all | ||
// SubConns once. | ||
b.endFirstPassIfPossibleLocked(newState.ConnectionError) | ||
} | ||
return | ||
} | ||
|
@@ -580,9 +643,17 @@ | |
} | ||
} | ||
|
||
func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) { | ||
func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) { | ||
if b.addressList.isValid() || b.subConns.Len() < b.addressList.size() { | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
for _, v := range b.subConns.Values() { | ||
sd := v.(*scData) | ||
if !sd.connectionFailed { | ||
return | ||
} | ||
} | ||
b.firstPass = false | ||
b.numTF = 0 | ||
b.state = connectivity.TransientFailure | ||
|
||
b.cc.UpdateState(balancer.State{ | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
@@ -654,15 +725,6 @@ | |
return al.addresses[al.idx] | ||
} | ||
|
||
// first returns the first address in the list. If the list is empty, it returns | ||
// an empty address instead. | ||
func (al *addressList) first() resolver.Address { | ||
if len(al.addresses) == 0 { | ||
return resolver.Address{} | ||
} | ||
return al.addresses[0] | ||
} | ||
|
||
func (al *addressList) reset() { | ||
al.idx = 0 | ||
} | ||
|
@@ -685,6 +747,16 @@ | |
return false | ||
} | ||
|
||
// hasNext returns whether incrementing the addressList will result in moving | ||
// past the end of the list. If the list has already moved past the end, it | ||
// returns false. | ||
func (al *addressList) hasNext() bool { | ||
if !al.isValid() { | ||
return false | ||
} | ||
return al.idx+1 < len(al.addresses) | ||
} | ||
|
||
// equalAddressIgnoringBalAttributes returns true is a and b are considered | ||
// equal. This is different from the Equal method on the resolver.Address type | ||
// which considers all fields to determine equality. Here, we only consider | ||
|
Uh oh!
There was an error while loading. Please reload this page.