[receiver/k8scluster] Add support for observing multiple namespaces #40220

Status: Open. bacherfl wants to merge 28 commits into base branch main from feat/k8scluster/namespaces.
Commits (28):

- 6bdf3eb add namespaces list to k8scluster receiver configuration (bacherfl, May 20, 2025)
- 04ef39c add e2e test (bacherfl, May 21, 2025)
- ec9c89f fix check for length of namespaces list (bacherfl, May 22, 2025)
- b881515 fix linting (bacherfl, May 22, 2025)
- 07948d9 add tests for metadata store, create separate namespace for new e2e test (bacherfl, May 22, 2025)
- 9db5230 fix copy paste error (bacherfl, May 22, 2025)
- 1685a4c add license header in added file (bacherfl, May 22, 2025)
- 064fcd5 fix linting, cleanup (bacherfl, May 22, 2025)
- 48a38f2 ignore value for k8s.node.name in e2e tests (bacherfl, May 22, 2025)
- 1614811 ignore value for k8s.node.name in e2e tests (bacherfl, May 22, 2025)
- 5a66ed1 update documentation (bacherfl, May 27, 2025)
- b19c971 add changelog entry (bacherfl, May 27, 2025)
- 8322f43 Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, May 27, 2025)
- 19f9df6 Apply suggestions from code review (bacherfl, Jun 2, 2025)
- 1b070be Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jun 2, 2025)
- a5d6d80 add comments to describe how the informer factories are organized (bacherfl, Jun 2, 2025)
- 6d9fb94 Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jun 4, 2025)
- 421e0c5 reduce duplicated code, add more tests (bacherfl, Jun 4, 2025)
- 175cf54 remove ineffectual assign (bacherfl, Jun 4, 2025)
- c7bdd4a remove special logic for testing deprecated field (bacherfl, Jun 6, 2025)
- a2008b4 Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jun 24, 2025)
- d46c712 Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jun 26, 2025)
- 3055bb3 use a more descriptive name for the cluster wide informer store key (bacherfl, Jun 26, 2025)
- 57de91c fix linting (bacherfl, Jun 26, 2025)
- 9f7bc50 fix linting (bacherfl, Jun 26, 2025)
- 08fd64b Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jul 3, 2025)
- 53bc7fc Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jul 7, 2025)
- 9f315a5 Merge branch 'main' into feat/k8scluster/namespaces (bacherfl, Jul 7, 2025)
27 changes: 27 additions & 0 deletions .chloggen/k8scluster-namespace-list.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option `namespaces` for setting a list of namespaces to be observed by the receiver. This supersedes the `namespace` option, which is now deprecated.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [40089]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 5 additions & 4 deletions receiver/k8sclusterreceiver/README.md
@@ -62,7 +62,8 @@ The following allocatable resource types are available.
- pods
- `metrics`: Allows to enable/disable metrics.
- `resource_attributes`: Allows to enable/disable resource attributes.
-- `namespace`: Allows to observe resources for a particular namespace only. If this option is set to a non-empty string, `Nodes`, `Namespaces` and `ClusterResourceQuotas` will not be observed.
+- `namespace` (deprecated, use `namespaces` instead): Allows observing resources in a single namespace only. If this option is set to a non-empty string, `Nodes`, `Namespaces` and `ClusterResourceQuotas` will not be observed.
+- `namespaces`: Allows observing resources in a given list of namespaces only. If this option is set, `Nodes`, `Namespaces` and `ClusterResourceQuotas` will not be observed, as those are cluster-scoped resources.

Example:
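
A minimal sketch of a configuration using the new option (the README's full example is collapsed in this view; the values below are illustrative):

```yaml
receivers:
  k8s_cluster:
    auth_type: serviceAccount
    collection_interval: 30s
    # Only these namespaces are observed; cluster-scoped resources
    # (Nodes, Namespaces, ClusterResourceQuotas) are skipped.
    namespaces:
      - my-namespace-1
      - my-namespace-2
```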

@@ -286,16 +287,16 @@ subjects:
EOF
```

-As an alternative to setting up a `ClusterRole`/`ClusterRoleBinding`, it is also possible to limit the observed resources to a
-particular namespace by setting the `namespace` option of the receiver. This allows the collector to only rely on `Roles`/`RoleBindings`,
+As an alternative to setting up a `ClusterRole`/`ClusterRoleBinding`, it is also possible to limit the observed resources to a list of
+namespaces by setting the `namespaces` option of the receiver. This allows the collector to rely only on `Roles`/`RoleBindings`,
instead of granting the collector cluster-wide read access to resources.
Note however, that in this case the following resources will not be observed by the `k8sclusterreceiver`:

- `Nodes`
- `Namespaces`
- `ClusterResourceQuotas`

-To use this approach, use the commands below to create the required `Role` and `RoleBinding`:
+To use this approach, run the commands below to create the required `Role` and `RoleBinding` in each namespace the collector should observe:

```bash
cat <<EOF | kubectl apply -f -
# (remainder of the manifest collapsed in this view)
```
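
For reference, a sketch of what such a `Role`/`RoleBinding` pair might look like (the names, namespace, and exact rule set are illustrative; consult the full README for the authoritative manifest):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: otel-k8s-cluster-reader   # illustrative name
  namespace: my-namespace-1       # repeat per observed namespace
rules:
  - apiGroups: [""]
    resources: ["pods", "replicationcontrollers", "resourcequotas"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments", "replicasets", "daemonsets", "statefulsets"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["batch"]
    resources: ["jobs", "cronjobs"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["autoscaling"]
    resources: ["horizontalpodautoscalers"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: otel-k8s-cluster-reader-binding   # illustrative name
  namespace: my-namespace-1
subjects:
  - kind: ServiceAccount
    name: otel-collector                  # illustrative service account
    namespace: my-namespace-1
roleRef:
  kind: Role
  name: otel-k8s-cluster-reader
  apiGroup: rbac.authorization.k8s.io
```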
7 changes: 5 additions & 2 deletions receiver/k8sclusterreceiver/config.go
@@ -42,10 +42,13 @@ type Config struct {
// MetricsBuilderConfig allows customizing scraped metrics/attributes representation.
metadata.MetricsBuilderConfig `mapstructure:",squash"`

-// Namespace to fetch resources from. If this is set, certain cluster-wide resources such as Nodes or Namespaces
+// Deprecated: This field is no longer supported, use cfg.Namespaces instead.
+Namespace string `mapstructure:"namespace"`
+
+// Namespaces to fetch resources from. If this is set, certain cluster-wide resources such as Nodes or Namespaces
// will not be able to be observed. Setting this option is recommended in environments where due to security restrictions
// the collector cannot be granted cluster-wide permissions.
-Namespace string `mapstructure:"namespace"`
+Namespaces []string `mapstructure:"namespaces"`

// K8sLeaderElector defines the reference to the k8s leader elector extension
// use this when k8s cluster receiver needs to be deployed in HA mode
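
The excerpt does not show how the deprecated field interacts with the new one at runtime. A minimal sketch of one way to reconcile them, assuming a helper called during config validation (hypothetical; the PR's actual handling is not part of the shown diff):

```go
// reconcileNamespaces is a sketch only: it folds the deprecated single
// Namespace option into the Namespaces list so downstream code can rely
// on Namespaces alone. Hypothetical; not part of the shown diff.
func (cfg *Config) reconcileNamespaces() {
	if cfg.Namespace != "" && len(cfg.Namespaces) == 0 {
		cfg.Namespaces = []string{cfg.Namespace}
	}
}
```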
106 changes: 100 additions & 6 deletions receiver/k8sclusterreceiver/e2e_test.go
@@ -34,12 +34,14 @@ import (
)

const (
expectedFileClusterScoped = "./testdata/e2e/cluster-scoped/expected.yaml"
expectedFileNamespaceScoped = "./testdata/e2e/namespace-scoped/expected.yaml"
+expectedFileNamespaceScopedMultipleNamespaces = "./testdata/e2e/namespace-scoped-multiple-namespaces/expected.yaml"

testObjectsDirClusterScoped = "./testdata/e2e/cluster-scoped/testobjects"
testObjectsDirNamespaceScoped = "./testdata/e2e/namespace-scoped/testobjects"
+testObjectsDirNamespaceScopedMultipleNamespaces = "./testdata/e2e/namespace-scoped-multiple-namespaces/testobjects"
testKubeConfig = "/tmp/kube-config-otelcol-e2e-testing"
)

// TestE2EClusterScoped tests the k8s cluster receiver with a real k8s cluster.
Expand Down Expand Up @@ -224,6 +226,98 @@ func TestE2ENamespaceScoped(t *testing.T) {
}, 3*time.Minute, 1*time.Second)
}

// TestE2ENamespaceScopedMultipleNamespaces tests the k8s cluster receiver against a real k8s cluster while observing multiple namespaces.
// The test requires a prebuilt otelcontribcol image uploaded to a kind k8s cluster defined in
// `/tmp/kube-config-otelcol-e2e-testing`. Run the following commands prior to running the test locally:
//
// kind create cluster --kubeconfig=/tmp/kube-config-otelcol-e2e-testing
// make docker-otelcontribcol
// KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest
func TestE2ENamespaceScopedMultipleNamespaces(t *testing.T) {
expected, err := golden.ReadMetrics(expectedFileNamespaceScopedMultipleNamespaces)
require.NoError(t, err)

k8sClient, err := k8stest.NewK8sClient(testKubeConfig)
require.NoError(t, err)

// k8s test objs
testObjs, err := k8stest.CreateObjects(k8sClient, testObjectsDirNamespaceScopedMultipleNamespaces)
require.NoErrorf(t, err, "failed to create objects")

t.Cleanup(func() {
require.NoErrorf(t, k8stest.DeleteObjects(k8sClient, testObjs), "failed to delete objects")
})

metricsConsumer := new(consumertest.MetricsSink)
shutdownSink := startUpSink(t, metricsConsumer)
defer shutdownSink()

testID := uuid.NewString()[:8]
collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "namespace-scoped-multiple-namespaces", "collector"), map[string]string{}, "")

t.Cleanup(func() {
for _, obj := range collectorObjs {
require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName())
}
})

// The CronJob is scheduled to run every minute (on the full minute).
// This creates a delay: the resources deployed by the CronJob (Job, Pod, Container)
// might become available only later and miss the resulting metrics, which may cause the test to fail.
time.Sleep(calculateCronJobExecution())

wantEntries := 10 // Minimal number of metrics to wait for.
// the commented line below writes the received list of metrics to the expected.yaml
// golden.WriteMetrics(t, expectedFileNamespaceScopedMultipleNamespaces, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1])
waitForData(t, wantEntries, metricsConsumer)

require.EventuallyWithT(t, func(tt *assert.CollectT) {
assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1],
pmetrictest.IgnoreTimestamp(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreMetricValues(
"k8s.container.cpu_request",
"k8s.container.memory_limit",
"k8s.container.memory_request",
"k8s.container.restarts",
"k8s.cronjob.active_jobs",
"k8s.deployment.available",
"k8s.deployment.desired",
"k8s.job.active_pods",
"k8s.job.desired_successful_pods",
"k8s.job.failed_pods",
"k8s.job.max_parallel_pods",
"k8s.hpa.current_replicas",
"k8s.job.successful_pods"),
pmetrictest.ChangeResourceAttributeValue("container.id", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("container.image.name", containerImageShorten),
pmetrictest.ChangeResourceAttributeValue("container.image.tag", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.cronjob.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.daemonset.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.deployment.name", shortenNames),
pmetrictest.ChangeResourceAttributeValue("k8s.deployment.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.hpa.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.job.name", shortenNames),
pmetrictest.ChangeResourceAttributeValue("k8s.job.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.namespace.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.node.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.pod.name", shortenNames),
pmetrictest.ChangeResourceAttributeValue("k8s.pod.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.replicaset.name", shortenNames),
pmetrictest.ChangeResourceAttributeValue("k8s.replicaset.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.statefulset.uid", replaceWithStar),
pmetrictest.ChangeResourceAttributeValue("k8s.node.name", replaceWithStar),
pmetrictest.IgnoreScopeVersion(),
pmetrictest.IgnoreResourceMetricsOrder(),
pmetrictest.IgnoreMetricsOrder(),
pmetrictest.IgnoreScopeMetricsOrder(),
pmetrictest.IgnoreMetricDataPointsOrder(),
),
)
}, 3*time.Minute, 1*time.Second)
}

func calculateCronJobExecution() time.Duration {
// extract the number of seconds from the current timestamp
seconds := time.Now().Second()
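
The helper's remaining lines are collapsed above. A plausible reconstruction of what it computes, offered purely as a sketch (the 5-second buffer is an assumption, not the PR's verbatim code):

```go
// calculateCronJobExecutionSketch estimates how long to sleep so that a
// CronJob firing on the full minute has had time to create its Job, Pod,
// and Container. Hypothetical reconstruction of the collapsed helper.
func calculateCronJobExecutionSketch() time.Duration {
	seconds := time.Now().Second()
	// Wait past the next full minute, plus a small buffer (assumed 5s).
	return time.Duration(60-seconds+5) * time.Second
}
```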
24 changes: 12 additions & 12 deletions receiver/k8sclusterreceiver/internal/collection/collector_test.go
@@ -20,7 +20,7 @@ func TestCollectMetricData(t *testing.T) {
ms := metadata.NewStore()
var expectedRMs int

-ms.Setup(gvk.Pod, &testutils.MockStore{
+ms.Setup(gvk.Pod, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"pod1-uid": testutils.NewPodWithContainer(
"1",
@@ -31,78 +31,78 @@
})
expectedRMs += 2 // 1 for pod, 1 for container

-ms.Setup(gvk.Node, &testutils.MockStore{
+ms.Setup(gvk.Node, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"node1-uid": testutils.NewNode("1"),
"node2-uid": testutils.NewNode("2"),
},
})
expectedRMs += 2

-ms.Setup(gvk.Namespace, &testutils.MockStore{
+ms.Setup(gvk.Namespace, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"namespace1-uid": testutils.NewNamespace("1"),
},
})
expectedRMs++

-ms.Setup(gvk.ReplicationController, &testutils.MockStore{
+ms.Setup(gvk.ReplicationController, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"replicationcontroller1-uid": testutils.NewReplicationController("1"),
},
})
expectedRMs++

-ms.Setup(gvk.ResourceQuota, &testutils.MockStore{
+ms.Setup(gvk.ResourceQuota, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"resourcequota1-uid": testutils.NewResourceQuota("1"),
},
})
expectedRMs++

-ms.Setup(gvk.Deployment, &testutils.MockStore{
+ms.Setup(gvk.Deployment, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"deployment1-uid": testutils.NewDeployment("1"),
},
})
expectedRMs++

-ms.Setup(gvk.ReplicaSet, &testutils.MockStore{
+ms.Setup(gvk.ReplicaSet, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"replicaset1-uid": testutils.NewReplicaSet("1"),
},
})
expectedRMs++

-ms.Setup(gvk.DaemonSet, &testutils.MockStore{
+ms.Setup(gvk.DaemonSet, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"daemonset1-uid": testutils.NewDaemonset("1"),
},
})
expectedRMs++

-ms.Setup(gvk.StatefulSet, &testutils.MockStore{
+ms.Setup(gvk.StatefulSet, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"statefulset1-uid": testutils.NewStatefulset("1"),
},
})
expectedRMs++

-ms.Setup(gvk.Job, &testutils.MockStore{
+ms.Setup(gvk.Job, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"job1-uid": testutils.NewJob("1"),
},
})
expectedRMs++

-ms.Setup(gvk.CronJob, &testutils.MockStore{
+ms.Setup(gvk.CronJob, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"cronjob1-uid": testutils.NewCronJob("1"),
},
})
expectedRMs++

-ms.Setup(gvk.HorizontalPodAutoscaler, &testutils.MockStore{
+ms.Setup(gvk.HorizontalPodAutoscaler, metadata.ClusterWideInformerKey, &testutils.MockStore{
Cache: map[string]any{
"horizontalpodautoscaler1-uid": testutils.NewHPA("1"),
},
26 changes: 14 additions & 12 deletions receiver/k8sclusterreceiver/internal/metadata/metadatastore.go
@@ -8,38 +8,40 @@ import (
"k8s.io/client-go/tools/cache"
)

+const ClusterWideInformerKey = "<cluster-wide-informer-key>"
+
// Store keeps track of required caches exposed by informers.
// This store is used while collecting metadata about Pods to be able
// to correlate other Kubernetes objects with a Pod.
type Store struct {
-stores map[schema.GroupVersionKind]cache.Store
+stores map[schema.GroupVersionKind]map[string]cache.Store
}

// NewStore creates a new Store.
func NewStore() *Store {
return &Store{
-stores: make(map[schema.GroupVersionKind]cache.Store),
+stores: make(map[schema.GroupVersionKind]map[string]cache.Store),
}
}

// Get returns the cache.Stores for a given GroupVersionKind, keyed by namespace (or ClusterWideInformerKey for cluster-scoped resources).
-func (ms *Store) Get(gvk schema.GroupVersionKind) cache.Store {
+func (ms *Store) Get(gvk schema.GroupVersionKind) map[string]cache.Store {
return ms.stores[gvk]
}

// Setup tracks metadata of services, jobs and replicasets.
-func (ms *Store) Setup(gvk schema.GroupVersionKind, store cache.Store) {
-ms.stores[gvk] = store
+func (ms *Store) Setup(gvk schema.GroupVersionKind, namespace string, store cache.Store) {
+if _, ok := ms.stores[gvk]; !ok {
+ms.stores[gvk] = make(map[string]cache.Store)
+}
+ms.stores[gvk][namespace] = store
}

// ForEach iterates over all objects in a given cache.Store.
func (ms *Store) ForEach(gvk schema.GroupVersionKind, f func(o any)) {
-store := ms.Get(gvk)
-if store == nil {
-// This is normal, not all caches are set up, e.g. ClusterResourceQuota is only available in OpenShift.
-return
-}
-for _, obj := range store.List() {
-f(obj)
+for _, store := range ms.stores[gvk] {
+for _, obj := range store.List() {
+f(obj)
+}
}
}
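
To illustrate the two-level layout, a small usage sketch (the `internal` packages are shown for illustration only and cannot be imported outside this module; namespace names are made up). Note that ranging over a nil map is a no-op in Go, so missing caches, e.g. ClusterResourceQuota outside OpenShift, are handled implicitly:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

func main() {
	ms := metadata.NewStore()

	// Cluster-scoped resources register under the dedicated key...
	ms.Setup(gvk.Node, metadata.ClusterWideInformerKey, cache.NewStore(cache.MetaNamespaceKeyFunc))

	// ...while namespace-scoped resources get one store per namespace.
	ms.Setup(gvk.Pod, "my-namespace-1", cache.NewStore(cache.MetaNamespaceKeyFunc))
	ms.Setup(gvk.Pod, "my-namespace-2", cache.NewStore(cache.MetaNamespaceKeyFunc))

	// ForEach visits objects from every store registered for the GVK,
	// regardless of which namespace key they live under.
	ms.ForEach(gvk.Pod, func(o any) {
		fmt.Println(o)
	})
}
```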