Skip to content

Commit 6b9d125

Browse files
authored
Statically validate connector config during dry-run. (#12712)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description As described in [#8721](#8721), the current `validate` command cannot capture improper connector usage errors during a dry run. One proposed solution _(as mentioned in [this comment](#8721 (comment) is to surface configuration errors by instantiating the service during the dry run. However, reviewers preferred statically verifying the errors over instantiating the service (see the [PR #12488 comments](#12488)). The validation logic for connectors is located in the [`service/internal/graph`](https://github.com/open-telemetry/opentelemetry-collector/blob/1daa315455d02bac90185027878d858ba08a0f07/service/internal/graph) package. In the discussion of [PR #12681](#12681), it was concluded that instantiating the graph is the better approach (see [this comment](#12681 (comment))). Finally, this PR uses the `graph.Build()` method to surface configuration errors during the dry run. <!-- Issue number if applicable --> #### Link to tracking issue Fixes #8721 #12535 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: sudipto baral <[email protected]>
1 parent 3c5c8b1 commit 6b9d125

10 files changed

+346
-1
lines changed
+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: otelcol
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Enhance config validation using <validate> command to capture all validation errors that prevents the collector from starting."
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [8721]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

otelcol/collector.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,23 @@ func (col *Collector) DryRun(ctx context.Context) error {
272272
return fmt.Errorf("failed to get config: %w", err)
273273
}
274274

275-
return xconfmap.Validate(cfg)
275+
if err = xconfmap.Validate(cfg); err != nil {
276+
return err
277+
}
278+
279+
return service.Validate(ctx, service.Settings{
280+
BuildInfo: col.set.BuildInfo,
281+
ReceiversConfigs: cfg.Receivers,
282+
ReceiversFactories: factories.Receivers,
283+
ProcessorsConfigs: cfg.Processors,
284+
ProcessorsFactories: factories.Processors,
285+
ExportersConfigs: cfg.Exporters,
286+
ExportersFactories: factories.Exporters,
287+
ConnectorsConfigs: cfg.Connectors,
288+
ConnectorsFactories: factories.Connectors,
289+
}, service.Config{
290+
Pipelines: cfg.Service.Pipelines,
291+
})
276292
}
277293

278294
func newFallbackLogger(options []zap.Option) (*zap.Logger, error) {

otelcol/collector_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,30 @@ func TestCollectorDryRun(t *testing.T) {
462462
},
463463
expectedErr: `service::pipelines::traces: references processor "invalid" which is not configured`,
464464
},
465+
"invalid_connector_use_unused_exp": {
466+
settings: CollectorSettings{
467+
BuildInfo: component.NewDefaultBuildInfo(),
468+
Factories: nopFactories,
469+
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid-connector-unused-exp.yaml")}),
470+
},
471+
expectedErr: `failed to build pipelines: connector "nop/connector1" used as receiver in [logs/in2] pipeline but not used in any supported exporter pipeline`,
472+
},
473+
"invalid_connector_use_unused_rec": {
474+
settings: CollectorSettings{
475+
BuildInfo: component.NewDefaultBuildInfo(),
476+
Factories: nopFactories,
477+
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid-connector-unused-rec.yaml")}),
478+
},
479+
expectedErr: `failed to build pipelines: connector "nop/connector1" used as exporter in [logs/in2] pipeline but not used in any supported receiver pipeline`,
480+
},
481+
"cyclic_connector": {
482+
settings: CollectorSettings{
483+
BuildInfo: component.NewDefaultBuildInfo(),
484+
Factories: nopFactories,
485+
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-cyclic-connector.yaml")}),
486+
},
487+
expectedErr: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
488+
},
465489
}
466490

467491
for name, test := range tests {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
receivers:
2+
nop:
3+
4+
exporters:
5+
nop:
6+
7+
connectors:
8+
nop/forward:
9+
10+
service:
11+
pipelines:
12+
traces/in:
13+
receivers: [nop/forward]
14+
processors: [ ]
15+
exporters: [nop/forward]
16+
traces/out:
17+
receivers: [nop/forward]
18+
processors: [ ]
19+
exporters: [nop/forward]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
receivers:
2+
nop:
3+
4+
exporters:
5+
nop:
6+
7+
connectors:
8+
nop/connector1:
9+
10+
service:
11+
pipelines:
12+
logs/in1:
13+
receivers: [nop]
14+
processors: [ ]
15+
exporters: [nop]
16+
logs/in2:
17+
receivers: [nop/connector1]
18+
processors: [ ]
19+
exporters: [nop]
20+
logs/out:
21+
receivers: [nop]
22+
processors: [ ]
23+
exporters: [nop]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
receivers:
2+
nop:
3+
4+
exporters:
5+
nop:
6+
7+
connectors:
8+
nop/connector1:
9+
10+
service:
11+
pipelines:
12+
logs/in1:
13+
receivers: [nop]
14+
processors: [ ]
15+
exporters: [nop]
16+
logs/in2:
17+
receivers: [nop]
18+
processors: [ ]
19+
exporters: [nop/connector1]
20+
logs/out:
21+
receivers: [nop]
22+
processors: [ ]
23+
exporters: [nop]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
receivers:
2+
nop:
3+
4+
exporters:
5+
nop:
6+
7+
connectors:
8+
nop/connector1:
9+
10+
service:
11+
pipelines:
12+
logs/in1:
13+
receivers: [nop]
14+
processors: [ ]
15+
exporters: [nop]
16+
logs/in2:
17+
receivers: [nop]
18+
processors: [ ]
19+
exporters: [nop/connector1]
20+
logs/out:
21+
receivers: [nop/connector1]
22+
processors: [ ]
23+
exporters: [nop]

otelcol/unmarshal_dry_run_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func TestDryRunWithExpandedValues(t *testing.T) {
7171
mockMap: map[string]string{
7272
"number": "123",
7373
},
74+
expectErr: true,
7475
},
7576
{
7677
name: "string that looks like a bool",

service/service.go

+25
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
1515
"go.opentelemetry.io/otel/log"
1616
"go.opentelemetry.io/otel/metric"
17+
noopmetric "go.opentelemetry.io/otel/metric/noop"
1718
sdkresource "go.opentelemetry.io/otel/sdk/resource"
19+
nooptrace "go.opentelemetry.io/otel/trace/noop"
1820
"go.uber.org/multierr"
1921
"go.uber.org/zap"
2022

@@ -535,3 +537,26 @@ func configureViews(level configtelemetry.Level) []config.View {
535537
func ptr[T any](v T) *T {
536538
return &v
537539
}
540+
541+
// Validate verifies the graph by calling the internal graph.Build.
542+
func Validate(ctx context.Context, set Settings, cfg Config) error {
543+
tel := component.TelemetrySettings{
544+
Logger: zap.NewNop(),
545+
TracerProvider: nooptrace.NewTracerProvider(),
546+
MeterProvider: noopmetric.NewMeterProvider(),
547+
Resource: pcommon.NewResource(),
548+
}
549+
_, err := graph.Build(ctx, graph.Settings{
550+
Telemetry: tel,
551+
BuildInfo: set.BuildInfo,
552+
ReceiverBuilder: builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories),
553+
ProcessorBuilder: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories),
554+
ExporterBuilder: builders.NewExporter(set.ExportersConfigs, set.ExportersFactories),
555+
ConnectorBuilder: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories),
556+
PipelineConfigs: cfg.Pipelines,
557+
})
558+
if err != nil {
559+
return fmt.Errorf("failed to build pipelines: %w", err)
560+
}
561+
return nil
562+
}

service/service_test.go

+166
Original file line numberDiff line numberDiff line change
@@ -741,3 +741,169 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
741741
func newPtr[T int | string](str T) *T {
742742
return &str
743743
}
744+
745+
func TestValidateGraph(t *testing.T) {
746+
testCases := map[string]struct {
747+
connectorCfg map[component.ID]component.Config
748+
receiverCfg map[component.ID]component.Config
749+
exporterCfg map[component.ID]component.Config
750+
pipelinesCfg pipelines.Config
751+
expectedError string
752+
}{
753+
"Valid connector usage": {
754+
connectorCfg: map[component.ID]component.Config{
755+
component.NewIDWithName(nopType, "connector1"): &struct{}{},
756+
},
757+
receiverCfg: map[component.ID]component.Config{
758+
component.NewID(nopType): &struct{}{},
759+
},
760+
exporterCfg: map[component.ID]component.Config{
761+
component.NewID(nopType): &struct{}{},
762+
},
763+
pipelinesCfg: pipelines.Config{
764+
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
765+
Receivers: []component.ID{component.NewID(nopType)},
766+
Processors: []component.ID{},
767+
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
768+
},
769+
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
770+
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
771+
Processors: []component.ID{},
772+
Exporters: []component.ID{component.NewID(nopType)},
773+
},
774+
},
775+
expectedError: "",
776+
},
777+
"Valid without Connector": {
778+
receiverCfg: map[component.ID]component.Config{
779+
component.NewID(nopType): &struct{}{},
780+
},
781+
exporterCfg: map[component.ID]component.Config{
782+
component.NewID(nopType): &struct{}{},
783+
},
784+
pipelinesCfg: pipelines.Config{
785+
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
786+
Receivers: []component.ID{component.NewID(nopType)},
787+
Processors: []component.ID{},
788+
Exporters: []component.ID{component.NewID(nopType)},
789+
},
790+
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
791+
Receivers: []component.ID{component.NewID(nopType)},
792+
Processors: []component.ID{},
793+
Exporters: []component.ID{component.NewID(nopType)},
794+
},
795+
},
796+
expectedError: "",
797+
},
798+
"Connector used as exporter but not as receiver": {
799+
connectorCfg: map[component.ID]component.Config{
800+
component.NewIDWithName(nopType, "connector1"): &struct{}{},
801+
},
802+
receiverCfg: map[component.ID]component.Config{
803+
component.NewID(nopType): &struct{}{},
804+
},
805+
exporterCfg: map[component.ID]component.Config{
806+
component.NewID(nopType): &struct{}{},
807+
},
808+
pipelinesCfg: pipelines.Config{
809+
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
810+
Receivers: []component.ID{component.NewID(nopType)},
811+
Processors: []component.ID{},
812+
Exporters: []component.ID{component.NewID(nopType)},
813+
},
814+
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
815+
Receivers: []component.ID{component.NewID(nopType)},
816+
Processors: []component.ID{},
817+
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
818+
},
819+
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
820+
Receivers: []component.ID{component.NewID(nopType)},
821+
Processors: []component.ID{},
822+
Exporters: []component.ID{component.NewID(nopType)},
823+
},
824+
},
825+
expectedError: `failed to build pipelines: connector "nop/connector1" used as exporter in [logs/in2] pipeline but not used in any supported receiver pipeline`,
826+
},
827+
"Connector used as receiver but not as exporter": {
828+
connectorCfg: map[component.ID]component.Config{
829+
component.NewIDWithName(nopType, "connector1"): &struct{}{},
830+
},
831+
receiverCfg: map[component.ID]component.Config{
832+
component.NewID(nopType): &struct{}{},
833+
},
834+
exporterCfg: map[component.ID]component.Config{
835+
component.NewID(nopType): &struct{}{},
836+
},
837+
pipelinesCfg: pipelines.Config{
838+
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
839+
Receivers: []component.ID{component.NewID(nopType)},
840+
Processors: []component.ID{},
841+
Exporters: []component.ID{component.NewID(nopType)},
842+
},
843+
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
844+
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
845+
Processors: []component.ID{},
846+
Exporters: []component.ID{component.NewID(nopType)},
847+
},
848+
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
849+
Receivers: []component.ID{component.NewID(nopType)},
850+
Processors: []component.ID{},
851+
Exporters: []component.ID{component.NewID(nopType)},
852+
},
853+
},
854+
expectedError: `failed to build pipelines: connector "nop/connector1" used as receiver in [logs/in2] pipeline but not used in any supported exporter pipeline`,
855+
},
856+
"Connector creates direct cycle between pipelines": {
857+
connectorCfg: map[component.ID]component.Config{
858+
component.NewIDWithName(nopType, "forward"): &struct{}{},
859+
},
860+
receiverCfg: map[component.ID]component.Config{
861+
component.NewID(nopType): &struct{}{},
862+
},
863+
exporterCfg: map[component.ID]component.Config{
864+
component.NewID(nopType): &struct{}{},
865+
},
866+
pipelinesCfg: pipelines.Config{
867+
pipeline.NewIDWithName(pipeline.SignalTraces, "in"): {
868+
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
869+
Processors: []component.ID{},
870+
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
871+
},
872+
pipeline.NewIDWithName(pipeline.SignalTraces, "out"): {
873+
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
874+
Processors: []component.ID{},
875+
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
876+
},
877+
},
878+
expectedError: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
879+
},
880+
}
881+
882+
_, connectorsFactories := builders.NewNopConnectorConfigsAndFactories()
883+
_, receiversFactories := builders.NewNopReceiverConfigsAndFactories()
884+
_, exportersFactories := builders.NewNopExporterConfigsAndFactories()
885+
886+
for name, tc := range testCases {
887+
t.Run(name, func(t *testing.T) {
888+
settings := Settings{
889+
ConnectorsConfigs: tc.connectorCfg,
890+
ConnectorsFactories: connectorsFactories,
891+
ReceiversConfigs: tc.receiverCfg,
892+
ReceiversFactories: receiversFactories,
893+
ExportersConfigs: tc.exporterCfg,
894+
ExportersFactories: exportersFactories,
895+
}
896+
cfg := Config{
897+
Pipelines: tc.pipelinesCfg,
898+
}
899+
900+
err := Validate(context.Background(), settings, cfg)
901+
if tc.expectedError == "" {
902+
require.NoError(t, err)
903+
} else {
904+
require.Error(t, err)
905+
assert.Equal(t, tc.expectedError, err.Error())
906+
}
907+
})
908+
}
909+
}

0 commit comments

Comments
 (0)