Add a telepresence wiretap command. #3825

Merged (3 commits) on Mar 27, 2025
12 changes: 12 additions & 0 deletions CHANGELOG.yml
@@ -24,6 +24,18 @@
#
# For older changes, see CHANGELOG.OLD.md
items:
- version: 2.23.0
date: (TBD)
notes:
- type: feature
title: New telepresence wiretap command
body: >-
The new `telepresence wiretap` command introduces a read-only form of an `intercept` where the original
container will run unaffected while a copy of the wiretapped traffic is sent to the client.

Similar to an `ingest`, a `wiretap` always enforces read-only status on all volume mounts, and since that
makes the `wiretap` completely read-only, there is no limit to how many simultaneous wiretaps can be
served. In fact, a `wiretap` and an `intercept` on the same port can run simultaneously.
- version: 2.22.1
date: (TBD)
notes:
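To illustrate the last point of the 2.23.0 entry, two developers could target the same remote port at the same time. A hypothetical session (the workload name and ports are borrowed from the how-to further down in this PR) might look like this:

```console
# Workstation A: a regular intercept claims the port exclusively.
$ telepresence intercept example-app --port 8080:http

# Workstation B: a wiretap on the same remote port is still accepted,
# because it only receives a read-only copy of the traffic.
$ telepresence wiretap example-app --port 8080:http
```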
186 changes: 96 additions & 90 deletions cmd/traffic/cmd/agent/fwdstate.go
@@ -4,21 +4,23 @@ import (
"context"
"fmt"
"net/http"
"slices"
"time"

"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/pkg/forwarder"
"github.com/telepresenceio/telepresence/v2/pkg/iputil"
"github.com/telepresenceio/telepresence/v2/pkg/restapi"
"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
)

type fwdState struct {
*state
intercept InterceptTarget
container string
forwarder forwarder.Interceptor
chosenIntercept *manager.InterceptInfo
intercept InterceptTarget
container string
forwarder forwarder.Interceptor
chosenInterceptId string
}

// NewInterceptState creates an InterceptState that performs intercepts by using an Interceptor which indiscriminately
@@ -73,118 +75,122 @@ func (pm *ProviderMux) CreateClientStream(ctx context.Context, tag tunnel.Tag, s
}

func (fs *fwdState) HandleIntercepts(ctx context.Context, cepts []*manager.InterceptInfo) []*manager.ReviewInterceptRequest {
var myChoice, activeIntercept *manager.InterceptInfo
if fs.chosenIntercept != nil {
chosenID := fs.chosenIntercept.Id
for _, is := range cepts {
if chosenID == is.Id {
fs.chosenIntercept = is
myChoice = is
var active []*manager.InterceptInfo
var waiting []*manager.InterceptInfo
for _, is := range cepts {
switch is.Disposition {
case manager.InterceptDispositionType_ACTIVE:
active = append(active, is)
case manager.InterceptDispositionType_WAITING:
waiting = append(waiting, is)
}
}

var activeIntercept *manager.InterceptInfo
if fs.chosenInterceptId != "" {
for _, is := range active {
if fs.chosenInterceptId == is.Id {
if !is.Spec.Wiretap {
activeIntercept = is
}
break
}
}
}

if activeIntercept == nil {
fs.chosenInterceptId = ""

if myChoice == nil {
// Chosen intercept is not present in the snapshot
fs.chosenIntercept = nil
} else if myChoice.Disposition == manager.InterceptDispositionType_ACTIVE {
// The chosen intercept still exists and is active
activeIntercept = myChoice
}
} else {
// Attach to already ACTIVE intercept if there is one.
for _, cept := range cepts {
if cept.Disposition == manager.InterceptDispositionType_ACTIVE {
myChoice = cept
fs.chosenIntercept = cept
activeIntercept = cept
for _, is := range active {
if !is.Spec.Wiretap {
fs.chosenInterceptId = is.Id
activeIntercept = is
break
}
}
}

fwd := fs.forwarder
if fs.sessionInfo != nil {
// Update forwarding.
fs.forwarder.SetStreamProvider(
fwd.SetStreamProvider(
&ProviderMux{
AgentProvider: fs,
ManagerProvider: &tunnel.TrafficManagerStreamProvider{Manager: fs.ManagerClient(), AgentSessionID: tunnel.SessionID(fs.sessionInfo.SessionId)},
})
}
fs.forwarder.SetIntercepting(activeIntercept)
fwd.SetIntercepting(activeIntercept)

// Review waiting intercepts
reviews := make([]*manager.ReviewInterceptRequest, 0, len(cepts))
for _, cept := range cepts {
container := cept.Spec.ContainerName
if container == "" {
container = fs.container
// Remove inactive wiretaps.
for _, id := range fwd.WiretapIDs() {
if !slices.ContainsFunc(active, func(ii *manager.InterceptInfo) bool { return ii.Id == id && ii.Spec.Wiretap }) {
dlog.Debugf(ctx, "removing wiretap id %s", id)
fwd.RemoveWiretap(id)
}
cs := fs.containerStates[container]
if cs == nil {
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: cept.Id,
Disposition: manager.InterceptDispositionType_AGENT_ERROR,
Message: fmt.Sprintf("No match for container %q", container),
MechanismArgsDesc: "all TCP connections",
})
continue
}

// Add active wiretaps.
for _, ii := range active {
if ii.Spec.Wiretap {
if !fwd.HasWiretap(ii.Id) {
dlog.Debugf(ctx, "adding wiretap id %s to %s", ii.Id, iputil.JoinHostPort(ii.Spec.TargetHost, uint16(ii.Spec.TargetPort)))
fwd.AddWiretap(ii)
}
}
if cept.Disposition == manager.InterceptDispositionType_WAITING {
}

// Review waiting intercepts
reviews := make([]*manager.ReviewInterceptRequest, 0, len(waiting))
for _, ii := range waiting {
switch {
case activeIntercept == nil || ii.Spec.Wiretap:
// This intercept is ready to be active
switch {
case cept == myChoice:
// We've already chosen this one, but it's not active yet in this
// snapshot. Let's go ahead and tell the manager to mark it ACTIVE.
dlog.Infof(ctx, "Setting intercept %q as ACTIVE (again?)", cept.Id)
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: cept.Id,
Disposition: manager.InterceptDispositionType_ACTIVE,
PodIp: fs.PodIP(),
FtpPort: int32(fs.FtpPort()),
SftpPort: int32(fs.SftpPort()),
MountPoint: cs.MountPoint(),
Mounts: cs.Mounts().ToRPC(),
MechanismArgsDesc: "all TCP connections",
Environment: cs.Env(),
})
case fs.chosenIntercept == nil:
// We don't have an intercept in play, so choose this one. All
// agents will get intercepts in the same order every time, so
// this will yield a consistent result. Note that the intercept
// will not become active at this time. That will happen later,
// once the manager assigns a port.
dlog.Infof(ctx, "Setting intercept %q as ACTIVE", cept.Id)
fs.chosenIntercept = cept
myChoice = cept
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: cept.Id,
Disposition: manager.InterceptDispositionType_ACTIVE,
PodIp: fs.PodIP(),
FtpPort: int32(fs.FtpPort()),
SftpPort: int32(fs.SftpPort()),
MountPoint: cs.MountPoint(),
Mounts: cs.Mounts().ToRPC(),
MechanismArgsDesc: "all TCP connections",
Environment: cs.Env(),
})
default:
// We already have an intercept in play, so reject this one.
chosenID := fs.chosenIntercept.Id
dlog.Infof(ctx, "Setting intercept %q as AGENT_ERROR; as it conflicts with %q as the current chosen-to-be-ACTIVE intercept", cept.Id, chosenID)
var msg string
if fs.chosenIntercept.Disposition == manager.InterceptDispositionType_ACTIVE {
msg = fmt.Sprintf("Conflicts with the currently-served intercept %q", chosenID)
} else {
msg = fmt.Sprintf("Conflicts with the currently-waiting-to-be-served intercept %q", chosenID)
}
container := ii.Spec.ContainerName
if container == "" {
container = fs.container
}
cs := fs.containerStates[container]
if cs == nil {
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: cept.Id,
Id: ii.Id,
Disposition: manager.InterceptDispositionType_AGENT_ERROR,
Message: msg,
Message: fmt.Sprintf("No match for container %q", container),
MechanismArgsDesc: "all TCP connections",
})
continue
}
if !ii.Spec.Wiretap {
// We can only have one active intercept that isn't a wiretap
activeIntercept = ii
}
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: ii.Id,
Disposition: manager.InterceptDispositionType_ACTIVE,
PodIp: fs.PodIP(),
FtpPort: int32(fs.FtpPort()),
SftpPort: int32(fs.SftpPort()),
MountPoint: cs.MountPoint(),
Mounts: cs.Mounts().ToRPC(),
MechanismArgsDesc: "all TCP connections",
Environment: cs.Env(),
})
default:
// We already have an intercept in play, so reject this one.
chosenID := activeIntercept.Id
dlog.Infof(ctx, "Setting intercept %q as AGENT_ERROR; as it conflicts with %q as the current chosen-to-be-ACTIVE intercept", ii.Id, chosenID)
var msg string
if activeIntercept.Disposition == manager.InterceptDispositionType_ACTIVE {
msg = fmt.Sprintf("Conflicts with the currently-served intercept %q", chosenID)
} else {
msg = fmt.Sprintf("Conflicts with the currently-waiting-to-be-served intercept %q", chosenID)
}
reviews = append(reviews, &manager.ReviewInterceptRequest{
Id: ii.Id,
Disposition: manager.InterceptDispositionType_AGENT_ERROR,
Message: msg,
MechanismArgsDesc: "all TCP connections",
})
}
}
return reviews
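The forwarder methods used above (`AddWiretap`, `RemoveWiretap`, `HasWiretap`, `WiretapIDs`) are implemented elsewhere, but the idea they rely on can be sketched as a simple tee: inbound bytes still reach the original container, while every registered wiretap receives a copy and never writes back. The following is an illustrative sketch under those assumptions, not the actual forwarder implementation:

```go
package wiretap

import (
	"io"
	"net"
	"sync"
)

// tee duplicates the client-to-container byte stream to any number of
// read-only taps, keyed by intercept id.
type tee struct {
	mu   sync.Mutex
	taps map[string]io.Writer
}

func (t *tee) addTap(id string, w io.Writer) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.taps == nil {
		t.taps = make(map[string]io.Writer)
	}
	t.taps[id] = w
}

func (t *tee) removeTap(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.taps, id)
}

// forward copies traffic between client and container. Responses from the
// container go only to the client; taps receive a copy of the inbound
// stream and can never affect it.
func (t *tee) forward(client, container net.Conn) error {
	// container -> client responses flow back untouched.
	go func() { _, _ = io.Copy(client, container) }()

	buf := make([]byte, 32*1024)
	for {
		n, err := client.Read(buf)
		if n > 0 {
			t.mu.Lock()
			for _, tap := range t.taps {
				_, _ = tap.Write(buf[:n]) // best effort: a slow tap must never stall the real flow
			}
			t.mu.Unlock()
			if _, werr := container.Write(buf[:n]); werr != nil {
				return werr
			}
		}
		if err != nil {
			return err
		}
	}
}
```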
6 changes: 5 additions & 1 deletion cmd/traffic/cmd/manager/state/intercept.go
@@ -208,10 +208,14 @@ func (s *state) preparePorts(ac *agentconfig.Sidecar, cn *agentconfig.Container,
return info.Disposition == rpc.InterceptDispositionType_ACTIVE && info.Spec.Agent == ac.AgentName && info.Spec.Namespace == ac.Namespace
})

if spec.Mechanism != "http" {
if !(spec.Wiretap || spec.Mechanism == "http") {
// Intercept is global, so it will conflict with any other intercept using the same port and protocol.
for _, otherIc := range otherIcs {
oSpec := otherIc.Spec // Validate that there's no port conflict
if oSpec.Wiretap {
// wiretaps will not cause conflicts
continue
}
for cp := range uniqueContainerPorts {
if cp.Port == uint16(oSpec.ContainerPort) && string(cp.Proto) == oSpec.Protocol {
name := oSpec.Name
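Put differently, the rule this hunk encodes can be summarized by a predicate like the one below. This is a simplified sketch rather than the manager's actual code; the field names are the ones used in the diff above:

```go
package sketch

import "github.com/telepresenceio/telepresence/rpc/v2/manager"

// portConflict reports whether two intercept specs compete for the same
// container port. Wiretaps only receive a read-only copy of the traffic, so
// they never conflict, and personal (http) intercepts are disambiguated by
// headers rather than by exclusive port ownership.
func portConflict(spec, other *manager.InterceptSpec) bool {
	if spec.Wiretap || other.Wiretap {
		return false
	}
	if spec.Mechanism == "http" {
		return false
	}
	return spec.ContainerPort == other.ContainerPort && spec.Protocol == other.Protocol
}
```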
2 changes: 1 addition & 1 deletion docs/compare/mirrord.md
@@ -51,9 +51,9 @@ This comparison chart applies to the Open Source editions of both products.
| Doesn't require injection of a sidecar | ✅ [^3] | ✅ |
| Supports connecting to clusters over a corporate VPN | ✅ | ✅ |
| Can intercept traffic | ✅ | ✅ |
| Can mirror traffic | ✅ | ✅ |
| Can ingest a container | ✅ | ❌ |
| Can replace a container | ✅ | ❌ |
| Can mirror traffic | ❌ | ✅ |
| Can act as a cluster VPN only | ✅ | ❌ |
| Will work with statically linked binaries | ✅ | ❌ |
| Runs natively on windows | ✅ | ❌ |
55 changes: 54 additions & 1 deletion docs/howtos/engage.md
@@ -38,6 +38,24 @@ Telepresence offers three powerful ways to develop your services locally:
- You want your local service to only receive specific ingress traffic, while other traffic must be untouched.
- You want your remote container to continue processing other requests or background tasks.

### Wiretap
* **How it Works:**
- Adds a wiretap on a specific service port (or ports) and sends the data to the local workstation.
- Makes the remote environment of the targeted container available to the local workstation.
- Provides read-only access to the volumes mounted by the targeted container.
* **Impact:**
- A Traffic Agent is injected into the pods of the targeted workload.
- All containers keep on running.
- All traffic will still reach the remote service.
- Wiretapped traffic is rerouted to the local workstation.
* **Use-cases:**
- You need a solution where several developers can engage with the same service simultaneously.
- Your main focus is the service API rather than the cluster's pods and containers.
- You want your local service to only receive specific ingress traffic.
- You don't care about the responses sent by your local service.
- You don't want breakpoints in your local service to affect the remote service.
- You want to keep the impact that your local development has on the cluster to a minimum.

### Ingest
* **How it Works:**
- Makes the remote environment of the ingested container available to the local workstation.
@@ -46,7 +64,7 @@ Telepresence offers three powerful ways to develop your services locally:
- A Traffic Agent is injected into the pods of the targeted workload.
- No traffic is rerouted and all containers keep on running.
* **Use-cases:**
- You want to keep the impact of your local development to a minimum.
- You want to keep the impact that your local development has on the cluster to a minimum.
- You don't need traffic to be routed from the cluster, and read-only access to the container's volumes is OK.

## Prerequisites
@@ -199,6 +217,41 @@ You can now:
- Query services only exposed in your cluster's network.
- Set breakpoints in your IDE to investigate bugs.

## Wiretap your application

You can use the `telepresence wiretap` command when you want to wiretap the traffic for a specific service and send a
copy of it to your workstation. A `wiretap` is less intrusive than an `intercept` because it does not interfere
with the traffic at all.

1. Connect to your cluster with `telepresence connect`.

2. Put a wiretap on all traffic going to the application's http port in your cluster and send it to port 8080 on your workstation.
```console
$ telepresence wiretap example-app --port 8080:http --env-file ~/example-app-intercept.env --mount /tmp/example-app-mounts
Using Deployment example-app
wiretapped
Wiretap name : example-app
State : ACTIVE
Workload kind : Deployment
Destination : 127.0.0.1:8080
Intercepting : all TCP connections
```

* For `--port`: specify the port that the local instance of your application listens on, and optionally the remote port
that you want to wiretap. Telepresence selects the remote port automatically when only one service port is available
to access the workload. When the workload exposes multiple ports, you must specify the port to wiretap, either by
giving it after a colon in the `--port` argument (as in the example) or by naming the service with the `--service`
flag, or both (see the sketch after this list).

* For `--env-file`: specify a file path where Telepresence will write the environment variables that are set in the
targeted container.
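As a concrete, hypothetical illustration of the two flags above, suppose the workload is reached through a service named `example-app-svc` that exposes the ports `http` and `grpc`. Wiretapping only the `http` port and delivering it to local port 8080 could then look like this:

```console
$ telepresence wiretap example-app --service example-app-svc --port 8080:http
```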

3. Start your local application using the environment variables retrieved and the volumes that were mounted in the previous step.
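A minimal sketch of this step from a POSIX shell, using the paths from the example above (the local binary name is hypothetical, and sourcing the env file assumes simple `KEY=value` pairs):

```console
# Export the environment captured from the targeted container, then start the local service.
$ set -a; . ~/example-app-intercept.env; set +a
$ ./example-app

# The remote container's volumes are available read-only under the mount point.
$ ls /tmp/example-app-mounts
```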

You can now:
- Query services only exposed in your cluster's network.
- Set breakpoints in your IDE to investigate bugs.

### Running everything using Docker

This approach eliminates the need for root access and confines the Telepresence network interface and remote mounts
4 changes: 3 additions & 1 deletion docs/reference/architecture.md
@@ -36,11 +36,13 @@ in the telepresence client binary (`telepresence helm install`) or by using a He

## Traffic Agent

The Traffic Agent is a sidecar container that facilitates engagements. When a `replace`, `ingest` or `intercept` is first
The Traffic Agent is a sidecar container that facilitates engagements. When a `replace`, `ingest`, `intercept`, or `wiretap` is first
started, the Traffic Agent container is injected into the workload's pod(s). You can see the Traffic Agent's status by
running `telepresence list` or `kubectl describe pod <pod-name>`.

Depending on whether a `replace` or `intercept` is active, the Traffic Agent will either route the incoming request
to your workstation or pass it along to the container in the pod that usually handles requests.

When a `wiretap` is active, the Traffic Agent will send a copy of the incoming requests to your workstation.

Please see [Traffic Agent Sidecar](engagements/sidecar.md) for details.