Skip to content

Commit 7a68c73

Browse files
authored
[receiver/awscontainerinsightreceiver] Add new deployment and daemonset metrics (#20)
1 parent b482642 commit 7a68c73

File tree

12 files changed

+896
-41
lines changed

12 files changed

+896
-41
lines changed

internal/aws/containerinsight/const.go

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ const (
9393
StatusConditionNetworkUnavailable = "status_condition_network_unavailable"
9494
StatusCapacityPods = "status_capacity_pods"
9595
StatusAllocatablePods = "status_allocatable_pods"
96+
StatusNumberAvailable = "status_number_available"
97+
StatusNumberUnavailable = "status_number_unavailable"
98+
StatusDesiredNumberScheduled = "status_desired_number_scheduled"
99+
StatusCurrentNumberScheduled = "status_current_number_scheduled"
100+
StatusReplicasAvailable = "status_replicas_available"
101+
StatusReplicasUnavailable = "status_replicas_unavailable"
102+
StatusReplicas = "status_replicas"
103+
SpecReplicas = "spec_replicas"
96104
StatusRunning = "status_running"
97105
StatusTerminated = "status_terminated"
98106
StatusWaiting = "status_waiting"
@@ -115,23 +123,25 @@ const (
115123
DiskIOTotal = "Total"
116124

117125
// Define the metric types
118-
TypeCluster = "Cluster"
119-
TypeClusterService = "ClusterService"
120-
TypeClusterNamespace = "ClusterNamespace"
121-
TypeService = "Service"
122-
TypeInstance = "Instance" // mean EC2 Instance in ECS
123-
TypeNode = "Node" // mean EC2 Instance in EKS
124-
TypeInstanceFS = "InstanceFS"
125-
TypeNodeFS = "NodeFS"
126-
TypeInstanceNet = "InstanceNet"
127-
TypeNodeNet = "NodeNet"
128-
TypeInstanceDiskIO = "InstanceDiskIO"
129-
TypeNodeDiskIO = "NodeDiskIO"
130-
TypePod = "Pod"
131-
TypePodNet = "PodNet"
132-
TypeContainer = "Container"
133-
TypeContainerFS = "ContainerFS"
134-
TypeContainerDiskIO = "ContainerDiskIO"
126+
TypeCluster = "Cluster"
127+
TypeClusterService = "ClusterService"
128+
TypeClusterDeployment = "ClusterDeployment"
129+
TypeClusterDaemonSet = "ClusterDaemonSet"
130+
TypeClusterNamespace = "ClusterNamespace"
131+
TypeService = "Service"
132+
TypeInstance = "Instance" // mean EC2 Instance in ECS
133+
TypeNode = "Node" // mean EC2 Instance in EKS
134+
TypeInstanceFS = "InstanceFS"
135+
TypeNodeFS = "NodeFS"
136+
TypeInstanceNet = "InstanceNet"
137+
TypeNodeNet = "NodeNet"
138+
TypeInstanceDiskIO = "InstanceDiskIO"
139+
TypeNodeDiskIO = "NodeDiskIO"
140+
TypePod = "Pod"
141+
TypePodNet = "PodNet"
142+
TypeContainer = "Container"
143+
TypeContainerFS = "ContainerFS"
144+
TypeContainerDiskIO = "ContainerDiskIO"
135145
// Special type for pause container
136146
// because containerd does not set container name pause container name to POD like docker does.
137147
TypeInfraContainer = "InfraContainer"
@@ -213,14 +223,22 @@ func init() {
213223
FSInodesfree: UnitCount,
214224
FSUtilization: UnitPercent,
215225

216-
// status metrics
226+
// status & spec metrics
217227
StatusConditionReady: UnitCount,
218228
StatusConditionDiskPressure: UnitCount,
219229
StatusConditionMemoryPressure: UnitCount,
220230
StatusConditionPIDPressure: UnitCount,
221231
StatusConditionNetworkUnavailable: UnitCount,
222232
StatusCapacityPods: UnitCount,
223233
StatusAllocatablePods: UnitCount,
234+
StatusReplicas: UnitCount,
235+
StatusReplicasAvailable: UnitCount,
236+
StatusReplicasUnavailable: UnitCount,
237+
StatusNumberAvailable: UnitCount,
238+
StatusNumberUnavailable: UnitCount,
239+
StatusDesiredNumberScheduled: UnitCount,
240+
StatusCurrentNumberScheduled: UnitCount,
241+
SpecReplicas: UnitCount,
224242

225243
// kube-state-metrics equivalents
226244
StatusRunning: UnitCount,

internal/aws/containerinsight/utils.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ func getPrefixByMetricType(mType string) string {
106106
service := "service_"
107107
cluster := "cluster_"
108108
namespace := "namespace_"
109+
deployment := "deployment_"
110+
daemonSet := "daemonset_"
109111

110112
switch mType {
111113
case TypeInstance:
@@ -142,6 +144,10 @@ func getPrefixByMetricType(mType string) string {
142144
prefix = service
143145
case TypeClusterNamespace:
144146
prefix = namespace
147+
case TypeClusterDeployment:
148+
prefix = deployment
149+
case TypeClusterDaemonSet:
150+
prefix = daemonSet
145151
default:
146152
log.Printf("E! Unexpected MetricType: %s", mType)
147153
}

internal/aws/k8s/k8sclient/clientset.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,16 @@ type replicaSetClientWithStopper interface {
194194
stopper
195195
}
196196

197+
type deploymentClientWithStopper interface {
198+
DeploymentClient
199+
stopper
200+
}
201+
202+
type daemonSetClientWithStopper interface {
203+
DaemonSetClient
204+
stopper
205+
}
206+
197207
type K8sClient struct {
198208
kubeConfigPath string
199209
initSyncPollInterval time.Duration
@@ -221,6 +231,12 @@ type K8sClient struct {
221231
rsMu sync.Mutex
222232
replicaSet replicaSetClientWithStopper
223233

234+
dMu sync.Mutex
235+
deployment deploymentClientWithStopper
236+
237+
dsMu sync.Mutex
238+
daemonSet daemonSetClientWithStopper
239+
224240
logger *zap.Logger
225241
}
226242

@@ -264,6 +280,8 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error {
264280
c.node = nil
265281
c.job = nil
266282
c.replicaSet = nil
283+
c.deployment = nil
284+
c.daemonSet = nil
267285

268286
return nil
269287
}
@@ -357,6 +375,46 @@ func (c *K8sClient) ShutdownReplicaSetClient() {
357375
})
358376
}
359377

378+
func (c *K8sClient) GetDeploymentClient() DeploymentClient {
379+
var err error
380+
c.dMu.Lock()
381+
if c.deployment == nil || reflect.ValueOf(c.deployment).IsNil() {
382+
c.deployment, err = newDeploymentClient(c.clientSet, c.logger, deploymentSyncCheckerOption(c.syncChecker))
383+
if err != nil {
384+
c.logger.Error("use an no-op deployment client instead because of error", zap.Error(err))
385+
c.deployment = &noOpDeploymentClient{}
386+
}
387+
}
388+
c.dMu.Unlock()
389+
return c.deployment
390+
}
391+
392+
func (c *K8sClient) ShutdownDeploymentClient() {
393+
shutdownClient(c.deployment, &c.dMu, func() {
394+
c.deployment = nil
395+
})
396+
}
397+
398+
func (c *K8sClient) GetDaemonSetClient() DaemonSetClient {
399+
var err error
400+
c.dsMu.Lock()
401+
if c.daemonSet == nil || reflect.ValueOf(c.daemonSet).IsNil() {
402+
c.daemonSet, err = newDaemonSetClient(c.clientSet, c.logger, daemonSetSyncCheckerOption(c.syncChecker))
403+
if err != nil {
404+
c.logger.Error("use an no-op daemonSet client instead because of error", zap.Error(err))
405+
c.daemonSet = &noOpDaemonSetClient{}
406+
}
407+
}
408+
c.dsMu.Unlock()
409+
return c.daemonSet
410+
}
411+
412+
func (c *K8sClient) ShutdownDaemonSetClient() {
413+
shutdownClient(c.daemonSet, &c.dsMu, func() {
414+
c.daemonSet = nil
415+
})
416+
}
417+
360418
func (c *K8sClient) GetClientSet() kubernetes.Interface {
361419
return c.clientSet
362420
}
@@ -371,6 +429,8 @@ func (c *K8sClient) Shutdown() {
371429
c.ShutdownNodeClient()
372430
c.ShutdownJobClient()
373431
c.ShutdownReplicaSetClient()
432+
c.ShutdownDeploymentClient()
433+
c.ShutdownDaemonSetClient()
374434

375435
// remove the current instance of k8s client from map
376436
for key, val := range optionsToK8sClient {
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"go.uber.org/zap"
21+
appsv1 "k8s.io/api/apps/v1"
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/apimachinery/pkg/runtime"
24+
"k8s.io/apimachinery/pkg/watch"
25+
"k8s.io/client-go/kubernetes"
26+
"k8s.io/client-go/tools/cache"
27+
"sync"
28+
)
29+
30+
type DaemonSetClient interface {
31+
// DaemonSetInfos contains the information about each daemon set in the cluster
32+
DaemonSetInfos() []*DaemonSetInfo
33+
}
34+
35+
type noOpDaemonSetClient struct {
36+
}
37+
38+
func (nd *noOpDaemonSetClient) DaemonSetInfos() []*DaemonSetInfo {
39+
return []*DaemonSetInfo{}
40+
}
41+
42+
func (nd *noOpDaemonSetClient) shutdown() {
43+
}
44+
45+
type daemonSetClientOption func(*daemonSetClient)
46+
47+
func daemonSetSyncCheckerOption(checker initialSyncChecker) daemonSetClientOption {
48+
return func(d *daemonSetClient) {
49+
d.syncChecker = checker
50+
}
51+
}
52+
53+
type daemonSetClient struct {
54+
stopChan chan struct{}
55+
stopped bool
56+
57+
store *ObjStore
58+
59+
syncChecker initialSyncChecker
60+
61+
mu sync.RWMutex
62+
daemonSetInfos []*DaemonSetInfo
63+
}
64+
65+
func (d *daemonSetClient) refresh() {
66+
d.mu.Lock()
67+
defer d.mu.Unlock()
68+
69+
var daemonSetInfos []*DaemonSetInfo
70+
objsList := d.store.List()
71+
for _, obj := range objsList {
72+
daemonSet, ok := obj.(*DaemonSetInfo)
73+
if !ok {
74+
continue
75+
}
76+
daemonSetInfos = append(daemonSetInfos, daemonSet)
77+
}
78+
79+
d.daemonSetInfos = daemonSetInfos
80+
}
81+
82+
func (d *daemonSetClient) DaemonSetInfos() []*DaemonSetInfo {
83+
if d.store.GetResetRefreshStatus() {
84+
d.refresh()
85+
}
86+
d.mu.RLock()
87+
defer d.mu.RUnlock()
88+
return d.daemonSetInfos
89+
}
90+
91+
func newDaemonSetClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...daemonSetClientOption) (*daemonSetClient, error) {
92+
d := &daemonSetClient{
93+
stopChan: make(chan struct{}),
94+
}
95+
96+
for _, option := range options {
97+
option(d)
98+
}
99+
100+
ctx := context.Background()
101+
if _, err := clientSet.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
102+
return nil, fmt.Errorf("cannot list DaemonSets. err: %w", err)
103+
}
104+
105+
d.store = NewObjStore(transformFuncDaemonSet, logger)
106+
lw := createDaemonSetListWatch(clientSet, metav1.NamespaceAll)
107+
reflector := cache.NewReflector(lw, &appsv1.DaemonSet{}, d.store, 0)
108+
109+
go reflector.Run(d.stopChan)
110+
111+
if d.syncChecker != nil {
112+
// check the init sync for potential connection issue
113+
d.syncChecker.Check(reflector, "DaemonSet initial sync timeout")
114+
}
115+
116+
return d, nil
117+
}
118+
119+
func (d *daemonSetClient) shutdown() {
120+
close(d.stopChan)
121+
d.stopped = true
122+
}
123+
124+
func transformFuncDaemonSet(obj interface{}) (interface{}, error) {
125+
daemonSet, ok := obj.(*appsv1.DaemonSet)
126+
if !ok {
127+
return nil, fmt.Errorf("input obj %v is not DaemonSet type", obj)
128+
}
129+
info := new(DaemonSetInfo)
130+
info.Name = daemonSet.Name
131+
info.Namespace = daemonSet.Namespace
132+
info.Status = &DaemonSetStatus{
133+
NumberAvailable: uint32(daemonSet.Status.NumberAvailable),
134+
NumberUnavailable: uint32(daemonSet.Status.NumberUnavailable),
135+
DesiredNumberScheduled: uint32(daemonSet.Status.DesiredNumberScheduled),
136+
CurrentNumberScheduled: uint32(daemonSet.Status.CurrentNumberScheduled),
137+
}
138+
return info, nil
139+
}
140+
141+
func createDaemonSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
142+
ctx := context.Background()
143+
return &cache.ListWatch{
144+
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
145+
return client.AppsV1().DaemonSets(ns).List(ctx, opts)
146+
},
147+
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
148+
return client.AppsV1().DaemonSets(ns).Watch(ctx, opts)
149+
},
150+
}
151+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
16+
17+
type DaemonSetInfo struct {
18+
Name string
19+
Namespace string
20+
Status *DaemonSetStatus
21+
}
22+
23+
type DaemonSetStatus struct {
24+
NumberAvailable uint32
25+
NumberUnavailable uint32
26+
DesiredNumberScheduled uint32
27+
CurrentNumberScheduled uint32
28+
}

0 commit comments

Comments
 (0)