@@ -15,6 +15,7 @@ import (
15
15
"google.golang.org/grpc"
16
16
"google.golang.org/grpc/codes"
17
17
"google.golang.org/grpc/status"
18
+ "k8s.io/apimachinery/pkg/types"
18
19
19
20
"github.com/datawire/dlib/dlog"
20
21
"github.com/datawire/dlib/dtime"
@@ -48,7 +49,7 @@ func (ac *client) String() string {
48
49
return "<nil>"
49
50
}
50
51
ai := ac .info
51
- return fmt .Sprintf ("%s.%s:%d " , ai .PodName , ai .Namespace , ai .ApiPort )
52
+ return fmt .Sprintf ("%s(%s), port %d, dormant %t " , ai .PodName , net . IP ( ai .PodIp ) , ai .ApiPort , ac . dormant () )
52
53
}
53
54
54
55
func (ac * client ) ensureConnect (ctx context.Context ) (err error ) {
@@ -78,8 +79,10 @@ func (ac *client) Tunnel(ctx context.Context, opts ...grpc.CallOption) (tunnel.C
78
79
// Client was closed.
79
80
return nil , io .EOF
80
81
}
82
+ dlog .Tracef (ctx , "%s(%s) creating Tunnel over gRPC" , ac , net .IP (ac .info .PodIp ))
81
83
tc , err := cli .Tunnel (ctx , opts ... )
82
84
if err != nil {
85
+ dlog .Tracef (ctx , "%s(%s) failed to create Tunnel over gRPC: %v" , ac , net .IP (ac .info .PodIp ), err )
83
86
return nil , err
84
87
}
85
88
atomic .AddInt32 (& ac .tunnelCount , 1 )
@@ -107,7 +110,8 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) {
107
110
var conn * grpc.ClientConn
108
111
var cli agent.AgentClient
109
112
110
- conn , cli , _ , err = k8sclient .ConnectToAgent (dialCtx , ac .info .PodName , ac .info .Namespace , uint16 (ac .info .ApiPort ))
113
+ ai := ac .info
114
+ conn , cli , _ , err = k8sclient .ConnectToAgent (dialCtx , ai .PodName , ai .Namespace , uint16 (ac .info .ApiPort ), types .UID (ai .PodId ))
111
115
if err != nil {
112
116
return
113
117
}
@@ -117,18 +121,16 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) {
117
121
ac .cancelClient = func () {
118
122
// Need to run this in a separate goroutine to avoid deadlock.
119
123
go func () {
120
- ac . Lock ()
124
+ // Need to invalidate the pod connection cache here, because StatefulSets reuse the same pod name.
121
125
conn .Close ()
126
+ ac .Lock ()
122
127
ac .cancelClient = nil
123
128
ac .cli = nil
124
129
ac .infant .Store (true )
125
- for len (ac .ready ) > 0 {
126
- <- ac .ready
127
- }
128
130
ac .Unlock ()
129
131
}()
130
132
}
131
- intercepted := ac . info .Intercepted
133
+ intercepted := ai .Intercepted
132
134
ac .Unlock ()
133
135
if intercepted {
134
136
err = ac .startDialWatcherReady (ctx )
@@ -169,26 +171,29 @@ func (ac *client) cancel() bool {
169
171
return didCancel
170
172
}
171
173
172
- func (ac * client ) setIntercepted (ctx context.Context , k string , status bool ) {
173
- ac .RLock ()
174
- aci := ac .info .Intercepted
174
+ func (ac * client ) refresh (ctx context.Context , ai * manager.AgentPodInfo ) {
175
+ ac .Lock ()
176
+ oldStatus := ac .info .Intercepted
177
+ ac .info = ai
175
178
cdw := ac .cancelDialWatch
176
- ac .RUnlock ()
177
- if status {
178
- if aci {
179
- return
180
- }
181
- dlog .Debugf (ctx , "Agent %s changed to intercepted" , k )
179
+ ac .Unlock ()
180
+ if ai . Intercepted == oldStatus {
181
+ return
182
+ }
183
+ if ai . Intercepted {
184
+ dlog .Debugf (ctx , "Agent %s(%s) changed to intercepted" , ai . PodName , net . IP ( ai . PodIp ) )
182
185
go func () {
183
186
if err := ac .startDialWatcher (ctx ); err != nil {
184
- dlog .Errorf (ctx , "failed to start client watcher for %s: %v" , k , err )
187
+ dlog .Errorf (ctx , "failed to start client watcher for %s(%s) : %v" , ai . PodName , net . IP ( ai . PodIp ) , err )
185
188
}
186
189
}()
187
190
// This agent is now intercepting. Start a dial watcher.
188
- } else if aci && cdw != nil {
191
+ } else {
189
192
// This agent is no longer intercepting. Stop the dial watcher
190
- dlog .Debugf (ctx , "Agent %s changed to not intercepted" , k )
191
- cdw ()
193
+ dlog .Debugf (ctx , "Agent %s(%s) changed to not intercepted" , ai .PodName , net .IP (ai .PodIp ))
194
+ if cdw != nil {
195
+ cdw ()
196
+ }
192
197
}
193
198
}
194
199
@@ -203,7 +208,11 @@ func (ac *client) startDialWatcher(ctx context.Context) error {
203
208
func (ac * client ) startDialWatcherReady (ctx context.Context ) error {
204
209
ac .RLock ()
205
210
cli := ac .cli
211
+ running := ac .cancelDialWatch != nil
206
212
ac .RUnlock ()
213
+ if running {
214
+ return nil
215
+ }
207
216
if cli == nil {
208
217
return fmt .Errorf ("agent connection closed" )
209
218
}
@@ -218,7 +227,6 @@ func (ac *client) startDialWatcherReady(ctx context.Context) error {
218
227
}
219
228
220
229
ac .Lock ()
221
- ac .info .Intercepted = true
222
230
ac .cancelDialWatch = func () {
223
231
ac .Lock ()
224
232
ac .info .Intercepted = false
@@ -549,7 +557,7 @@ func (s *clients) updateClients(ctx context.Context, ais []*manager.AgentPodInfo
549
557
// Refresh current clients
550
558
for k , ai := range aim {
551
559
if ac , ok := s .clients .Load (k ); ok {
552
- ac .setIntercepted (ctx , k , ai . Intercepted )
560
+ ac .refresh (ctx , ai )
553
561
}
554
562
}
555
563
0 commit comments