Skip to content

Commit 2a40f54

Browse files
author
Paulo Janotti
authored
Add trace head-sampling (open-telemetry#583) (open-telemetry#23)
Cherry-picking census-instrumentation/opencensus-service#583 from OC. * Add trace head-sampling This is to complete the sampling feature since OC Service already offers tail-sampling. This is implemented using an extra hashing as an attempt to avoid bias from trace ID generation and also to correctly spread traces for backends that also do hashing. Tests to enforce hash of different lengths. Make hashing seed configurable * Direct recommendation about hash seeding setting * More precise wording for hash-seed comment
1 parent d41326d commit 2a40f54

File tree

6 files changed

+564
-5
lines changed

6 files changed

+564
-5
lines changed

README.md

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ For now, please use the [OpenCensus Service](https://github.com/open-telemetry/o
2020
- [Exporters](#config-exporters)
2121
- [Diagnostics](#config-diagnostics)
2222
- [Global Attributes](#global-attributes)
23-
- [Intelligent Sampling](#tail-sampling)
23+
- [Sampling](#sampling)
2424
- [Usage](#usage)
2525

2626
## Introduction
@@ -243,9 +243,33 @@ global:
243243

244244
### <a name="sampling"></a>Sampling
245245

246-
Sampling can also be configured on the OpenTelemetry Service. Tail sampling
247-
must be configured on the Collector as it requires all spans for a given trace
248-
to make a sampling decision.
246+
Sampling can also be configured on the OpenTelemetry Service. Both head-based and
247+
tail-based sampling are supported. Either the Agent or the Collector may enable
248+
head-based sampling. Tail sampling must be configured on the Collector as it
249+
requires all spans for a given trace to make a sampling decision.
250+
251+
#### Head-based Example
252+
253+
```yaml
254+
sampling:
255+
# mode indicates if the sampling is head or tail based. For probabilistic the mode is head-based.
256+
mode: head
257+
policies:
258+
# section below defines a probabilistic trace sampler based on hashing the trace ID associated to
259+
# each span and sampling the span according to the given spans.
260+
probabilistic:
261+
configuration:
262+
# sampling-percentage is the percentage of sampling to be applied to all spans, unless their service is specified
263+
# on sampling-percentage.
264+
sampling-percentage: 5
265+
# hash-seed allows choosing the seed for the hash function used in the trace sampling. This is important when
266+
# multiple layers of collectors are being used with head sampling, in such scenarios make sure to
267+
# choose different seeds for each layer.
268+
hash-seed: 1
269+
```
270+
271+
#### Tail-based Example
272+
249273
```yaml
250274
sampling:
251275
mode: tail

cmd/occollector/app/builder/sampling_builder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ const (
3333
// NoSampling mode is the default and means that all data arriving at the collector
3434
// is passed ahead.
3535
NoSampling Mode = "no-sampling"
36+
// HeadSampling is the mode in which trace data is sampled at ingestion, without seeing
37+
// the whole trace data.
38+
HeadSampling Mode = "head"
3639
// TailSampling is the mode in which trace data is temporarily retained until an evaluation
3740
// if the trace should be sampled is performed.
3841
TailSampling Mode = "tail"

cmd/occollector/app/collector/processors.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor"
3636
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
3737
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
38+
"github.com/open-telemetry/opentelemetry-service/processor/tracesamplerprocessor"
3839
)
3940

4041
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []consumer.TraceConsumer, []consumer.MetricsConsumer) {
@@ -281,7 +282,12 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer,
281282

282283
var tailSamplingProcessor consumer.TraceConsumer
283284
samplingProcessorCfg := builder.NewDefaultSamplingCfg().InitFromViper(v)
284-
if samplingProcessorCfg.Mode == builder.TailSampling {
285+
useHeadSamplingProcessor := false
286+
if samplingProcessorCfg.Mode == builder.HeadSampling {
287+
// Head-sampling should be the first processor in the pipeline to avoid global operations on data
288+
// that is not going to be sampled, for now just set a flag to added the sampler later.
289+
useHeadSamplingProcessor = true
290+
} else if samplingProcessorCfg.Mode == builder.TailSampling {
285291
var err error
286292
tailSamplingProcessor, err = buildSamplingProcessor(samplingProcessorCfg, nameToTraceConsumer, v, logger)
287293
if err != nil {
@@ -331,5 +337,26 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer,
331337
tp, _ = attributekeyprocessor.NewTraceProcessor(tp, multiProcessorCfg.Global.Attributes.KeyReplacements...)
332338
}
333339
}
340+
341+
if useHeadSamplingProcessor {
342+
vTraceSampler := v.Sub("sampling.policies.probabilistic.configuration")
343+
if vTraceSampler == nil {
344+
logger.Error("Trace head-based sampling mode is enabled but there is no valid policy section defined")
345+
os.Exit(1)
346+
}
347+
348+
cfg := &tracesamplerprocessor.TraceSamplerCfg{}
349+
samplerCfg, err := cfg.InitFromViper(vTraceSampler)
350+
if err != nil {
351+
logger.Error("Trace head-based sampling configuration error", zap.Error(err))
352+
os.Exit(1)
353+
}
354+
logger.Info(
355+
"Trace head-sampling enabled",
356+
zap.Float32("sampling-percentage", samplerCfg.SamplingPercentage),
357+
)
358+
tp, _ = tracesamplerprocessor.NewTraceProcessor(tp, *samplerCfg)
359+
}
360+
334361
return tp, closeFns
335362
}

cmd/occollector/app/collector/processors_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
2626
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
2727
"github.com/open-telemetry/opentelemetry-service/processor/processortest"
28+
"github.com/open-telemetry/opentelemetry-service/processor/tracesamplerprocessor"
2829
)
2930

3031
func Test_startProcessor(t *testing.T) {
@@ -85,6 +86,24 @@ func Test_startProcessor(t *testing.T) {
8586
return attributeKeyProcessor
8687
},
8788
},
89+
{
90+
name: "sampling_config_trace_sampler",
91+
setupViperCfg: func() *viper.Viper {
92+
v := viper.New()
93+
v.Set("logging-exporter", true)
94+
v.Set("sampling.mode", "head")
95+
v.Set("sampling.policies.probabilistic.configuration.sampling-percentage", 5)
96+
return v
97+
},
98+
wantExamplar: func(t *testing.T) interface{} {
99+
nopProcessor := processortest.NewNopTraceProcessor(nil)
100+
tracesamplerprocessor, err := tracesamplerprocessor.NewTraceProcessor(nopProcessor, tracesamplerprocessor.TraceSamplerCfg{})
101+
if err != nil {
102+
t.Fatalf("tracesamplerprocessor.NewTraceProcessor() = %v", err)
103+
}
104+
return tracesamplerprocessor
105+
},
106+
},
88107
}
89108
for _, tt := range tests {
90109
t.Run(tt.name, func(t *testing.T) {
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tracesamplerprocessor
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
22+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
23+
"github.com/spf13/viper"
24+
25+
"github.com/open-telemetry/opentelemetry-service/consumer"
26+
"github.com/open-telemetry/opentelemetry-service/data"
27+
"github.com/open-telemetry/opentelemetry-service/processor"
28+
)
29+
30+
const (
31+
// The constants below are tags used to read the configuration via viper.
32+
samplingPercentageCfgTag = "sampling-percentage"
33+
hashSeedCfgTag = "hash-seed"
34+
35+
// The constants help translate user friendly percentages to numbers direct used in sampling.
36+
numHashBuckets = 0x4000 // Using a power of 2 to avoid division.
37+
bitMaskHashBuckets = numHashBuckets - 1
38+
percentageScaleFactor = numHashBuckets / 100.0
39+
)
40+
41+
// TraceSamplerCfg has the configuration guiding the trace sampler processor.
42+
type TraceSamplerCfg struct {
43+
// SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e.: no sample.
44+
// Values greater or equal 100 are treated as "sample all traces".
45+
SamplingPercentage float32
46+
// HashSeed allows one to configure the hashing seed. This is important in scenarios where multiple layers of collectors
47+
// have different sampling rates: if they use the same seed all passing one layer may pass the other even if they have
48+
// different sampling rates, configuring different seeds avoids that.
49+
HashSeed uint32
50+
}
51+
52+
// InitFromViper updates TraceSamplerCfg according to the viper configuration.
53+
func (tsc *TraceSamplerCfg) InitFromViper(v *viper.Viper) (*TraceSamplerCfg, error) {
54+
if v == nil {
55+
return nil, errors.New("v is nil")
56+
}
57+
if err := v.UnmarshalKey(samplingPercentageCfgTag, &tsc.SamplingPercentage); err != nil {
58+
return nil, fmt.Errorf("failed to unmarshal %q: %v", samplingPercentageCfgTag, err)
59+
}
60+
if err := v.UnmarshalKey(hashSeedCfgTag, &tsc.HashSeed); err != nil {
61+
return nil, fmt.Errorf("failed to unmarshal %q: %v", hashSeedCfgTag, err)
62+
}
63+
return tsc, nil
64+
}
65+
66+
type tracesamplerprocessor struct {
67+
nextConsumer consumer.TraceConsumer
68+
scaledSamplingRate uint32
69+
hashSeed uint32
70+
}
71+
72+
var _ processor.TraceProcessor = (*tracesamplerprocessor)(nil)
73+
74+
// NewTraceProcessor returns a processor.TraceProcessor that will perform head sampling according to the given
75+
// configuration.
76+
func NewTraceProcessor(nextConsumer consumer.TraceConsumer, cfg TraceSamplerCfg) (processor.TraceProcessor, error) {
77+
if nextConsumer == nil {
78+
return nil, errors.New("nextConsumer is nil")
79+
}
80+
81+
return &tracesamplerprocessor{
82+
nextConsumer: nextConsumer,
83+
// Adjust sampling percentage on private so recalculations are avoided.
84+
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
85+
hashSeed: cfg.HashSeed,
86+
}, nil
87+
}
88+
89+
func (tsp *tracesamplerprocessor) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
90+
scaledSamplingRate := tsp.scaledSamplingRate
91+
if scaledSamplingRate >= numHashBuckets {
92+
return tsp.nextConsumer.ConsumeTraceData(ctx, td)
93+
}
94+
95+
sampledTraceData := data.TraceData{
96+
Node: td.Node,
97+
Resource: td.Resource,
98+
SourceFormat: td.SourceFormat,
99+
}
100+
101+
sampledSpans := make([]*tracepb.Span, 0, len(td.Spans))
102+
for _, span := range td.Spans {
103+
// If one assumes random trace ids hashing may seems avoidable, however, traces can be coming from sources
104+
// with various different criterias to generate trace id and perhaps were already sampled without hashing.
105+
// Hashing here prevents bias due to such systems.
106+
if hash(span.TraceId, tsp.hashSeed)&bitMaskHashBuckets < scaledSamplingRate {
107+
sampledSpans = append(sampledSpans, span)
108+
}
109+
}
110+
111+
sampledTraceData.Spans = sampledSpans
112+
113+
return tsp.nextConsumer.ConsumeTraceData(ctx, sampledTraceData)
114+
}
115+
116+
// hash is a murmur3 hash function, see http://en.wikipedia.org/wiki/MurmurHash.
117+
func hash(key []byte, seed uint32) (hash uint32) {
118+
const (
119+
c1 = 0xcc9e2d51
120+
c2 = 0x1b873593
121+
c3 = 0x85ebca6b
122+
c4 = 0xc2b2ae35
123+
r1 = 15
124+
r2 = 13
125+
m = 5
126+
n = 0xe6546b64
127+
)
128+
129+
hash = seed
130+
iByte := 0
131+
for ; iByte+4 <= len(key); iByte += 4 {
132+
k := uint32(key[iByte]) | uint32(key[iByte+1])<<8 | uint32(key[iByte+2])<<16 | uint32(key[iByte+3])<<24
133+
k *= c1
134+
k = (k << r1) | (k >> (32 - r1))
135+
k *= c2
136+
hash ^= k
137+
hash = (hash << r2) | (hash >> (32 - r2))
138+
hash = hash*m + n
139+
}
140+
141+
// TraceId and SpanId have lengths that are multiple of 4 so the code below is never expected to
142+
// be hit when sampling traces. However, it is preserved here to keep it as a correct murmur3 implementation.
143+
// This is enforced via tests.
144+
var remainingBytes uint32
145+
switch len(key) - iByte {
146+
case 3:
147+
remainingBytes += uint32(key[iByte+2]) << 16
148+
fallthrough
149+
case 2:
150+
remainingBytes += uint32(key[iByte+1]) << 8
151+
fallthrough
152+
case 1:
153+
remainingBytes += uint32(key[iByte])
154+
remainingBytes *= c1
155+
remainingBytes = (remainingBytes << r1) | (remainingBytes >> (32 - r1))
156+
remainingBytes = remainingBytes * c2
157+
hash ^= remainingBytes
158+
}
159+
160+
hash ^= uint32(len(key))
161+
hash ^= hash >> 16
162+
hash *= c3
163+
hash ^= hash >> 13
164+
hash *= c4
165+
hash ^= hash >> 16
166+
167+
return
168+
}

0 commit comments

Comments
 (0)