Skip to content

Commit ec66429

Browse files
committed
Store sampling.probability in sampled span attributes
1 parent ecb27f4 commit ec66429

File tree

4 files changed

+98
-22
lines changed

4 files changed

+98
-22
lines changed

processor/samplingprocessor/probabilisticsamplerprocessor/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ The following configuration options can be modified:
2020
- `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed.
2121
- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces
2222

23+
Sampled spans have a [`sampling.probability`](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk.md#sampling)
24+
attribute added, which holds a value in the range `(0, 1.0]` representing the probability with which the span
25+
was sampled. If the span was already sampled upstream and the attribute is present, the existing value is multiplied by this processor's sampling probability.
26+
2327
Examples:
2428

2529
```yaml

processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"go.opentelemetry.io/collector/component/componenterror"
2323
"go.opentelemetry.io/collector/consumer"
2424
"go.opentelemetry.io/collector/consumer/pdata"
25+
"go.opentelemetry.io/collector/translator/conventions"
2526
)
2627

2728
// samplingPriority has the semantic result of parsing the "sampling.priority"
@@ -49,9 +50,10 @@ const (
4950
)
5051

5152
type tracesamplerprocessor struct {
52-
nextConsumer consumer.TracesConsumer
53-
scaledSamplingRate uint32
54-
hashSeed uint32
53+
nextConsumer consumer.TracesConsumer
54+
scaledSamplingRate uint32
55+
samplingProbability float64
56+
hashSeed uint32
5557
}
5658

5759
// newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
@@ -64,8 +66,9 @@ func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (compon
6466
return &tracesamplerprocessor{
6567
nextConsumer: nextConsumer,
6668
// Adjust sampling percentage on private so recalculations are avoided.
67-
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
68-
hashSeed: cfg.HashSeed,
69+
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
70+
samplingProbability: float64(cfg.SamplingPercentage) * 0.01,
71+
hashSeed: cfg.HashSeed,
6972
}, nil
7073
}
7174

@@ -78,6 +81,16 @@ func (tsp *tracesamplerprocessor) ConsumeTraces(ctx context.Context, td pdata.Tr
7881
return tsp.nextConsumer.ConsumeTraces(ctx, sampledTraceData)
7982
}
8083

84+
// updateSamplingProbability records the effective sampling probability on the
// attributes of a span that this processor decided to keep.
//
// The value written is the processor's configured samplingProbability. If the
// span already carries a sampling.probability attribute of type double, the
// two values are multiplied so the result reflects every sampling stage the
// span has passed through. An existing attribute of any other type is ignored
// and replaced by the processor's own probability.
func (tsp *tracesamplerprocessor) updateSamplingProbability(sampledSpanAttributes pdata.AttributeMap) {
85+
samplingProbability := tsp.samplingProbability
86+
attr, found := sampledSpanAttributes.Get(conventions.AttributeSamplingProbability)
87+
// Only a double-typed value is combined; anything else is treated as absent.
if found && attr.Type() == pdata.AttributeValueDOUBLE {
88+
samplingProbability *= attr.DoubleVal()
89+
}
90+
91+
sampledSpanAttributes.UpsertDouble(conventions.AttributeSamplingProbability, samplingProbability)
92+
}
93+
8194
func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpans, sampledTraceData pdata.Traces) {
8295
scaledSamplingRate := tsp.scaledSamplingRate
8396

@@ -108,6 +121,7 @@ func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpan
108121
hash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < scaledSamplingRate
109122

110123
if sampled {
124+
tsp.updateSamplingProbability(span.Attributes())
111125
spns.Append(span)
112126
}
113127
}

processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go

+74-17
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.opentelemetry.io/collector/consumer"
2929
"go.opentelemetry.io/collector/consumer/consumertest"
3030
"go.opentelemetry.io/collector/consumer/pdata"
31+
"go.opentelemetry.io/collector/translator/conventions"
3132
tracetranslator "go.opentelemetry.io/collector/translator/trace"
3233
)
3334

@@ -71,6 +72,7 @@ func TestNewTraceProcessor(t *testing.T) {
7172
if !tt.wantErr {
7273
// The truncation below with uint32 cannot be defined at initialization (compiler error), performing it at runtime.
7374
tt.want.(*tracesamplerprocessor).scaledSamplingRate = uint32(tt.cfg.SamplingPercentage * percentageScaleFactor)
75+
tt.want.(*tracesamplerprocessor).samplingProbability = float64(tt.cfg.SamplingPercentage) * 0.01
7476
}
7577
got, err := newTraceProcessor(tt.nextConsumer, tt.cfg)
7678
if (err != nil) != tt.wantErr {
@@ -227,15 +229,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t
227229

228230
// Test_tracesamplerprocessor_SpanSamplingPriority checks if handling of "sampling.priority" is correct.
229231
func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
230-
singleSpanWithAttrib := func(key string, attribValue pdata.AttributeValue) pdata.Traces {
231-
traces := pdata.NewTraces()
232-
traces.ResourceSpans().Resize(1)
233-
rs := traces.ResourceSpans().At(0)
234-
rs.InstrumentationLibrarySpans().Resize(1)
235-
instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
236-
instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
237-
return traces
238-
}
232+
239233
tests := []struct {
240234
name string
241235
cfg Config
@@ -247,7 +241,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
247241
cfg: Config{
248242
SamplingPercentage: 0.0,
249243
},
250-
td: singleSpanWithAttrib(
244+
td: getTracesWithSpanWithAttribute(
251245
"sampling.priority",
252246
pdata.NewAttributeValueInt(2)),
253247
sampled: true,
@@ -257,7 +251,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
257251
cfg: Config{
258252
SamplingPercentage: 0.0,
259253
},
260-
td: singleSpanWithAttrib(
254+
td: getTracesWithSpanWithAttribute(
261255
"sampling.priority",
262256
pdata.NewAttributeValueDouble(1)),
263257
sampled: true,
@@ -267,7 +261,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
267261
cfg: Config{
268262
SamplingPercentage: 0.0,
269263
},
270-
td: singleSpanWithAttrib(
264+
td: getTracesWithSpanWithAttribute(
271265
"sampling.priority",
272266
pdata.NewAttributeValueString("1")),
273267
sampled: true,
@@ -277,7 +271,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
277271
cfg: Config{
278272
SamplingPercentage: 100.0,
279273
},
280-
td: singleSpanWithAttrib(
274+
td: getTracesWithSpanWithAttribute(
281275
"sampling.priority",
282276
pdata.NewAttributeValueInt(0)),
283277
},
@@ -286,7 +280,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
286280
cfg: Config{
287281
SamplingPercentage: 100.0,
288282
},
289-
td: singleSpanWithAttrib(
283+
td: getTracesWithSpanWithAttribute(
290284
"sampling.priority",
291285
pdata.NewAttributeValueDouble(0)),
292286
},
@@ -295,7 +289,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
295289
cfg: Config{
296290
SamplingPercentage: 100.0,
297291
},
298-
td: singleSpanWithAttrib(
292+
td: getTracesWithSpanWithAttribute(
299293
"sampling.priority",
300294
pdata.NewAttributeValueString("0")),
301295
},
@@ -304,7 +298,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
304298
cfg: Config{
305299
SamplingPercentage: 0.0,
306300
},
307-
td: singleSpanWithAttrib(
301+
td: getTracesWithSpanWithAttribute(
308302
"no.sampling.priority",
309303
pdata.NewAttributeValueInt(2)),
310304
},
@@ -313,7 +307,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
313307
cfg: Config{
314308
SamplingPercentage: 100.0,
315309
},
316-
td: singleSpanWithAttrib(
310+
td: getTracesWithSpanWithAttribute(
317311
"no.sampling.priority",
318312
pdata.NewAttributeValueInt(2)),
319313
sampled: true,
@@ -416,13 +410,76 @@ func Test_parseSpanSamplingPriority(t *testing.T) {
416410
}
417411
}
418412

413+
// Test_tracesamplerprocessor_SamplingProbabilityAttribute verifies that every
// sampled span carries the sampling.probability attribute with the expected
// value. The processor is configured with a sampling percentage of 100, so its
// own probability is 1.0 and the expected attribute equals whatever valid
// double value the span already carried (or 1.0 when there was none or the
// existing value had a non-double type).
414+
func Test_tracesamplerprocessor_SamplingProbabilityAttribute(t *testing.T) {
415+
cfg := Config{
416+
SamplingPercentage: 100.0,
417+
}
418+
419+
tests := []struct {
420+
name string
421+
traces pdata.Traces
422+
wantSamplingProbabilityAttribute pdata.AttributeValue
423+
}{
424+
{
425+
// No pre-existing attribute: the processor's probability (1.0) is stored.
name: "simple_span",
426+
traces: getTracesWithSpanWithAttribute("foo", pdata.NewAttributeValueString("bar")),
427+
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
428+
},
429+
{
430+
// Pre-existing double attribute: 0.01 * 1.0 stays 0.01.
name: "span_came_through_sampler_already",
431+
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueDouble(0.01)),
432+
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(0.01),
433+
},
434+
{
435+
// Pre-existing attribute of the wrong type is replaced by a double.
name: "simple_with_invalid_attribute_value",
436+
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueString("bar")),
437+
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
438+
},
439+
}
440+
441+
for _, tt := range tests {
442+
t.Run(tt.name, func(t *testing.T) {
443+
sink := new(consumertest.TracesSink)
444+
tsp, err := newTraceProcessor(sink, cfg)
445+
if err != nil {
446+
t.Errorf("error when creating tracesamplerprocessor: %v", err)
447+
return
448+
}
449+
450+
if err := tsp.ConsumeTraces(context.Background(), tt.traces); err != nil {
451+
t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err)
452+
return
453+
}
454+
// Each input holds exactly one span and sampling is at 100%, so one span must arrive.
assert.Equal(t, 1, sink.SpansCount())
455+
for _, td := range sink.AllTraces() {
456+
span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
457+
attrValue, found := span.Attributes().Get(conventions.AttributeSamplingProbability)
458+
assert.True(t, found, "Sampling probability attribute not found")
459+
assert.Equal(t, tt.wantSamplingProbabilityAttribute, attrValue)
460+
}
461+
sink.Reset()
462+
})
463+
}
464+
}
465+
419466
// getSpanWithAttributes returns a new span named "spanName" whose attribute
// map is initialized with the single given key/value pair.
func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span {
420467
span := pdata.NewSpan()
421468
span.SetName("spanName")
422469
span.Attributes().InitFromMap(map[string]pdata.AttributeValue{key: value})
423470
return span
424471
}
425472

473+
// getTracesWithSpanWithAttribute builds a pdata.Traces test fixture containing
// one ResourceSpans with one InstrumentationLibrarySpans, to which a single
// span produced by getSpanWithAttributes(key, attribValue) is appended.
func getTracesWithSpanWithAttribute(key string, attribValue pdata.AttributeValue) pdata.Traces {
474+
traces := pdata.NewTraces()
475+
traces.ResourceSpans().Resize(1)
476+
rs := traces.ResourceSpans().At(0)
477+
rs.InstrumentationLibrarySpans().Resize(1)
478+
instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
479+
instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
480+
return traces
481+
}
482+
426483
// Test_hash ensures that the hash function supports different key lengths even if in
427484
// practice it is only expected to receive keys with length 16 (trace id length in OC proto).
428485
func Test_hash(t *testing.T) {

translator/conventions/opentelemetry.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const (
6161
AttributeProcessExecutablePath = "process.executable.path"
6262
AttributeProcessID = "process.pid"
6363
AttributeProcessOwner = "process.owner"
64+
AttributeSamplingProbability = "sampling.probability"
6465
AttributeServiceInstance = "service.instance.id"
6566
AttributeServiceName = "service.name"
6667
AttributeServiceNamespace = "service.namespace"

0 commit comments

Comments
 (0)