@@ -5,17 +5,24 @@ package routingconnector // import "github.com/open-telemetry/opentelemetry-coll
5
5
6
6
import (
7
7
"context"
8
+ "os"
9
+ "path/filepath"
8
10
"testing"
9
11
10
12
"github.com/stretchr/testify/assert"
11
13
"github.com/stretchr/testify/require"
14
+ "go.opentelemetry.io/collector/component"
12
15
"go.opentelemetry.io/collector/component/componenttest"
16
+ "go.opentelemetry.io/collector/confmap/confmaptest"
13
17
"go.opentelemetry.io/collector/connector"
14
18
"go.opentelemetry.io/collector/connector/connectortest"
15
19
"go.opentelemetry.io/collector/consumer"
16
20
"go.opentelemetry.io/collector/consumer/consumertest"
17
21
"go.opentelemetry.io/collector/pdata/plog"
18
22
"go.opentelemetry.io/collector/pipeline"
23
+
24
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
25
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
19
26
)
20
27
21
28
func TestLogsRegisterConsumersForValidRoute (t * testing.T ) {
@@ -465,3 +472,88 @@ func TestLogsConnectorCapabilities(t *testing.T) {
465
472
require .NoError (t , err )
466
473
assert .False (t , conn .Capabilities ().MutatesData )
467
474
}
475
+
476
+ func TestLogsConnectorDetailed (t * testing.T ) {
477
+ testCases := []string {
478
+ filepath .Join ("testdata" , "logs" , "resource_context" , "all_match_first_only" ),
479
+ filepath .Join ("testdata" , "logs" , "resource_context" , "all_match_last_only" ),
480
+ filepath .Join ("testdata" , "logs" , "resource_context" , "all_match_once" ),
481
+ filepath .Join ("testdata" , "logs" , "resource_context" , "each_matches_one" ),
482
+ filepath .Join ("testdata" , "logs" , "resource_context" , "match_none_with_default" ),
483
+ filepath .Join ("testdata" , "logs" , "resource_context" , "match_none_without_default" ),
484
+ }
485
+
486
+ for _ , tt := range testCases {
487
+ t .Run (tt , func (t * testing.T ) {
488
+
489
+ cm , err := confmaptest .LoadConf (filepath .Join (tt , "config.yaml" ))
490
+ require .NoError (t , err )
491
+ factory := NewFactory ()
492
+ cfg := factory .CreateDefaultConfig ()
493
+ sub , err := cm .Sub ("routing" )
494
+ require .NoError (t , err )
495
+ require .NoError (t , sub .Unmarshal (cfg ))
496
+ require .NoError (t , component .ValidateConfig (cfg ))
497
+
498
+ var sinkDefault , sink0 , sink1 consumertest.LogsSink
499
+ router := connector .NewLogsRouter (map [pipeline.ID ]consumer.Logs {
500
+ pipeline .NewIDWithName (pipeline .SignalLogs , "default" ): & sinkDefault ,
501
+ pipeline .NewIDWithName (pipeline .SignalLogs , "0" ): & sink0 ,
502
+ pipeline .NewIDWithName (pipeline .SignalLogs , "1" ): & sink1 ,
503
+ })
504
+
505
+ conn , err := factory .CreateLogsToLogs (
506
+ context .Background (),
507
+ connectortest .NewNopSettings (),
508
+ cfg ,
509
+ router .(consumer.Logs ),
510
+ )
511
+ require .NoError (t , err )
512
+
513
+ var expected0 , expected1 , expectedDefault * plog.Logs
514
+ if expected , readErr := golden .ReadLogs (filepath .Join (tt , "sink_0.yaml" )); readErr == nil {
515
+ expected0 = & expected
516
+ } else if ! os .IsNotExist (readErr ) {
517
+ t .Fatalf ("Error reading sink_0.yaml: %v" , readErr )
518
+ }
519
+
520
+ if expected , readErr := golden .ReadLogs (filepath .Join (tt , "sink_1.yaml" )); readErr == nil {
521
+ expected1 = & expected
522
+ } else if ! os .IsNotExist (readErr ) {
523
+ t .Fatalf ("Error reading sink_1.yaml: %v" , readErr )
524
+ }
525
+
526
+ if expected , readErr := golden .ReadLogs (filepath .Join (tt , "sink_default.yaml" )); readErr == nil {
527
+ expectedDefault = & expected
528
+ } else if ! os .IsNotExist (readErr ) {
529
+ t .Fatalf ("Error reading sink_default.yaml: %v" , readErr )
530
+ }
531
+
532
+ input , readErr := golden .ReadLogs (filepath .Join (tt , "input.yaml" ))
533
+ require .NoError (t , readErr )
534
+
535
+ require .NoError (t , conn .ConsumeLogs (context .Background (), input ))
536
+
537
+ if expected0 == nil {
538
+ assert .Empty (t , sink0 .AllLogs (), "sink0 should be empty" )
539
+ } else {
540
+ require .Len (t , sink0 .AllLogs (), 1 , "sink0 should have one plog.Logs" )
541
+ assert .NoError (t , plogtest .CompareLogs (* expected0 , sink0 .AllLogs ()[0 ]), "sink0 has unexpected result" )
542
+ }
543
+
544
+ if expected1 == nil {
545
+ assert .Empty (t , sink1 .AllLogs (), "sink1 should be empty" )
546
+ } else {
547
+ require .Len (t , sink1 .AllLogs (), 1 , "sink1 should have one plog.Logs" )
548
+ assert .NoError (t , plogtest .CompareLogs (* expected1 , sink1 .AllLogs ()[0 ]), "sink1 has unexpected result" )
549
+ }
550
+
551
+ if expectedDefault == nil {
552
+ assert .Empty (t , sinkDefault .AllLogs (), "sinkDefault should be empty" )
553
+ } else {
554
+ require .Len (t , sinkDefault .AllLogs (), 1 , "sinkDefault should have one plog.Logs" )
555
+ assert .NoError (t , plogtest .CompareLogs (* expectedDefault , sinkDefault .AllLogs ()[0 ]), "sinkDefault has unexpected result" )
556
+ }
557
+ })
558
+ }
559
+ }
0 commit comments