@@ -20,9 +20,13 @@ package client
20
20
21
21
import (
22
22
"context"
23
+ "fmt"
23
24
"testing"
24
25
"time"
25
26
27
+ "github.com/google/go-cmp/cmp"
28
+ "github.com/google/go-cmp/cmp/cmpopts"
29
+
26
30
"google.golang.org/grpc"
27
31
"google.golang.org/grpc/internal/grpctest"
28
32
"google.golang.org/grpc/internal/testutils"
@@ -49,7 +53,8 @@ const (
49
53
testEDSName = "test-eds"
50
54
51
55
defaultTestWatchExpiryTimeout = 500 * time .Millisecond
52
- defaultTestTimeout = 1 * time .Second
56
+ defaultTestTimeout = 5 * time .Second
57
+ defaultTestShortTimeout = 10 * time .Millisecond // For events expected to *not* happen.
53
58
)
54
59
55
60
func clientOpts (balancerName string , overrideWatchExpiryTImeout bool ) Options {
@@ -68,24 +73,22 @@ func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
68
73
}
69
74
70
75
type testAPIClient struct {
71
- r UpdateHandler
72
-
73
76
addWatches map [ResourceType ]* testutils.Channel
74
77
removeWatches map [ResourceType ]* testutils.Channel
75
78
}
76
79
77
- func overrideNewAPIClient () (<- chan * testAPIClient , func ()) {
80
+ func overrideNewAPIClient () (* testutils. Channel , func ()) {
78
81
origNewAPIClient := newAPIClient
79
- ch := make ( chan * testAPIClient , 1 )
82
+ ch := testutils . NewChannel ( )
80
83
newAPIClient = func (apiVersion version.TransportAPI , cc * grpc.ClientConn , opts BuildOptions ) (APIClient , error ) {
81
- ret := newTestAPIClient (opts . Parent )
82
- ch <- ret
84
+ ret := newTestAPIClient ()
85
+ ch . Send ( ret )
83
86
return ret , nil
84
87
}
85
88
return ch , func () { newAPIClient = origNewAPIClient }
86
89
}
87
90
88
- func newTestAPIClient (r UpdateHandler ) * testAPIClient {
91
+ func newTestAPIClient () * testAPIClient {
89
92
addWatches := map [ResourceType ]* testutils.Channel {
90
93
ListenerResource : testutils .NewChannel (),
91
94
RouteConfigResource : testutils .NewChannel (),
@@ -99,7 +102,6 @@ func newTestAPIClient(r UpdateHandler) *testAPIClient {
99
102
EndpointsResource : testutils .NewChannel (),
100
103
}
101
104
return & testAPIClient {
102
- r : r ,
103
105
addWatches : addWatches ,
104
106
removeWatches : removeWatches ,
105
107
}
@@ -121,53 +123,108 @@ func (c *testAPIClient) Close() {}
121
123
// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
122
124
// callback. It makes sure it doesn't cause a deadlock.
123
125
func (s ) TestWatchCallAnotherWatch (t * testing.T ) {
124
- v2ClientCh , cleanup := overrideNewAPIClient ()
126
+ apiClientCh , cleanup := overrideNewAPIClient ()
125
127
defer cleanup ()
126
128
127
- c , err := New (clientOpts (testXDSServer , false ))
129
+ client , err := New (clientOpts (testXDSServer , false ))
128
130
if err != nil {
129
131
t .Fatalf ("failed to create client: %v" , err )
130
132
}
131
- defer c .Close ()
133
+ defer client .Close ()
132
134
133
- v2Client := <- v2ClientCh
135
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
136
+ defer cancel ()
137
+ c , err := apiClientCh .Receive (ctx )
138
+ if err != nil {
139
+ t .Fatalf ("timeout when waiting for API client to be created: %v" , err )
140
+ }
141
+ apiClient := c .(* testAPIClient )
134
142
135
143
clusterUpdateCh := testutils .NewChannel ()
136
144
firstTime := true
137
- c .WatchCluster (testCDSName , func (update ClusterUpdate , err error ) {
145
+ client .WatchCluster (testCDSName , func (update ClusterUpdate , err error ) {
138
146
clusterUpdateCh .Send (clusterUpdateErr {u : update , err : err })
139
147
// Calls another watch inline, to ensure there's deadlock.
140
- c .WatchCluster ("another-random-name" , func (ClusterUpdate , error ) {})
148
+ client .WatchCluster ("another-random-name" , func (ClusterUpdate , error ) {})
141
149
142
- ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
143
- defer cancel ()
144
- if _ , err := v2Client .addWatches [ClusterResource ].Receive (ctx ); firstTime && err != nil {
150
+ if _ , err := apiClient .addWatches [ClusterResource ].Receive (ctx ); firstTime && err != nil {
145
151
t .Fatalf ("want new watch to start, got error %v" , err )
146
152
}
147
153
firstTime = false
148
154
})
149
-
150
- ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
151
- defer cancel ()
152
- if _ , err := v2Client .addWatches [ClusterResource ].Receive (ctx ); err != nil {
155
+ if _ , err := apiClient .addWatches [ClusterResource ].Receive (ctx ); err != nil {
153
156
t .Fatalf ("want new watch to start, got error %v" , err )
154
157
}
155
158
156
159
wantUpdate := ClusterUpdate {ServiceName : testEDSName }
157
- v2Client .r .NewClusters (map [string ]ClusterUpdate {
158
- testCDSName : wantUpdate ,
159
- })
160
-
161
- if u , err := clusterUpdateCh .Receive (ctx ); err != nil || u != (clusterUpdateErr {wantUpdate , nil }) {
162
- t .Errorf ("unexpected clusterUpdate: %v, error receiving from channel: %v" , u , err )
160
+ client .NewClusters (map [string ]ClusterUpdate {testCDSName : wantUpdate })
161
+ if err := verifyClusterUpdate (ctx , clusterUpdateCh , wantUpdate ); err != nil {
162
+ t .Fatal (err )
163
163
}
164
164
165
165
wantUpdate2 := ClusterUpdate {ServiceName : testEDSName + "2" }
166
- v2Client .r .NewClusters (map [string ]ClusterUpdate {
167
- testCDSName : wantUpdate2 ,
168
- })
166
+ client .NewClusters (map [string ]ClusterUpdate {testCDSName : wantUpdate2 })
167
+ if err := verifyClusterUpdate (ctx , clusterUpdateCh , wantUpdate2 ); err != nil {
168
+ t .Fatal (err )
169
+ }
170
+ }
171
+
172
+ func verifyListenerUpdate (ctx context.Context , updateCh * testutils.Channel , wantUpdate ListenerUpdate ) error {
173
+ u , err := updateCh .Receive (ctx )
174
+ if err != nil {
175
+ return fmt .Errorf ("timeout when waiting for listener update: %v" , err )
176
+ }
177
+ gotUpdate := u .(ldsUpdateErr )
178
+ if gotUpdate .err != nil || ! cmp .Equal (gotUpdate .u , wantUpdate ) {
179
+ return fmt .Errorf ("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)" , gotUpdate .u , gotUpdate .err , wantUpdate )
180
+ }
181
+ return nil
182
+ }
183
+
184
+ func verifyRouteConfigUpdate (ctx context.Context , updateCh * testutils.Channel , wantUpdate RouteConfigUpdate ) error {
185
+ u , err := updateCh .Receive (ctx )
186
+ if err != nil {
187
+ return fmt .Errorf ("timeout when waiting for route configuration update: %v" , err )
188
+ }
189
+ gotUpdate := u .(rdsUpdateErr )
190
+ if gotUpdate .err != nil || ! cmp .Equal (gotUpdate .u , wantUpdate ) {
191
+ return fmt .Errorf ("unexpected route config update: (%v, %v), want: (%v, nil)" , gotUpdate .u , gotUpdate .err , wantUpdate )
192
+ }
193
+ return nil
194
+ }
195
+
196
+ func verifyServiceUpdate (ctx context.Context , updateCh * testutils.Channel , wantUpdate ServiceUpdate ) error {
197
+ u , err := updateCh .Receive (ctx )
198
+ if err != nil {
199
+ return fmt .Errorf ("timeout when waiting for service update: %v" , err )
200
+ }
201
+ gotUpdate := u .(serviceUpdateErr )
202
+ if gotUpdate .err != nil || ! cmp .Equal (gotUpdate .u , wantUpdate , cmpopts .EquateEmpty ()) {
203
+ return fmt .Errorf ("unexpected service update: (%v, %v), want: (%v, nil)" , gotUpdate .u , gotUpdate .err , wantUpdate )
204
+ }
205
+ return nil
206
+ }
169
207
170
- if u , err := clusterUpdateCh .Receive (ctx ); err != nil || u != (clusterUpdateErr {wantUpdate2 , nil }) {
171
- t .Errorf ("unexpected clusterUpdate: %v, error receiving from channel: %v" , u , err )
208
+ func verifyClusterUpdate (ctx context.Context , updateCh * testutils.Channel , wantUpdate ClusterUpdate ) error {
209
+ u , err := updateCh .Receive (ctx )
210
+ if err != nil {
211
+ return fmt .Errorf ("timeout when waiting for cluster update: %v" , err )
212
+ }
213
+ gotUpdate := u .(clusterUpdateErr )
214
+ if gotUpdate .err != nil || ! cmp .Equal (gotUpdate .u , wantUpdate ) {
215
+ return fmt .Errorf ("unexpected clusterUpdate: (%v, %v), want: (%v, nil)" , gotUpdate .u , gotUpdate .err , wantUpdate )
216
+ }
217
+ return nil
218
+ }
219
+
220
+ func verifyEndpointsUpdate (ctx context.Context , updateCh * testutils.Channel , wantUpdate EndpointsUpdate ) error {
221
+ u , err := updateCh .Receive (ctx )
222
+ if err != nil {
223
+ return fmt .Errorf ("timeout when waiting for endpoints update: %v" , err )
224
+ }
225
+ gotUpdate := u .(endpointsUpdateErr )
226
+ if gotUpdate .err != nil || ! cmp .Equal (gotUpdate .u , wantUpdate , cmpopts .EquateEmpty ()) {
227
+ return fmt .Errorf ("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)" , gotUpdate .u , gotUpdate .err , wantUpdate )
172
228
}
229
+ return nil
173
230
}
0 commit comments