Skip to content

Commit e658ee8

Browse files
linalinnidohalevi
authored and
idohalevi
committed
Add ability to collect pod labels to Kubernetes input (influxdata#6764)
1 parent 5e28453 commit e658ee8

File tree

4 files changed

+161
-41
lines changed

4 files changed

+161
-41
lines changed

plugins/inputs/kubernetes/README.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Kubernetes Input Plugin
22

3-
This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
3+
This input plugin talks to the kubelet api using the `/stats/summary` and `/pods` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
44

55
To find the ip address of the host you are running on you can issue a command like the following:
66

@@ -44,6 +44,11 @@ avoid cardinality issues:
4444
## OR
4545
# bearer_token_string = "abc_123"
4646

47+
# Labels to include and exclude
48+
# An empty array for include and exclude will include all labels
49+
# label_include = []
50+
# label_exclude = ["*"]
51+
4752
## Set response_timeout (default 5 seconds)
4853
# response_timeout = "5s"
4954

plugins/inputs/kubernetes/kubernetes.go

+87-36
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kubernetes
33
import (
44
"encoding/json"
55
"fmt"
6+
"github.com/influxdata/telegraf/filter"
67
"io/ioutil"
78
"net/http"
89
"net/url"
@@ -23,6 +24,11 @@ type Kubernetes struct {
2324
BearerToken string `toml:"bearer_token"`
2425
BearerTokenString string `toml:"bearer_token_string"`
2526

27+
LabelInclude []string `toml:"label_include"`
28+
LabelExclude []string `toml:"label_exclude"`
29+
30+
labelFilter filter.Filter
31+
2632
// HTTP Timeout specified as a string - 3s, 1m, 1h
2733
ResponseTimeout internal.Duration
2834

@@ -42,6 +48,11 @@ var sampleConfig = `
4248
## OR
4349
# bearer_token_string = "abc_123"
4450
51+
# Labels to include and exclude
52+
# An empty array for include and exclude will include all labels
53+
# label_include = []
54+
# label_exclude = ["*"]
55+
4556
## Set response_timeout (default 5 seconds)
4657
# response_timeout = "5s"
4758
@@ -60,7 +71,10 @@ const (
6071

6172
func init() {
6273
inputs.Add("kubernetes", func() telegraf.Input {
63-
return &Kubernetes{}
74+
return &Kubernetes{
75+
LabelInclude: []string{},
76+
LabelExclude: []string{"*"},
77+
}
6478
})
6579
}
6680

@@ -75,6 +89,7 @@ func (k *Kubernetes) Description() string {
7589
}
7690

7791
func (k *Kubernetes) Init() error {
92+
7893
// If neither are provided, use the default service account.
7994
if k.BearerToken == "" && k.BearerTokenString == "" {
8095
k.BearerToken = defaultServiceAccountPath
@@ -88,6 +103,12 @@ func (k *Kubernetes) Init() error {
88103
k.BearerTokenString = strings.TrimSpace(string(token))
89104
}
90105

106+
labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude)
107+
if err != nil {
108+
return err
109+
}
110+
k.labelFilter = labelFilter
111+
91112
return nil
92113
}
93114

@@ -107,48 +128,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) {
107128
}
108129

109130
func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
110-
url := fmt.Sprintf("%s/stats/summary", baseURL)
111-
var req, err = http.NewRequest("GET", url, nil)
112-
var resp *http.Response
113-
114-
tlsCfg, err := k.ClientConfig.TLSConfig()
131+
summaryMetrics := &SummaryMetrics{}
132+
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)
115133
if err != nil {
116134
return err
117135
}
118136

119-
if k.RoundTripper == nil {
120-
// Set default values
121-
if k.ResponseTimeout.Duration < time.Second {
122-
k.ResponseTimeout.Duration = time.Second * 5
123-
}
124-
k.RoundTripper = &http.Transport{
125-
TLSHandshakeTimeout: 5 * time.Second,
126-
TLSClientConfig: tlsCfg,
127-
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
128-
}
129-
}
130-
131-
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
132-
req.Header.Add("Accept", "application/json")
133-
134-
resp, err = k.RoundTripper.RoundTrip(req)
135-
if err != nil {
136-
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
137-
}
138-
defer resp.Body.Close()
139-
140-
if resp.StatusCode != http.StatusOK {
141-
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
142-
}
143-
144-
summaryMetrics := &SummaryMetrics{}
145-
err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
137+
podInfos, err := k.gatherPodInfo(baseURL)
146138
if err != nil {
147-
return fmt.Errorf(`Error parsing response: %s`, err)
139+
return err
148140
}
149141
buildSystemContainerMetrics(summaryMetrics, acc)
150142
buildNodeMetrics(summaryMetrics, acc)
151-
buildPodMetrics(summaryMetrics, acc)
143+
buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc)
152144
return nil
153145
}
154146

@@ -200,7 +192,56 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
200192
acc.AddFields("kubernetes_node", fields, tags)
201193
}
202194

203-
func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
195+
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
196+
var podApi Pods
197+
err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podApi)
198+
if err != nil {
199+
return nil, err
200+
}
201+
var podInfos []Metadata
202+
for _, podMetadata := range podApi.Items {
203+
podInfos = append(podInfos, podMetadata.Metadata)
204+
}
205+
return podInfos, nil
206+
}
207+
208+
func (k *Kubernetes) LoadJson(url string, v interface{}) error {
209+
var req, err = http.NewRequest("GET", url, nil)
210+
var resp *http.Response
211+
tlsCfg, err := k.ClientConfig.TLSConfig()
212+
if err != nil {
213+
return err
214+
}
215+
if k.RoundTripper == nil {
216+
if k.ResponseTimeout.Duration < time.Second {
217+
k.ResponseTimeout.Duration = time.Second * 5
218+
}
219+
k.RoundTripper = &http.Transport{
220+
TLSHandshakeTimeout: 5 * time.Second,
221+
TLSClientConfig: tlsCfg,
222+
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
223+
}
224+
}
225+
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
226+
req.Header.Add("Accept", "application/json")
227+
resp, err = k.RoundTripper.RoundTrip(req)
228+
if err != nil {
229+
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
230+
}
231+
defer resp.Body.Close()
232+
if resp.StatusCode != http.StatusOK {
233+
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
234+
}
235+
236+
err = json.NewDecoder(resp.Body).Decode(v)
237+
if err != nil {
238+
return fmt.Errorf(`Error parsing response: %s`, err)
239+
}
240+
241+
return nil
242+
}
243+
244+
func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
204245
for _, pod := range summaryMetrics.Pods {
205246
for _, container := range pod.Containers {
206247
tags := map[string]string{
@@ -209,6 +250,16 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
209250
"container_name": container.Name,
210251
"pod_name": pod.PodRef.Name,
211252
}
253+
for _, info := range podInfo {
254+
if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
255+
for k, v := range info.Labels {
256+
if labelFilter.Match(k) {
257+
tags[k] = v
258+
}
259+
}
260+
}
261+
}
262+
212263
fields := make(map[string]interface{})
213264
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
214265
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package kubernetes
2+
3+
type Pods struct {
4+
Kind string `json:"kind"`
5+
ApiVersion string `json:"apiVersion"`
6+
Items []Item `json:"items"`
7+
}
8+
9+
type Item struct {
10+
Metadata Metadata `json:"metadata"`
11+
}
12+
13+
type Metadata struct {
14+
Name string `json:"name"`
15+
Namespace string `json:"namespace"`
16+
Labels map[string]string `json:"labels"`
17+
}

plugins/inputs/kubernetes/kubernetes_test.go

+51-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kubernetes
22

33
import (
44
"fmt"
5+
"github.com/influxdata/telegraf/filter"
56
"net/http"
67
"net/http/httptest"
78
"testing"
@@ -12,13 +13,23 @@ import (
1213

1314
func TestKubernetesStats(t *testing.T) {
1415
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15-
w.WriteHeader(http.StatusOK)
16-
fmt.Fprintln(w, response)
16+
if r.RequestURI == "/stats/summary" {
17+
w.WriteHeader(http.StatusOK)
18+
fmt.Fprintln(w, responseStatsSummery)
19+
}
20+
if r.RequestURI == "/pods" {
21+
w.WriteHeader(http.StatusOK)
22+
fmt.Fprintln(w, responsePods)
23+
}
24+
1725
}))
1826
defer ts.Close()
1927

28+
labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil)
29+
2030
k := &Kubernetes{
21-
URL: ts.URL,
31+
URL: ts.URL,
32+
labelFilter: labelFilter,
2233
}
2334

2435
var acc testutil.Accumulator
@@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) {
89100
"container_name": "foocontainer",
90101
"namespace": "foons",
91102
"pod_name": "foopod",
103+
"app": "foo",
104+
"superkey": "foobar",
92105
}
93106
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)
94107

@@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) {
112125
"container_name": "stopped-container",
113126
"namespace": "foons",
114127
"pod_name": "stopped-pod",
128+
"app": "foo-stop",
129+
"superkey": "superfoo",
115130
}
116131
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)
117132

@@ -143,7 +158,39 @@ func TestKubernetesStats(t *testing.T) {
143158

144159
}
145160

146-
var response = `
161+
var responsePods = `
162+
{
163+
"kind": "PodList",
164+
"apiVersion": "v1",
165+
"metadata": {},
166+
"items": [
167+
{
168+
"metadata": {
169+
"name": "foopod",
170+
"namespace": "foons",
171+
"labels": {
172+
"superkey": "foobar",
173+
"app": "foo",
174+
"exclude": "exclude0"
175+
}
176+
}
177+
},
178+
{
179+
"metadata": {
180+
"name": "stopped-pod",
181+
"namespace": "foons",
182+
"labels": {
183+
"superkey": "superfoo",
184+
"app": "foo-stop",
185+
"exclude": "exclude1"
186+
}
187+
}
188+
}
189+
]
190+
}
191+
`
192+
193+
var responseStatsSummery = `
147194
{
148195
"node": {
149196
"nodeName": "node1",

0 commit comments

Comments
 (0)