Skip to content

Commit 98a6954

Browse files
[receiver/k8sobjectsreceiver] Refactor Validate to only validate static config (#39469)
#### Description Refactor Validate() to only validate static config without external calls to Kubernetes API server as discussed in #38851 (comment) Once this is merged, the PR linked above should introduce `error_mode` implemented in `start` too. #### Link to tracking issue It is related to the issue and follow-up PR mentioned above, but it doesn't fix the issue. #### Testing I've refactored the tests in config_test.go, so we are not testing object discovery and added new tests for additional failure modes. I've build a dev images, and validated that functionality is the same as before the refactor. #### Documentation This is a "breaking change" in a sense we are changing what we are validating in Validate. The new validation logic will not fail when objects are missing. The receiver will fail the same way in this case, but in `start`.
1 parent d26e2b7 commit 98a6954

File tree

7 files changed

+140
-114
lines changed

7 files changed

+140
-114
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobjectsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Check for K8s API objects existence on receiver startup and not during config validation.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38803]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8sobjectsreceiver/config.go

-23
Original file line numberDiff line numberDiff line change
@@ -58,28 +58,7 @@ type Config struct {
5858
}
5959

6060
func (c *Config) Validate() error {
61-
validObjects, err := c.getValidObjects()
62-
if err != nil {
63-
return err
64-
}
6561
for _, object := range c.Objects {
66-
gvrs, ok := validObjects[object.Name]
67-
if !ok {
68-
availableResource := make([]string, len(validObjects))
69-
for k := range validObjects {
70-
availableResource = append(availableResource, k)
71-
}
72-
return fmt.Errorf("resource %v not found. Valid resources are: %v", object.Name, availableResource)
73-
}
74-
75-
gvr := gvrs[0]
76-
for i := range gvrs {
77-
if gvrs[i].Group == object.Group {
78-
gvr = gvrs[i]
79-
break
80-
}
81-
}
82-
8362
if object.Mode == "" {
8463
object.Mode = defaultMode
8564
} else if _, ok := modeMap[object.Mode]; !ok {
@@ -93,8 +72,6 @@ func (c *Config) Validate() error {
9372
if object.Mode == PullMode && len(object.ExcludeWatchType) != 0 {
9473
return errors.New("the Exclude config can only be used with watch mode")
9574
}
96-
97-
object.gvr = gvr
9875
}
9976
return nil
10077
}

receiver/k8sobjectsreceiver/config_test.go

+64-83
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"go.opentelemetry.io/collector/component"
1414
"go.opentelemetry.io/collector/confmap/confmaptest"
15-
"go.opentelemetry.io/collector/confmap/xconfmap"
16-
"k8s.io/apimachinery/pkg/runtime/schema"
1715
apiWatch "k8s.io/apimachinery/pkg/watch"
1816

1917
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
@@ -40,29 +38,17 @@ func TestLoadConfig(t *testing.T) {
4038
Interval: time.Hour,
4139
FieldSelector: "status.phase=Running",
4240
LabelSelector: "environment in (production),tier in (frontend)",
43-
gvr: &schema.GroupVersionResource{
44-
Group: "",
45-
Version: "v1",
46-
Resource: "pods",
47-
},
4841
},
4942
{
50-
Name: "events",
51-
Mode: WatchMode,
52-
Namespaces: []string{"default"},
53-
Group: "events.k8s.io",
54-
ResourceVersion: "",
43+
Name: "events",
44+
Mode: WatchMode,
45+
Namespaces: []string{"default"},
46+
Group: "events.k8s.io",
5547
ExcludeWatchType: []apiWatch.EventType{
5648
apiWatch.Deleted,
5749
},
58-
gvr: &schema.GroupVersionResource{
59-
Group: "events.k8s.io",
60-
Version: "v1",
61-
Resource: "events",
62-
},
6350
},
6451
},
65-
makeDiscoveryClient: getMockDiscoveryClient,
6652
},
6753
},
6854
{
@@ -77,24 +63,13 @@ func TestLoadConfig(t *testing.T) {
7763
Mode: PullMode,
7864
ResourceVersion: "1",
7965
Interval: time.Hour,
80-
gvr: &schema.GroupVersionResource{
81-
Group: "",
82-
Version: "v1",
83-
Resource: "pods",
84-
},
8566
},
8667
{
8768
Name: "events",
8869
Mode: PullMode,
8970
Interval: time.Hour,
90-
gvr: &schema.GroupVersionResource{
91-
Group: "",
92-
Version: "v1",
93-
Resource: "events",
94-
},
9571
},
9672
},
97-
makeDiscoveryClient: getMockDiscoveryClient,
9873
},
9974
},
10075
{
@@ -110,34 +85,17 @@ func TestLoadConfig(t *testing.T) {
11085
Namespaces: []string{"default"},
11186
Group: "events.k8s.io",
11287
ResourceVersion: "",
113-
gvr: &schema.GroupVersionResource{
114-
Group: "events.k8s.io",
115-
Version: "v1",
116-
Resource: "events",
117-
},
11888
},
11989
{
12090
Name: "events",
12191
Mode: WatchMode,
12292
Namespaces: []string{"default"},
12393
Group: "events.k8s.io",
12494
ResourceVersion: "2",
125-
gvr: &schema.GroupVersionResource{
126-
Group: "events.k8s.io",
127-
Version: "v1",
128-
Resource: "events",
129-
},
13095
},
13196
},
132-
makeDiscoveryClient: getMockDiscoveryClient,
13397
},
13498
},
135-
{
136-
id: component.NewIDWithName(metadata.Type, "invalid_resource"),
137-
},
138-
{
139-
id: component.NewIDWithName(metadata.Type, "exclude_deleted_with_pull"),
140-
},
14199
}
142100

143101
for _, tt := range tests {
@@ -147,58 +105,81 @@ func TestLoadConfig(t *testing.T) {
147105

148106
factory := NewFactory()
149107
cfg := factory.CreateDefaultConfig().(*Config)
150-
cfg.makeDiscoveryClient = getMockDiscoveryClient
151108

152109
sub, err := cm.Sub(tt.id.String())
153110
require.NoError(t, err)
154111
require.NoError(t, sub.Unmarshal(cfg))
155112

156-
if tt.expected == nil {
157-
err = xconfmap.Validate(cfg)
158-
assert.Error(t, err)
159-
return
160-
}
161-
assert.NoError(t, xconfmap.Validate(cfg))
162113
assert.Equal(t, tt.expected.AuthType, cfg.AuthType)
163114
assert.Equal(t, tt.expected.Objects, cfg.Objects)
164115
})
165116
}
166117
}
167118

168-
func TestValidateResourceConflict(t *testing.T) {
169-
mockClient := newMockDynamicClient()
170-
rCfg := createDefaultConfig().(*Config)
171-
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
172-
rCfg.makeDiscoveryClient = getMockDiscoveryClient
173-
174-
// Validate it should choose first gvr if group is not specified
175-
rCfg.Objects = []*K8sObjectsConfig{
119+
func TestValidate(t *testing.T) {
120+
tests := []struct {
121+
desc string
122+
cfg *Config
123+
expectedErr string
124+
}{
176125
{
177-
Name: "myresources",
178-
Mode: PullMode,
126+
desc: "invalid mode",
127+
cfg: &Config{
128+
Objects: []*K8sObjectsConfig{
129+
{
130+
Name: "pods",
131+
Mode: "invalid_mode",
132+
},
133+
},
134+
},
135+
expectedErr: "invalid mode: invalid_mode",
179136
},
180-
}
181-
182-
err := rCfg.Validate()
183-
require.NoError(t, err)
184-
assert.Equal(t, "group1", rCfg.Objects[0].gvr.Group)
185-
186-
// Validate it should choose gvr for specified group
187-
rCfg.Objects = []*K8sObjectsConfig{
188137
{
189-
Name: "myresources",
190-
Mode: PullMode,
191-
Group: "group2",
138+
desc: "exclude watch type with pull mode",
139+
cfg: &Config{
140+
Objects: []*K8sObjectsConfig{
141+
{
142+
Name: "pods",
143+
Mode: PullMode,
144+
ExcludeWatchType: []apiWatch.EventType{
145+
apiWatch.Deleted,
146+
},
147+
},
148+
},
149+
},
150+
expectedErr: "the Exclude config can only be used with watch mode",
151+
},
152+
{
153+
desc: "default mode is set",
154+
cfg: &Config{
155+
Objects: []*K8sObjectsConfig{
156+
{
157+
Name: "pods",
158+
},
159+
},
160+
},
161+
},
162+
{
163+
desc: "default interval for pull mode",
164+
cfg: &Config{
165+
Objects: []*K8sObjectsConfig{
166+
{
167+
Name: "pods",
168+
Mode: PullMode,
169+
},
170+
},
171+
},
192172
},
193173
}
194174

195-
err = rCfg.Validate()
196-
require.NoError(t, err)
197-
assert.Equal(t, "group2", rCfg.Objects[0].gvr.Group)
198-
}
199-
200-
func TestClientRequired(t *testing.T) {
201-
rCfg := createDefaultConfig().(*Config)
202-
err := rCfg.Validate()
203-
require.Error(t, err)
175+
for _, tt := range tests {
176+
t.Run(tt.desc, func(t *testing.T) {
177+
err := tt.cfg.Validate()
178+
if tt.expectedErr != "" {
179+
assert.EqualError(t, err, tt.expectedErr)
180+
return
181+
}
182+
assert.NoError(t, err)
183+
})
184+
}
204185
}

receiver/k8sobjectsreceiver/go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ require (
1212
go.opentelemetry.io/collector/component v1.30.0
1313
go.opentelemetry.io/collector/component/componenttest v0.124.0
1414
go.opentelemetry.io/collector/confmap v1.30.0
15-
go.opentelemetry.io/collector/confmap/xconfmap v0.124.0
1615
go.opentelemetry.io/collector/consumer v1.30.0
1716
go.opentelemetry.io/collector/consumer/consumertest v0.124.0
1817
go.opentelemetry.io/collector/pdata v1.30.0

receiver/k8sobjectsreceiver/receiver.go

+38-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
type k8sobjectsreceiver struct {
3131
setting receiver.Settings
3232
config *Config
33+
objects []*K8sObjectsConfig
3334
stopperChanList []chan struct{}
3435
client dynamic.Interface
3536
consumer consumer.Logs
@@ -50,17 +51,21 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log
5051
return nil, err
5152
}
5253

53-
for _, object := range config.Objects {
54-
object.exclude = make(map[apiWatch.EventType]bool)
55-
for _, item := range object.ExcludeWatchType {
56-
object.exclude[item] = true
54+
objects := make([]*K8sObjectsConfig, len(config.Objects))
55+
for i, obj := range config.Objects {
56+
copied := *obj // Copy the object
57+
objects[i] = &copied
58+
objects[i].exclude = make(map[apiWatch.EventType]bool)
59+
for _, item := range objects[i].ExcludeWatchType {
60+
objects[i].exclude[item] = true
5761
}
5862
}
5963

6064
return &k8sobjectsreceiver{
6165
setting: params,
62-
consumer: consumer,
6366
config: config,
67+
objects: objects,
68+
consumer: consumer,
6469
obsrecv: obsrecv,
6570
mu: sync.Mutex{},
6671
}, nil
@@ -72,12 +77,39 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
7277
return err
7378
}
7479
kr.client = client
80+
81+
// Validate objects against K8s API
82+
validObjects, err := kr.config.getValidObjects()
83+
if err != nil {
84+
return err
85+
}
86+
87+
for _, object := range kr.objects {
88+
gvrs, ok := validObjects[object.Name]
89+
if !ok {
90+
availableResource := make([]string, len(validObjects))
91+
for k := range validObjects {
92+
availableResource = append(availableResource, k)
93+
}
94+
return fmt.Errorf("resource %v not found. Valid resources are: %v", object.Name, availableResource)
95+
}
96+
97+
gvr := gvrs[0]
98+
for i := range gvrs {
99+
if gvrs[i].Group == object.Group {
100+
gvr = gvrs[i]
101+
break
102+
}
103+
}
104+
object.gvr = gvr
105+
}
106+
75107
kr.setting.Logger.Info("Object Receiver started")
76108

77109
cctx, cancel := context.WithCancel(ctx)
78110
kr.cancel = cancel
79111

80-
for _, object := range kr.config.Objects {
112+
for _, object := range kr.objects {
81113
kr.start(cctx, object)
82114
}
83115
return nil

receiver/k8sobjectsreceiver/receiver_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import (
2121
func TestNewReceiver(t *testing.T) {
2222
t.Parallel()
2323

24+
mockClient := newMockDynamicClient()
2425
rCfg := createDefaultConfig().(*Config)
25-
rCfg.makeDynamicClient = newMockDynamicClient().getMockDynamicClient
26+
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
27+
rCfg.makeDiscoveryClient = getMockDiscoveryClient
28+
2629
r, err := newReceiver(
2730
receivertest.NewNopSettings(metadata.Type),
2831
rCfg,

0 commit comments

Comments
 (0)