Skip to content

Commit f21ab46

Browse files
EOjeahf7o
authored andcommitted
[processor/tailsampling] Include componentID as prefix in metrics 'policy' (open-telemetry#34192)
**Description:** Fixing a bug - This change includes the componentID as a dot prefix to the metrics `policy` dimension when generating metrics for the processor. The change ensures that similarly named policy's in the tail sampling processor that belong to different components also has a unique value in the `policy` field for the metrics. Also includes minor refactor change to rename `telemetry` to `telemetryBuilder` where applicable (return type == `NewTelemetryBuilder`) Resolves: open-telemetry#34099 **Link to tracking Issue:** <Issue number if applicable> **Testing:** Ran the collector locally with `make run` with the configuration below which uses the tail sampling processor and has metrics exposed in prometheus format. Sending sample zipkin spans to the receiver ```yaml receivers: zipkin: processors: tail_sampling: policies: [ { name: test-policy-1, type: always_sample } ] tail_sampling/custom_name: policies: [ { name: test-policy-1, type: always_sample } ] exporters: debug: service: telemetry: logs: metrics: pipelines: traces: receivers: [zipkin] processors: [tail_sampling, tail_sampling/custom_name] exporters: [debug] ``` Curling the metrics endpoint shows the policy name is unique for both tail sampling processors ```bash otelcol_processor_tail_sampling_sampling_decision_latency_bucket{policy="custom_name.test-policy-1",service_instance_id="X",service_name="otelcontribcol",service_version="0.105.0-dev",le="5000"} 1 otelcol_processor_tail_sampling_sampling_decision_latency_bucket{policy="test-policy-1",service_instance_id="X",service_name="otelcontribcol",service_version="0.105.0-dev",le="5000"} 1 ``` Tasks - [ ] Confirm prefix separator as `.` - [ ] Update change log entry
1 parent bca2dda commit f21ab46

7 files changed

+159
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tailsamplingprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Update the `policy` value in metrics dimension value to be unique across multiple tail sampling components with the same policy name."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34192]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: "This change ensures that the `policy` value in the metrics exported by the tail sampling processor is unique across multiple tail sampling processors with the same policy name."
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/tailsamplingprocessor/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ func createTracesProcessor(
3838
nextConsumer consumer.Traces,
3939
) (processor.Traces, error) {
4040
tCfg := cfg.(*Config)
41-
return newTracesProcessor(ctx, params.TelemetrySettings, nextConsumer, *tCfg)
41+
return newTracesProcessor(ctx, params, nextConsumer, *tCfg)
4242
}

processor/tailsamplingprocessor/processor.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ type Option func(*tailSamplingSpanProcessor)
8383

8484
// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
8585
// configuration.
86-
func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
87-
telemetry, err := metadata.NewTelemetryBuilder(settings)
86+
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
87+
telemetrySettings := set.TelemetrySettings
88+
telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
8889
if err != nil {
8990
return nil, err
9091
}
@@ -102,7 +103,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
102103
nextConsumer: nextConsumer,
103104
maxNumTraces: cfg.NumTraces,
104105
sampledIDCache: sampledDecisions,
105-
logger: settings.Logger,
106+
logger: telemetrySettings.Logger,
106107
numTracesOnMap: &atomic.Uint64{},
107108
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
108109
}
@@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
119120
if tsp.policies == nil {
120121
policyNames := map[string]bool{}
121122
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
123+
componentID := set.ID.Name()
122124
for i := range cfg.PolicyCfgs {
123125
policyCfg := &cfg.PolicyCfgs[i]
124126

@@ -127,14 +129,18 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
127129
}
128130
policyNames[policyCfg.Name] = true
129131

130-
eval, err := getPolicyEvaluator(settings, policyCfg)
132+
eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
131133
if err != nil {
132134
return nil, err
133135
}
136+
uniquePolicyName := policyCfg.Name
137+
if componentID != "" {
138+
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
139+
}
134140
p := &policy{
135141
name: policyCfg.Name,
136142
evaluator: eval,
137-
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
143+
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
138144
}
139145
tsp.policies[i] = p
140146
}

processor/tailsamplingprocessor/processor_benchmarks_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/collector/component/componenttest"
1313
"go.opentelemetry.io/collector/consumer/consumertest"
1414
"go.opentelemetry.io/collector/pdata/ptrace"
15+
"go.opentelemetry.io/collector/processor/processortest"
1516

1617
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
1718
)
@@ -24,8 +25,7 @@ func BenchmarkSampling(b *testing.B) {
2425
ExpectedNewTracesPerSec: 64,
2526
PolicyCfgs: testPolicy,
2627
}
27-
28-
sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg)
28+
sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg)
2929
tsp := sp.(*tailSamplingSpanProcessor)
3030
require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost()))
3131
defer func() {

processor/tailsamplingprocessor/processor_decisions_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
2626
}
2727
nextConsumer := new(consumertest.TracesSink)
2828
s := setupTestTelemetry()
29-
ct := s.NewSettings().TelemetrySettings
29+
ct := s.NewSettings()
3030
idb := newSyncIDBatcher()
3131

3232
mpe1 := &mockPolicyEvaluator{}
@@ -71,7 +71,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
7171
}
7272
nextConsumer := new(consumertest.TracesSink)
7373
s := setupTestTelemetry()
74-
ct := s.NewSettings().TelemetrySettings
74+
ct := s.NewSettings()
7575
idb := newSyncIDBatcher()
7676

7777
mpe1 := &mockPolicyEvaluator{}
@@ -116,7 +116,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
116116
}
117117
nextConsumer := new(consumertest.TracesSink)
118118
s := setupTestTelemetry()
119-
ct := s.NewSettings().TelemetrySettings
119+
ct := s.NewSettings()
120120
idb := newSyncIDBatcher()
121121

122122
mpe1 := &mockPolicyEvaluator{}
@@ -167,7 +167,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
167167
}
168168
nextConsumer := new(consumertest.TracesSink)
169169
s := setupTestTelemetry()
170-
ct := s.NewSettings().TelemetrySettings
170+
ct := s.NewSettings()
171171
idb := newSyncIDBatcher()
172172

173173
mpe1 := &mockPolicyEvaluator{}
@@ -213,7 +213,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
213213
}
214214
nextConsumer := new(consumertest.TracesSink)
215215
s := setupTestTelemetry()
216-
ct := s.NewSettings().TelemetrySettings
216+
ct := s.NewSettings()
217217
idb := newSyncIDBatcher()
218218

219219
mpe1 := &mockPolicyEvaluator{}
@@ -264,7 +264,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
264264
}
265265
nextConsumer := new(consumertest.TracesSink)
266266
s := setupTestTelemetry()
267-
ct := s.NewSettings().TelemetrySettings
267+
ct := s.NewSettings()
268268
idb := newSyncIDBatcher()
269269

270270
mpe1 := &mockPolicyEvaluator{}
@@ -334,7 +334,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
334334
}
335335
nextConsumer := new(consumertest.TracesSink)
336336
s := setupTestTelemetry()
337-
ct := s.NewSettings().TelemetrySettings
337+
ct := s.NewSettings()
338338
idb := newSyncIDBatcher()
339339

340340
mpe := &mockPolicyEvaluator{}

processor/tailsamplingprocessor/processor_telemetry_test.go

+101-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component"
1213
"go.opentelemetry.io/collector/component/componenttest"
1314
"go.opentelemetry.io/collector/consumer/consumertest"
1415
"go.opentelemetry.io/collector/featuregate"
@@ -37,7 +38,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
3738
},
3839
}
3940
cs := &consumertest.TracesSink{}
40-
ct := s.NewSettings().TelemetrySettings
41+
ct := s.NewSettings()
4142
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
4243
require.NoError(t, err)
4344
defer func() {
@@ -211,6 +212,102 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
211212
assert.Len(t, cs.AllTraces(), 1)
212213
}
213214

215+
func TestMetricsWithComponentID(t *testing.T) {
216+
// prepare
217+
s := setupTestTelemetry()
218+
b := newSyncIDBatcher()
219+
syncBatcher := b.(*syncIDBatcher)
220+
221+
cfg := Config{
222+
DecisionWait: 1,
223+
NumTraces: 100,
224+
PolicyCfgs: []PolicyCfg{
225+
{
226+
sharedPolicyCfg: sharedPolicyCfg{
227+
Name: "always",
228+
Type: AlwaysSample,
229+
},
230+
},
231+
},
232+
}
233+
cs := &consumertest.TracesSink{}
234+
ct := s.NewSettings()
235+
ct.ID = component.MustNewIDWithName("tail_sampling", "unique_id") // e.g tail_sampling/unique_id
236+
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
237+
require.NoError(t, err)
238+
defer func() {
239+
err = proc.Shutdown(context.Background())
240+
require.NoError(t, err)
241+
}()
242+
243+
err = proc.Start(context.Background(), componenttest.NewNopHost())
244+
require.NoError(t, err)
245+
246+
// test
247+
err = proc.ConsumeTraces(context.Background(), simpleTraces())
248+
require.NoError(t, err)
249+
250+
tsp := proc.(*tailSamplingSpanProcessor)
251+
tsp.policyTicker.OnTick() // the first tick always gets an empty batch
252+
tsp.policyTicker.OnTick()
253+
254+
// verify
255+
var md metricdata.ResourceMetrics
256+
require.NoError(t, s.reader.Collect(context.Background(), &md))
257+
require.Equal(t, 8, s.len(md))
258+
259+
for _, tt := range []struct {
260+
opts []metricdatatest.Option
261+
m metricdata.Metrics
262+
}{
263+
{
264+
opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()},
265+
m: metricdata.Metrics{
266+
Name: "otelcol_processor_tail_sampling_count_traces_sampled",
267+
Description: "Count of traces that were sampled or not per sampling policy",
268+
Unit: "{traces}",
269+
Data: metricdata.Sum[int64]{
270+
IsMonotonic: true,
271+
Temporality: metricdata.CumulativeTemporality,
272+
DataPoints: []metricdata.DataPoint[int64]{
273+
{
274+
Attributes: attribute.NewSet(
275+
attribute.String("policy", "unique_id.always"),
276+
attribute.String("sampled", "true"),
277+
),
278+
Value: 1,
279+
},
280+
},
281+
},
282+
},
283+
},
284+
{
285+
opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()},
286+
m: metricdata.Metrics{
287+
Name: "otelcol_processor_tail_sampling_sampling_decision_latency",
288+
Description: "Latency (in microseconds) of a given sampling policy",
289+
Unit: "µs",
290+
Data: metricdata.Histogram[int64]{
291+
Temporality: metricdata.CumulativeTemporality,
292+
DataPoints: []metricdata.HistogramDataPoint[int64]{
293+
{
294+
Attributes: attribute.NewSet(
295+
attribute.String("policy", "unique_id.always"),
296+
),
297+
},
298+
},
299+
},
300+
},
301+
},
302+
} {
303+
got := s.getMetric(tt.m.Name, md)
304+
metricdatatest.AssertEqual(t, tt.m, got, tt.opts...)
305+
}
306+
307+
// sanity check
308+
assert.Len(t, cs.AllTraces(), 1)
309+
}
310+
214311
func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
215312
err := featuregate.GlobalRegistry().Set("processor.tailsamplingprocessor.metricstatcountspanssampled", true)
216313
require.NoError(t, err)
@@ -238,7 +335,7 @@ func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
238335
},
239336
}
240337
cs := &consumertest.TracesSink{}
241-
ct := s.NewSettings().TelemetrySettings
338+
ct := s.NewSettings()
242339
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
243340
require.NoError(t, err)
244341
defer func() {
@@ -303,7 +400,7 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) {
303400
},
304401
}
305402
cs := &consumertest.TracesSink{}
306-
ct := s.NewSettings().TelemetrySettings
403+
ct := s.NewSettings()
307404
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
308405
require.NoError(t, err)
309406
defer func() {
@@ -364,7 +461,7 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) {
364461
},
365462
}
366463
cs := &consumertest.TracesSink{}
367-
ct := s.NewSettings().TelemetrySettings
464+
ct := s.NewSettings()
368465
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
369466
require.NoError(t, err)
370467
defer func() {

0 commit comments

Comments
 (0)