Skip to content

Commit 6795013

Browse files
authored
Merge pull request #48 from SumoLogic/span-filtering
Add span filtering capability
2 parents ec66429 + 77a85af commit 6795013

File tree

9 files changed

+376
-40
lines changed

9 files changed

+376
-40
lines changed

processor/filterprocessor/README.md

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,33 @@
11
# Filter Processor
22

3-
Supported pipeline types: metrics
3+
Supported pipeline types: traces, metrics
44

55
The filter processor can be configured to include or exclude metrics based on
66
metric name in the case of the 'strict' or 'regexp' match types, or based on other
77
metric attributes in the case of the 'expr' match type. Please refer to
88
[config.go](./config.go) for the config spec.
99

10-
It takes a pipeline type, of which only `metrics` is supported, followed by an
11-
action:
10+
It takes a pipeline type, of which only `metrics` and `spans` is supported, where following actions are available:
1211
- `include`: Any names NOT matching filters are excluded from remainder of pipeline
1312
- `exclude`: Any names matching filters are excluded from remainder of pipeline
1413

15-
For the actions the following parameters are required:
14+
For the `metrics` actions the following parameters are required:
1615
- `match_type`: strict|regexp|expr
1716
- `metric_names`: (only for a `match_type` of 'strict' or 'regexp') list of strings or re2 regex patterns
1817
- `expressions`: (only for a `match_type` of 'expr') list of expr expressions (see "Using an 'expr' match_type" below)
1918

20-
More details can found at [include/exclude metrics](../README.md#includeexclude-metrics).
19+
For `spans`, following parameters are available:
20+
- `match_type`: strict|regexp|expr
21+
- `services`: list of strings for matching service names
22+
- `span_names`: list of strings for matching span names
23+
- `attributes`: list of attributes to match against
24+
25+
More details can found at [include/exclude](../README.md#includeexclude-metrics).
2126

2227
Examples:
2328

2429
```yaml
25-
processors:
30+
processors:
2631
filter/1:
2732
metrics:
2833
include:
@@ -35,6 +40,19 @@ processors:
3540
metric_names:
3641
- hello_world
3742
- hello/world
43+
filter/2:
44+
spans:
45+
include:
46+
match_type: regexp
47+
span_names:
48+
# re2 regexp patterns
49+
- ^prefix/.*
50+
- .*/suffix
51+
exclude:
52+
match_type: regexp
53+
span_names:
54+
- ^other_prefix/.*
55+
- .*/other_suffix
3856
```
3957
4058
Refer to the config files in [testdata](./testdata) for detailed

processor/filterprocessor/config.go

+15
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,28 @@ package filterprocessor
1616

1717
import (
1818
"go.opentelemetry.io/collector/config/configmodels"
19+
"go.opentelemetry.io/collector/internal/processor/filterconfig"
1920
"go.opentelemetry.io/collector/internal/processor/filtermetric"
2021
)
2122

2223
// Config defines configuration for Resource processor.
2324
type Config struct {
2425
configmodels.ProcessorSettings `mapstructure:",squash"`
2526
Metrics MetricFilters `mapstructure:"metrics"`
27+
Spans SpanFilters `mapstructure:"spans"`
28+
}
29+
30+
// SpanFilters filters by Span properties
31+
type SpanFilters struct {
32+
// Include match properties describe spans that should be included in the Collector Service pipeline,
33+
// all other spans should be dropped from further processing.
34+
// If both Include and Exclude are specified, Include filtering occurs first.
35+
Include *filterconfig.MatchProperties `mapstructure:"include"`
36+
37+
// Exclude match properties describe spans that should be excluded from the Collector Service pipeline,
38+
// all other spans should be included.
39+
// If both Include and Exclude are specified, Include filtering occurs first.
40+
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`
2641
}
2742

2843
// MetricFilter filters by Metric properties.

processor/filterprocessor/config_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import (
1818
"path"
1919
"testing"
2020

21+
"go.opentelemetry.io/collector/internal/processor/filterconfig"
22+
"go.opentelemetry.io/collector/internal/processor/filterset"
23+
2124
"github.com/stretchr/testify/assert"
2225
"github.com/stretchr/testify/require"
2326

@@ -311,3 +314,44 @@ func TestLoadingConfigExpr(t *testing.T) {
311314
})
312315
}
313316
}
317+
318+
func TestLoadingConfigSpans(t *testing.T) {
319+
factories, err := componenttest.ExampleComponents()
320+
require.NoError(t, err)
321+
factory := NewFactory()
322+
factories.Processors[configmodels.Type(typeStr)] = factory
323+
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_spans.yaml"), factories)
324+
require.NoError(t, err)
325+
require.NotNil(t, config)
326+
327+
tests := []struct {
328+
filterName string
329+
expCfg configmodels.Processor
330+
}{
331+
{
332+
filterName: "filter/spans",
333+
expCfg: &Config{
334+
ProcessorSettings: configmodels.ProcessorSettings{
335+
NameVal: "filter/spans",
336+
TypeVal: typeStr,
337+
},
338+
Spans: SpanFilters{
339+
Include: &filterconfig.MatchProperties{
340+
Config: filterset.Config{MatchType: "regexp"},
341+
SpanNames: []string{"prefix/.*", ".*/suffix"},
342+
},
343+
Exclude: &filterconfig.MatchProperties{
344+
Config: filterset.Config{MatchType: "regexp"},
345+
SpanNames: []string{"other_prefix/.*", ".*/other_suffix"},
346+
},
347+
},
348+
},
349+
},
350+
}
351+
for _, test := range tests {
352+
t.Run(test.filterName, func(t *testing.T) {
353+
cfg := config.Processors[test.filterName]
354+
assert.Equal(t, test.expCfg, cfg)
355+
})
356+
}
357+
}

processor/filterprocessor/factory.go

+18
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func NewFactory() component.ProcessorFactory {
3535
return processorhelper.NewFactory(
3636
typeStr,
3737
createDefaultConfig,
38+
processorhelper.WithTraces(createTracesProcessor),
3839
processorhelper.WithMetrics(createMetricsProcessor))
3940
}
4041

@@ -63,3 +64,20 @@ func createMetricsProcessor(
6364
fp,
6465
processorhelper.WithCapabilities(processorCapabilities))
6566
}
67+
68+
func createTracesProcessor(
69+
_ context.Context,
70+
params component.ProcessorCreateParams,
71+
cfg configmodels.Processor,
72+
nextConsumer consumer.TracesConsumer,
73+
) (component.TracesProcessor, error) {
74+
fp, err := newFilterSpanProcessor(params.Logger, cfg.(*Config))
75+
if err != nil {
76+
return nil, err
77+
}
78+
return processorhelper.NewTraceProcessor(
79+
cfg,
80+
nextConsumer,
81+
fp,
82+
processorhelper.WithCapabilities(processorCapabilities))
83+
}

processor/filterprocessor/factory_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,8 @@ func TestCreateProcessors(t *testing.T) {
8181
factory := NewFactory()
8282

8383
tp, tErr := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
84-
// Not implemented error
85-
assert.NotNil(t, tErr)
86-
assert.Nil(t, tp)
84+
assert.Equal(t, test.succeed, tp != nil)
85+
assert.Equal(t, test.succeed, tErr == nil)
8786

8887
mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewMetricsNop())
8988
assert.Equal(t, test.succeed, mp != nil)

processor/filterprocessor/filter_processor.go

+129-27
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,32 @@ package filterprocessor
1717
import (
1818
"context"
1919

20+
"go.opentelemetry.io/collector/internal/processor/filterconfig"
21+
"go.opentelemetry.io/collector/internal/processor/filterspan"
22+
2023
"go.uber.org/zap"
2124

2225
"go.opentelemetry.io/collector/consumer/pdata"
2326
"go.opentelemetry.io/collector/internal/processor/filtermetric"
2427
"go.opentelemetry.io/collector/processor/processorhelper"
2528
)
2629

27-
type filterMetricProcessor struct {
28-
cfg *Config
29-
include filtermetric.Matcher
30-
exclude filtermetric.Matcher
31-
logger *zap.Logger
30+
type filterProcessor struct {
31+
cfg *Config
32+
includeMetrics filtermetric.Matcher
33+
excludeMetrics filtermetric.Matcher
34+
includeSpans filterspan.Matcher
35+
excludeSpans filterspan.Matcher
36+
logger *zap.Logger
3237
}
3338

34-
func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterMetricProcessor, error) {
35-
inc, err := createMatcher(cfg.Metrics.Include)
39+
func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterProcessor, error) {
40+
inc, err := createMetricsMatcher(cfg.Metrics.Include)
3641
if err != nil {
3742
return nil, err
3843
}
3944

40-
exc, err := createMatcher(cfg.Metrics.Exclude)
45+
exc, err := createMetricsMatcher(cfg.Metrics.Exclude)
4146
if err != nil {
4247
return nil, err
4348
}
@@ -62,32 +67,86 @@ func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterMetricPro
6267

6368
logger.Info(
6469
"Metric filter configured",
65-
zap.String("include match_type", includeMatchType),
66-
zap.Strings("include expressions", includeExpressions),
67-
zap.Strings("include metric names", includeMetricNames),
68-
zap.String("exclude match_type", excludeMatchType),
69-
zap.Strings("exclude expressions", excludeExpressions),
70-
zap.Strings("exclude metric names", excludeMetricNames),
70+
zap.String("includeMetrics match_type", includeMatchType),
71+
zap.Strings("includeMetrics expressions", includeExpressions),
72+
zap.Strings("includeMetrics metric names", includeMetricNames),
73+
zap.String("excludeMetrics match_type", excludeMatchType),
74+
zap.Strings("excludeMetrics expressions", excludeExpressions),
75+
zap.Strings("excludeMetrics metric names", excludeMetricNames),
76+
)
77+
78+
return &filterProcessor{
79+
cfg: cfg,
80+
includeMetrics: inc,
81+
excludeMetrics: exc,
82+
logger: logger,
83+
}, nil
84+
}
85+
86+
func newFilterSpanProcessor(logger *zap.Logger, cfg *Config) (*filterProcessor, error) {
87+
inc, err := createSpansMatcher(cfg.Spans.Include)
88+
if err != nil {
89+
return nil, err
90+
}
91+
92+
exc, err := createSpansMatcher(cfg.Spans.Exclude)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
includeMatchType := ""
98+
var includeServices []string
99+
var includeSpanNames []string
100+
if cfg.Spans.Include != nil {
101+
includeMatchType = string(cfg.Spans.Include.MatchType)
102+
includeServices = cfg.Spans.Include.Services
103+
includeSpanNames = cfg.Spans.Include.SpanNames
104+
}
105+
106+
excludeMatchType := ""
107+
var excludeServices []string
108+
var excludeSpanNames []string
109+
if cfg.Spans.Exclude != nil {
110+
excludeMatchType = string(cfg.Spans.Exclude.MatchType)
111+
excludeServices = cfg.Spans.Exclude.Services
112+
excludeSpanNames = cfg.Spans.Exclude.SpanNames
113+
}
114+
115+
logger.Info(
116+
"Span filter configured",
117+
zap.String("includeSpans match_type", includeMatchType),
118+
zap.Strings("includeSpans services", includeServices),
119+
zap.Strings("includeSpans span names", includeSpanNames),
120+
zap.String("excludeSpans match_type", excludeMatchType),
121+
zap.Strings("excludeSpans services", excludeServices),
122+
zap.Strings("excludeSpans span names", excludeSpanNames),
71123
)
72124

73-
return &filterMetricProcessor{
74-
cfg: cfg,
75-
include: inc,
76-
exclude: exc,
77-
logger: logger,
125+
return &filterProcessor{
126+
cfg: cfg,
127+
includeSpans: inc,
128+
excludeSpans: exc,
129+
logger: logger,
78130
}, nil
79131
}
80132

81-
func createMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, error) {
133+
func createSpansMatcher(mp *filterconfig.MatchProperties) (filterspan.Matcher, error) {
134+
if mp == nil {
135+
return nil, nil
136+
}
137+
return filterspan.NewMatcher(mp)
138+
}
139+
140+
func createMetricsMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, error) {
82141
// Nothing specified in configuration
83142
if mp == nil {
84143
return nil, nil
85144
}
86145
return filtermetric.NewMatcher(mp)
87146
}
88147

89-
// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
90-
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
148+
// ProcessMetrics filters the given metrics based off the filterProcessor's filters.
149+
func (fmp *filterProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
91150
rms := pdm.ResourceMetrics()
92151
idx := newMetricIndex()
93152
for i := 0; i < rms.Len(); i++ {
@@ -112,9 +171,34 @@ func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Me
112171
return idx.extract(pdm), nil
113172
}
114173

115-
func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, error) {
116-
if fmp.include != nil {
117-
matches, err := fmp.include.MatchMetric(metric)
174+
// ProcessTraces filters the given spans based off the filterProcessor's filters.
175+
func (fmp *filterProcessor) ProcessTraces(_ context.Context, pdt pdata.Traces) (pdata.Traces, error) {
176+
rs := pdt.ResourceSpans()
177+
for i := 0; i < rs.Len(); i++ {
178+
rss := rs.At(i)
179+
resource := rss.Resource()
180+
ils := rss.InstrumentationLibrarySpans()
181+
182+
for j := 0; j < ils.Len(); j++ {
183+
ilss := ils.At(j)
184+
library := ilss.InstrumentationLibrary()
185+
inputSpans := pdata.NewSpanSlice()
186+
ilss.Spans().MoveAndAppendTo(inputSpans)
187+
for k := 0; k < inputSpans.Len(); k++ {
188+
span := inputSpans.At(k)
189+
if fmp.shouldKeepSpan(span, resource, library) {
190+
ilss.Spans().Append(span)
191+
}
192+
}
193+
}
194+
}
195+
196+
return pdt, nil
197+
}
198+
199+
func (fmp *filterProcessor) shouldKeepMetric(metric pdata.Metric) (bool, error) {
200+
if fmp.includeMetrics != nil {
201+
matches, err := fmp.includeMetrics.MatchMetric(metric)
118202
if err != nil {
119203
// default to keep if there's an error
120204
return true, err
@@ -124,8 +208,8 @@ func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, e
124208
}
125209
}
126210

127-
if fmp.exclude != nil {
128-
matches, err := fmp.exclude.MatchMetric(metric)
211+
if fmp.excludeMetrics != nil {
212+
matches, err := fmp.excludeMetrics.MatchMetric(metric)
129213
if err != nil {
130214
return true, err
131215
}
@@ -136,3 +220,21 @@ func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, e
136220

137221
return true, nil
138222
}
223+
224+
func (fmp *filterProcessor) shouldKeepSpan(span pdata.Span, resource pdata.Resource, library pdata.InstrumentationLibrary) bool {
225+
if fmp.includeSpans != nil {
226+
matches := fmp.includeSpans.MatchSpan(span, resource, library)
227+
if !matches {
228+
return false
229+
}
230+
}
231+
232+
if fmp.excludeSpans != nil {
233+
matches := fmp.excludeSpans.MatchSpan(span, resource, library)
234+
if matches {
235+
return false
236+
}
237+
}
238+
239+
return true
240+
}

0 commit comments

Comments
 (0)