Skip to content

xdsclient: make sending requests more deterministic #7774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions xds/internal/xdsclient/transport/ads/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*ResourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
}

// StreamImpl provides the functionality associated with an ADS (Aggregated
Expand Down Expand Up @@ -203,6 +204,7 @@
// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &ResourceWatchState{State: ResourceWatchStateStarted}
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
Expand Down Expand Up @@ -233,6 +235,7 @@
rs.ExpiryTimer.Stop()
}
delete(state.subscribedResources, name)
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
Expand Down Expand Up @@ -346,17 +349,7 @@
return nil
}

names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
return err

}
select {
case <-state.bufferedRequests:
default:
}
s.startWatchTimersLocked(typ, names)
return nil
return s.sendMessageIfWritePendingLocked(stream, typ, state)
}

// sendExisting sends out discovery requests for existing resources when
Expand Down Expand Up @@ -385,18 +378,10 @@
continue
}

names := resourceNames(state.subscribedResources)
if s.logger.V(2) {
s.logger.Infof("Re-requesting resources %v of type %q, as the stream has been recreated", names, typ.TypeURL())
}
Comment on lines -389 to -391
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be put on line 381 now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to get rid of this log message since the resources are anyway logged from sendMessageLocked.

if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
state.pendingWrite = true
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
select {
case <-state.bufferedRequests:
default:
}
s.startWatchTimersLocked(typ, names)
}
return nil
}
Expand All @@ -413,11 +398,9 @@
for typ, state := range s.resourceTypeState {
select {
case <-state.bufferedRequests:
names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {

Check warning on line 401 in xds/internal/xdsclient/transport/ads/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/transport/ads/ads_stream.go#L401

Added line #L401 was not covered by tests
return err
}
s.startWatchTimersLocked(typ, names)
default:
// No buffered request.
continue
Expand All @@ -426,6 +409,38 @@
return nil
}

// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
// server, if there is a pending write for the given resource type.
//
// If the request is successfully sent, the pending write field is cleared and
// watch timers are started for the resources in the request.
//
// Caller needs to hold c.mu.
func (s *StreamImpl) sendMessageIfWritePendingLocked(stream transport.StreamingCall, typ xdsresource.Type, state *resourceTypeState) error {
if !state.pendingWrite {
if s.logger.V(2) {
s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL())
}

Check warning on line 423 in xds/internal/xdsclient/transport/ads/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/transport/ads/ads_stream.go#L422-L423

Added lines #L422 - L423 were not covered by tests
return nil
}

names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
return err
}

Check warning on line 430 in xds/internal/xdsclient/transport/ads/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/transport/ads/ads_stream.go#L429-L430

Added lines #L429 - L430 were not covered by tests
state.pendingWrite = false

// Drain the buffered requests channel because we just sent a request for this
// resource type.
select {
case <-state.bufferedRequests:

Check warning on line 436 in xds/internal/xdsclient/transport/ads/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/transport/ads/ads_stream.go#L436

Added line #L436 was not covered by tests
default:
}

s.startWatchTimersLocked(typ, names)
return nil
}

// sendMessageLocked sends a discovery request to the server, populating the
// different fields of the message with the given parameters. Returns a non-nil
// error if the request could not be sent.
Expand Down
Loading