@@ -4,21 +4,23 @@ import (
4
4
"context"
5
5
"fmt"
6
6
"net/http"
7
+ "slices"
7
8
"time"
8
9
9
10
"github.com/datawire/dlib/dlog"
10
11
"github.com/telepresenceio/telepresence/rpc/v2/manager"
11
12
"github.com/telepresenceio/telepresence/v2/pkg/forwarder"
13
+ "github.com/telepresenceio/telepresence/v2/pkg/iputil"
12
14
"github.com/telepresenceio/telepresence/v2/pkg/restapi"
13
15
"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
14
16
)
15
17
16
18
type fwdState struct {
17
19
* state
18
- intercept InterceptTarget
19
- container string
20
- forwarder forwarder.Interceptor
21
- chosenIntercept * manager. InterceptInfo
20
+ intercept InterceptTarget
21
+ container string
22
+ forwarder forwarder.Interceptor
23
+ chosenInterceptId string
22
24
}
23
25
24
26
// NewInterceptState creates an InterceptState that performs intercepts by using an Interceptor which indiscriminately
@@ -73,118 +75,122 @@ func (pm *ProviderMux) CreateClientStream(ctx context.Context, tag tunnel.Tag, s
73
75
}
74
76
75
77
func (fs * fwdState ) HandleIntercepts (ctx context.Context , cepts []* manager.InterceptInfo ) []* manager.ReviewInterceptRequest {
76
- var myChoice , activeIntercept * manager.InterceptInfo
77
- if fs .chosenIntercept != nil {
78
- chosenID := fs .chosenIntercept .Id
79
- for _ , is := range cepts {
80
- if chosenID == is .Id {
81
- fs .chosenIntercept = is
82
- myChoice = is
78
+ var active []* manager.InterceptInfo
79
+ var waiting []* manager.InterceptInfo
80
+ for _ , is := range cepts {
81
+ switch is .Disposition {
82
+ case manager .InterceptDispositionType_ACTIVE :
83
+ active = append (active , is )
84
+ case manager .InterceptDispositionType_WAITING :
85
+ waiting = append (waiting , is )
86
+ }
87
+ }
88
+
89
+ var activeIntercept * manager.InterceptInfo
90
+ if fs .chosenInterceptId != "" {
91
+ for _ , is := range active {
92
+ if fs .chosenInterceptId == is .Id {
93
+ if ! is .Spec .Wiretap {
94
+ activeIntercept = is
95
+ }
83
96
break
84
97
}
85
98
}
99
+ }
100
+
101
+ if activeIntercept == nil {
102
+ fs .chosenInterceptId = ""
86
103
87
- if myChoice == nil {
88
- // Chosen intercept is not present in the snapshot
89
- fs .chosenIntercept = nil
90
- } else if myChoice .Disposition == manager .InterceptDispositionType_ACTIVE {
91
- // The chosen intercept still exists and is active
92
- activeIntercept = myChoice
93
- }
94
- } else {
95
104
// Attach to already ACTIVE intercept if there is one.
96
- for _ , cept := range cepts {
97
- if cept .Disposition == manager .InterceptDispositionType_ACTIVE {
98
- myChoice = cept
99
- fs .chosenIntercept = cept
100
- activeIntercept = cept
105
+ for _ , is := range active {
106
+ if ! is .Spec .Wiretap {
107
+ fs .chosenInterceptId = is .Id
108
+ activeIntercept = is
101
109
break
102
110
}
103
111
}
104
112
}
105
113
114
+ fwd := fs .forwarder
106
115
if fs .sessionInfo != nil {
107
116
// Update forwarding.
108
- fs . forwarder .SetStreamProvider (
117
+ fwd .SetStreamProvider (
109
118
& ProviderMux {
110
119
AgentProvider : fs ,
111
120
ManagerProvider : & tunnel.TrafficManagerStreamProvider {Manager : fs .ManagerClient (), AgentSessionID : tunnel .SessionID (fs .sessionInfo .SessionId )},
112
121
})
113
122
}
114
- fs . forwarder .SetIntercepting (activeIntercept )
123
+ fwd .SetIntercepting (activeIntercept )
115
124
116
- // Review waiting intercepts
117
- reviews := make ([]* manager.ReviewInterceptRequest , 0 , len (cepts ))
118
- for _ , cept := range cepts {
119
- container := cept .Spec .ContainerName
120
- if container == "" {
121
- container = fs .container
125
+ // Remove inactive wiretaps.
126
+ for _ , id := range fwd .WiretapIDs () {
127
+ if ! slices .ContainsFunc (active , func (ii * manager.InterceptInfo ) bool { return ii .Id == id && ii .Spec .Wiretap }) {
128
+ dlog .Debugf (ctx , "removing wiretap id %s" , id )
129
+ fwd .RemoveWiretap (id )
122
130
}
123
- cs := fs . containerStates [ container ]
124
- if cs == nil {
125
- reviews = append ( reviews , & manager. ReviewInterceptRequest {
126
- Id : cept . Id ,
127
- Disposition : manager . InterceptDispositionType_AGENT_ERROR ,
128
- Message : fmt . Sprintf ( "No match for container %q" , container ),
129
- MechanismArgsDesc : "all TCP connections" ,
130
- } )
131
- continue
131
+ }
132
+
133
+ // Add active wiretaps.
134
+ for _ , ii := range active {
135
+ if ii . Spec . Wiretap {
136
+ if ! fwd . HasWiretap ( ii . Id ) {
137
+ dlog . Debugf ( ctx , "adding wiretap id %s to %s" , ii . Id , iputil . JoinHostPort ( ii . Spec . TargetHost , uint16 ( ii . Spec . TargetPort )))
138
+ fwd . AddWiretap ( ii )
139
+ }
132
140
}
133
- if cept .Disposition == manager .InterceptDispositionType_WAITING {
141
+ }
142
+
143
+ // Review waiting intercepts
144
+ reviews := make ([]* manager.ReviewInterceptRequest , 0 , len (waiting ))
145
+ for _ , ii := range waiting {
146
+ switch {
147
+ case activeIntercept == nil || ii .Spec .Wiretap :
134
148
// This intercept is ready to be active
135
- switch {
136
- case cept == myChoice :
137
- // We've already chosen this one, but it's not active yet in this
138
- // snapshot. Let's go ahead and tell the manager to mark it ACTIVE.
139
- dlog .Infof (ctx , "Setting intercept %q as ACTIVE (again?)" , cept .Id )
140
- reviews = append (reviews , & manager.ReviewInterceptRequest {
141
- Id : cept .Id ,
142
- Disposition : manager .InterceptDispositionType_ACTIVE ,
143
- PodIp : fs .PodIP (),
144
- FtpPort : int32 (fs .FtpPort ()),
145
- SftpPort : int32 (fs .SftpPort ()),
146
- MountPoint : cs .MountPoint (),
147
- Mounts : cs .Mounts ().ToRPC (),
148
- MechanismArgsDesc : "all TCP connections" ,
149
- Environment : cs .Env (),
150
- })
151
- case fs .chosenIntercept == nil :
152
- // We don't have an intercept in play, so choose this one. All
153
- // agents will get intercepts in the same order every time, so
154
- // this will yield a consistent result. Note that the intercept
155
- // will not become active at this time. That will happen later,
156
- // once the manager assigns a port.
157
- dlog .Infof (ctx , "Setting intercept %q as ACTIVE" , cept .Id )
158
- fs .chosenIntercept = cept
159
- myChoice = cept
160
- reviews = append (reviews , & manager.ReviewInterceptRequest {
161
- Id : cept .Id ,
162
- Disposition : manager .InterceptDispositionType_ACTIVE ,
163
- PodIp : fs .PodIP (),
164
- FtpPort : int32 (fs .FtpPort ()),
165
- SftpPort : int32 (fs .SftpPort ()),
166
- MountPoint : cs .MountPoint (),
167
- Mounts : cs .Mounts ().ToRPC (),
168
- MechanismArgsDesc : "all TCP connections" ,
169
- Environment : cs .Env (),
170
- })
171
- default :
172
- // We already have an intercept in play, so reject this one.
173
- chosenID := fs .chosenIntercept .Id
174
- dlog .Infof (ctx , "Setting intercept %q as AGENT_ERROR; as it conflicts with %q as the current chosen-to-be-ACTIVE intercept" , cept .Id , chosenID )
175
- var msg string
176
- if fs .chosenIntercept .Disposition == manager .InterceptDispositionType_ACTIVE {
177
- msg = fmt .Sprintf ("Conflicts with the currently-served intercept %q" , chosenID )
178
- } else {
179
- msg = fmt .Sprintf ("Conflicts with the currently-waiting-to-be-served intercept %q" , chosenID )
180
- }
149
+ container := ii .Spec .ContainerName
150
+ if container == "" {
151
+ container = fs .container
152
+ }
153
+ cs := fs .containerStates [container ]
154
+ if cs == nil {
181
155
reviews = append (reviews , & manager.ReviewInterceptRequest {
182
- Id : cept .Id ,
156
+ Id : ii .Id ,
183
157
Disposition : manager .InterceptDispositionType_AGENT_ERROR ,
184
- Message : msg ,
158
+ Message : fmt . Sprintf ( "No match for container %q" , container ) ,
185
159
MechanismArgsDesc : "all TCP connections" ,
186
160
})
161
+ continue
162
+ }
163
+ if ! ii .Spec .Wiretap {
164
+ // We can only have one active intercept that isn't a wiretap
165
+ activeIntercept = ii
187
166
}
167
+ reviews = append (reviews , & manager.ReviewInterceptRequest {
168
+ Id : ii .Id ,
169
+ Disposition : manager .InterceptDispositionType_ACTIVE ,
170
+ PodIp : fs .PodIP (),
171
+ FtpPort : int32 (fs .FtpPort ()),
172
+ SftpPort : int32 (fs .SftpPort ()),
173
+ MountPoint : cs .MountPoint (),
174
+ Mounts : cs .Mounts ().ToRPC (),
175
+ MechanismArgsDesc : "all TCP connections" ,
176
+ Environment : cs .Env (),
177
+ })
178
+ default :
179
+ // We already have an intercept in play, so reject this one.
180
+ chosenID := activeIntercept .Id
181
+ dlog .Infof (ctx , "Setting intercept %q as AGENT_ERROR; as it conflicts with %q as the current chosen-to-be-ACTIVE intercept" , ii .Id , chosenID )
182
+ var msg string
183
+ if activeIntercept .Disposition == manager .InterceptDispositionType_ACTIVE {
184
+ msg = fmt .Sprintf ("Conflicts with the currently-served intercept %q" , chosenID )
185
+ } else {
186
+ msg = fmt .Sprintf ("Conflicts with the currently-waiting-to-be-served intercept %q" , chosenID )
187
+ }
188
+ reviews = append (reviews , & manager.ReviewInterceptRequest {
189
+ Id : ii .Id ,
190
+ Disposition : manager .InterceptDispositionType_AGENT_ERROR ,
191
+ Message : msg ,
192
+ MechanismArgsDesc : "all TCP connections" ,
193
+ })
188
194
}
189
195
}
190
196
return reviews
0 commit comments