Skip to content

Commit 7028ba7

Browse files
committed
[extension/k8sobserver] Emit endpoint per Pod's container
Signed-off-by: ChrsMark <[email protected]>
1 parent 947d1b5 commit 7028ba7

File tree

12 files changed

+213
-15
lines changed

12 files changed

+213
-15
lines changed

.chloggen/k8sobserver_endpoints.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobserver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Emit endpoint per Pod's container
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35491]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/observer/endpoints.go

+27
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
PortType EndpointType = "port"
2525
// PodType is a pod endpoint.
2626
PodType EndpointType = "pod"
27+
// PodContainerType is a pod's container endpoint.
28+
PodContainerType EndpointType = "pod.container"
2729
// K8sServiceType is a service endpoint.
2830
K8sServiceType EndpointType = "k8s.service"
2931
// K8sIngressType is a ingress endpoint.
@@ -218,6 +220,31 @@ func (p *Pod) Type() EndpointType {
218220
return PodType
219221
}
220222

223+
// PodContainer is a discovered k8s pod's container
224+
type PodContainer struct {
225+
// Name of the container
226+
Name string
227+
// Image of the container
228+
Image string
229+
// ContainerID is the id of the container exposing the Endpoint
230+
ContainerID string
231+
// Pod is the k8s pod in which the container is running
232+
Pod Pod
233+
}
234+
235+
func (p *PodContainer) Env() EndpointEnv {
236+
return map[string]any{
237+
"container_name": p.Name,
238+
"container_id": p.ContainerID,
239+
"container_image": p.Image,
240+
"pod": p.Pod.Env(),
241+
}
242+
}
243+
244+
func (p *PodContainer) Type() EndpointType {
245+
return PodContainerType
246+
}
247+
221248
// Port is an endpoint that has a target as well as a port.
222249
type Port struct {
223250
// Name is the name of the container port.

extension/observer/k8sobserver/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<!-- end autogenerated section -->
1414

1515
The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
16-
Kubernetes pod, port, service, ingress and node endpoints via the Kubernetes API.
16+
Kubernetes pod, port, container, service, ingress and node endpoints via the Kubernetes API.
1717

1818
## Example Config
1919

extension/observer/k8sobserver/handler_test.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,20 @@ func TestPodEndpointsAdded(t *testing.T) {
3636
UID: "pod-2-UID",
3737
Labels: map[string]string{"env": "prod"},
3838
},
39-
}, {
39+
},
40+
{
41+
ID: "test-1/pod-2-UID/container-2",
42+
Target: "1.2.3.4",
43+
Details: &observer.PodContainer{
44+
Name: "container-2",
45+
Image: "container-image-2",
46+
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
47+
Pod: observer.Pod{
48+
Name: "pod-2",
49+
Namespace: "default",
50+
UID: "pod-2-UID",
51+
Labels: map[string]string{"env": "prod"}}}},
52+
{
4053
ID: "test-1/pod-2-UID/https(443)",
4154
Target: "1.2.3.4:443",
4255
Details: &observer.Port{
@@ -73,8 +86,8 @@ func TestPodEndpointsChanged(t *testing.T) {
7386

7487
endpoints := th.ListEndpoints()
7588
require.ElementsMatch(t,
76-
[]observer.EndpointID{"test-1/pod-2-UID", "test-1/pod-2-UID/https(443)"},
77-
[]observer.EndpointID{endpoints[0].ID, endpoints[1].ID},
89+
[]observer.EndpointID{"test-1/pod-2-UID", "test-1/pod-2-UID/container-2", "test-1/pod-2-UID/https(443)"},
90+
[]observer.EndpointID{endpoints[0].ID, endpoints[1].ID, endpoints[2].ID},
7891
)
7992

8093
// Running state changed, one added and one removed.
@@ -90,6 +103,18 @@ func TestPodEndpointsChanged(t *testing.T) {
90103
Namespace: "default",
91104
UID: "pod-2-UID",
92105
Labels: map[string]string{"env": "prod", "updated-label": "true"}}},
106+
{
107+
ID: "test-1/pod-2-UID/container-2",
108+
Target: "1.2.3.4",
109+
Details: &observer.PodContainer{
110+
Name: "container-2",
111+
Image: "container-image-2",
112+
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
113+
Pod: observer.Pod{
114+
Name: "pod-2",
115+
Namespace: "default",
116+
UID: "pod-2-UID",
117+
Labels: map[string]string{"env": "prod", "updated-label": "true"}}}},
93118
{
94119
ID: "test-1/pod-2-UID/https(443)",
95120
Target: "1.2.3.4:443",

extension/observer/k8sobserver/k8s_fixtures_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ var container1StatusWaiting = v1.ContainerStatus{
7171
RestartCount: 1,
7272
Image: "container-image-1",
7373
ImageID: "12345",
74-
ContainerID: "82389",
74+
ContainerID: "containerd://a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
7575
Started: nil,
7676
}
7777

@@ -80,9 +80,10 @@ var container2StatusRunning = v1.ContainerStatus{
8080
State: v1.ContainerState{
8181
Running: &v1.ContainerStateRunning{StartedAt: metav1.Now()},
8282
},
83-
Ready: true,
84-
Image: "container-image-1",
85-
Started: pointerBool(true),
83+
Ready: true,
84+
Image: "container-image-1",
85+
Started: pointerBool(true),
86+
ContainerID: "containerd://a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
8687
}
8788

8889
var podWithNamedPorts = func() *v1.Pod {

extension/observer/k8sobserver/pod_endpoint.go

+41-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector
55

66
import (
77
"fmt"
8+
"strings"
89

910
v1 "k8s.io/api/core/v1"
1011

@@ -38,22 +39,41 @@ func convertPodToEndpoints(idNamespace string, pod *v1.Pod) []observer.Endpoint
3839
}}
3940

4041
// Map of running containers by name.
41-
containerRunning := map[string]bool{}
42+
runningContainers := map[string]RunningContainer{}
4243

4344
for _, container := range pod.Status.ContainerStatuses {
4445
if container.State.Running != nil {
45-
containerRunning[container.Name] = true
46+
runningContainers[container.Name] = containerIDWithRuntime(container)
4647
}
4748
}
4849

4950
// Create endpoint for each named container port.
5051
for _, container := range pod.Spec.Containers {
51-
if !containerRunning[container.Name] {
52+
var runningContainer RunningContainer
53+
var ok bool
54+
if runningContainer, ok = runningContainers[container.Name]; !ok {
5255
continue
5356
}
5457

58+
endpointID := observer.EndpointID(
59+
fmt.Sprintf(
60+
"%s/%s", podID, container.Name,
61+
),
62+
)
63+
endpoints = append(endpoints, observer.Endpoint{
64+
ID: endpointID,
65+
Target: podIP,
66+
Details: &observer.PodContainer{
67+
Name: container.Name,
68+
ContainerID: runningContainer.ID,
69+
Image: container.Image,
70+
Pod: podDetails,
71+
},
72+
})
73+
74+
// Create endpoint for each named container port.
5575
for _, port := range container.Ports {
56-
endpointID := observer.EndpointID(
76+
endpointID = observer.EndpointID(
5777
fmt.Sprintf(
5878
"%s/%s(%d)", podID, port.Name, port.ContainerPort,
5979
),
@@ -83,3 +103,20 @@ func getTransport(protocol v1.Protocol) observer.Transport {
83103
}
84104
return observer.ProtocolUnknown
85105
}
106+
107+
// containerIDWithRuntime parses the container ID to get the actual ID string
108+
func containerIDWithRuntime(c v1.ContainerStatus) RunningContainer {
109+
cID := c.ContainerID
110+
if cID != "" {
111+
parts := strings.Split(cID, "://")
112+
if len(parts) == 2 {
113+
return RunningContainer{parts[1], parts[0]}
114+
}
115+
}
116+
return RunningContainer{}
117+
}
118+
119+
type RunningContainer struct {
120+
ID string
121+
Runtime string
122+
}

extension/observer/k8sobserver/pod_endpoint_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ func TestPodObjectToPortEndpoint(t *testing.T) {
2121
Namespace: "default",
2222
UID: "pod-2-UID",
2323
Labels: map[string]string{"env": "prod"}}},
24+
{
25+
ID: "namespace/pod-2-UID/container-2",
26+
Target: "1.2.3.4",
27+
Details: &observer.PodContainer{
28+
Name: "container-2",
29+
Image: "container-image-2",
30+
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
31+
Pod: observer.Pod{
32+
Name: "pod-2",
33+
Namespace: "default",
34+
UID: "pod-2-UID",
35+
Labels: map[string]string{"env": "prod"}}}},
2436
{
2537
ID: "namespace/pod-2-UID/https(443)",
2638
Target: "1.2.3.4:443",

receiver/receivercreator/README.md

+61-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,18 @@ Note that the backticks below are not typos--they indicate the value is set dyna
109109
| k8s.pod.uid | \`pod.uid\` |
110110
| k8s.namespace.name | \`pod.namespace\` |
111111

112+
`type == "pod.container"`
113+
114+
| Resource Attribute | Default |
115+
|----------------------|---------------------|
116+
| k8s.pod.name | \`pod.name\` |
117+
| k8s.pod.uid | \`pod.uid\` |
118+
| k8s.namespace.name | \`pod.namespace\` |
119+
| container.name | \`name\` |
120+
| k8s.container.name | \`container_name\` |
121+
| container.image.name | \`container_image\` |
122+
| container.id | \`container_id\` |
123+
112124
`type == "container"`
113125

114126
| Resource Attribute | Default |
@@ -155,7 +167,7 @@ Similar to the per-endpoint type `resource_attributes` described above but for i
155167

156168
## Rule Expressions
157169

158-
Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.service"|"k8s.node"|"k8s.ingress") &&` such that the rule matches
170+
Each rule must start with `type == ("pod"|"port"|"pod.container"|"hostport"|"container"|"k8s.service"|"k8s.node"|"k8s.ingress") &&` such that the rule matches
159171
only one endpoint type. Depending on the type of endpoint the rule is
160172
targeting it will have different variables available.
161173

@@ -186,6 +198,21 @@ targeting it will have different variables available.
186198
| pod.labels | map of labels of the owning pod | Map with String key and value |
187199
| pod.annotations | map of annotations of the owning pod | Map with String key and value |
188200

201+
### Pod Container
202+
203+
| Variable | Description | Data Type |
204+
|-----------------|--------------------------------------|-------------------------------|
205+
| type | `"pod.container"` | String |
206+
| id | ID of source endpoint | String |
207+
| container_name | container name | String |
208+
| container_id | container id | String |
209+
| container_image | container image | String |
210+
| pod.name | name of the owning pod | String |
211+
| pod.namespace | namespace of the pod | String |
212+
| pod.uid | unique id of the pod | String |
213+
| pod.labels | map of labels of the owning pod | Map with String key and value |
214+
| pod.annotations | map of annotations of the owning pod | Map with String key and value |
215+
189216
### Host Port
190217

191218
| Variable | Description | Data Type |
@@ -359,6 +386,35 @@ receivers:
359386
- endpoint: '`scheme`://`endpoint`:`port``"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/health"`'
360387
method: GET
361388
collection_interval: 10s
389+
receiver_creator/logs:
390+
watch_observers: [ k8s_observer ]
391+
receivers:
392+
filelog/busybox:
393+
rule: type == "pod.container" && container_name == "busybox"
394+
config:
395+
include:
396+
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
397+
include_file_name: false
398+
include_file_path: true
399+
operators:
400+
- id: container-parser
401+
type: container
402+
- type: add
403+
field: attributes.log.template
404+
value: busybox
405+
filelog/lazybox:
406+
rule: type == "pod.container" && container_name == "lazybox"
407+
config:
408+
include:
409+
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
410+
include_file_name: false
411+
include_file_path: true
412+
operators:
413+
- id: container-parser
414+
type: container
415+
- type: add
416+
field: attributes.log.template
417+
value: lazybox
362418

363419
processors:
364420
exampleprocessor:
@@ -372,6 +428,10 @@ service:
372428
receivers: [receiver_creator/1, receiver_creator/2, receiver_creator/3, receiver_creator/4]
373429
processors: [exampleprocessor]
374430
exporters: [exampleexporter]
431+
logs:
432+
receivers: [receiver_creator/logs]
433+
processors: [exampleprocessor]
434+
exporters: [exampleexporter]
375435
extensions: [k8s_observer, host_observer]
376436
```
377437

receiver/receivercreator/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
9292

9393
for endpointType := range cfg.ResourceAttributes {
9494
switch endpointType {
95-
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType:
95+
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType, observer.PodContainerType:
9696
default:
9797
return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType)
9898
}

receiver/receivercreator/factory.go

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ func createDefaultConfig() component.Config {
5050
conventions.AttributeK8SPodUID: "`pod.uid`",
5151
conventions.AttributeK8SNamespaceName: "`pod.namespace`",
5252
},
53+
observer.PodContainerType: map[string]string{
54+
conventions.AttributeK8SPodName: "`pod.name`",
55+
conventions.AttributeK8SPodUID: "`pod.uid`",
56+
conventions.AttributeK8SNamespaceName: "`pod.namespace`",
57+
conventions.AttributeK8SContainerName: "`container_name`",
58+
conventions.AttributeContainerID: "`container_id`",
59+
conventions.AttributeContainerImageName: "`container_image`",
60+
},
5361
observer.ContainerType: map[string]string{
5462
conventions.AttributeContainerName: "`name`",
5563
conventions.AttributeContainerImageName: "`image`",

receiver/receivercreator/observerhandler.go

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
103103
obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err))
104104
continue
105105
}
106+
obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig))
106107

107108
discoveredCfg := userConfigMap{}
108109
// If user didn't set endpoint set to default value as well as

receiver/receivercreator/rules.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type rule struct {
2222

2323
// ruleRe is used to verify the rule starts type check.
2424
var ruleRe = regexp.MustCompile(
25-
fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q|%q|%q)`, observer.PodType, observer.K8sServiceType, observer.K8sIngressType, observer.PortType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType),
25+
fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q|%q|%q|%q)`, observer.PodType, observer.K8sServiceType, observer.K8sIngressType, observer.PortType, observer.PodContainerType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType),
2626
)
2727

2828
// newRule creates a new rule instance.

0 commit comments

Comments
 (0)