Skip to content

Commit 69a2ad0

Browse files
songy23Paulo Janotti
authored and
Paulo Janotti
committed
Add factory and update config for tail sampling processor (#200)
* Add factory and update config for tail sampling processor Updates #146 * Move to package processor * Remove an unnecessary check * Move CreateDefaultConfig to factory and add unit tests * Fix test failure * Remove commented code
1 parent c7eacad commit 69a2ad0

11 files changed

+252
-67
lines changed

processor/tailsampling/factory.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tailsampling
16+
17+
import (
18+
"time"
19+
20+
"go.uber.org/zap"
21+
22+
"github.com/open-telemetry/opentelemetry-service/config/configerror"
23+
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
24+
"github.com/open-telemetry/opentelemetry-service/consumer"
25+
"github.com/open-telemetry/opentelemetry-service/processor"
26+
"github.com/open-telemetry/opentelemetry-service/service/builder"
27+
)
28+
29+
const (
30+
// The value of "type" Tail Sampling in configuration.
31+
typeStr = "tail-sampling"
32+
)
33+
34+
// Factory is the factory for Tail Sampling processor.
35+
type Factory struct {
36+
}
37+
38+
// Type gets the type of the config created by this factory.
39+
func (f *Factory) Type() string {
40+
return typeStr
41+
}
42+
43+
// CreateDefaultConfig creates the default configuration for processor.
44+
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
45+
return &builder.TailBasedCfg{
46+
DecisionWait: 30 * time.Second,
47+
NumTraces: 50000,
48+
}
49+
}
50+
51+
// CreateTraceProcessor creates a trace processor based on this config.
52+
func (f *Factory) CreateTraceProcessor(
53+
logger *zap.Logger,
54+
nextConsumer consumer.TraceConsumer,
55+
cfg configmodels.Processor,
56+
) (processor.TraceProcessor, error) {
57+
tCfg := cfg.(*builder.TailBasedCfg)
58+
return NewTraceProcessor(logger, nextConsumer, *tCfg)
59+
}
60+
61+
// CreateMetricsProcessor creates a metrics processor based on this config.
62+
func (f *Factory) CreateMetricsProcessor(
63+
logger *zap.Logger,
64+
nextConsumer consumer.MetricsConsumer,
65+
cfg configmodels.Processor,
66+
) (processor.MetricsProcessor, error) {
67+
return nil, configerror.ErrDataTypeIsNotSupported
68+
}
+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tailsampling
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"go.uber.org/zap"
22+
23+
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
24+
)
25+
26+
func TestCreateDefaultConfig(t *testing.T) {
27+
factory := &Factory{}
28+
29+
cfg := factory.CreateDefaultConfig()
30+
assert.NotNil(t, cfg, "failed to create default config")
31+
}
32+
33+
func TestCreateProcessor(t *testing.T) {
34+
factory := &Factory{}
35+
36+
cfg := factory.CreateDefaultConfig()
37+
38+
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
39+
assert.NotNil(t, tp)
40+
assert.NoError(t, err, "cannot create trace processor")
41+
42+
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
43+
assert.Nil(t, mp)
44+
assert.Error(t, err, "should not be able to create metric processor")
45+
}

internal/collector/processor/tailsampling/processor.go renamed to processor/tailsampling/processor.go

+19-23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package tailsampling
1616

1717
import (
1818
"context"
19+
"errors"
1920
"runtime"
2021
"sync"
2122
"sync/atomic"
@@ -30,7 +31,8 @@ import (
3031
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
3132
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
3233
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
33-
"github.com/open-telemetry/opentelemetry-service/observability"
34+
"github.com/open-telemetry/opentelemetry-service/processor"
35+
"github.com/open-telemetry/opentelemetry-service/service/builder"
3436
)
3537

3638
// Policy combines a sampling policy evaluator with the destinations to be
@@ -54,6 +56,7 @@ type traceKey string
5456
// policies to sample traces.
5557
type tailSamplingSpanProcessor struct {
5658
ctx context.Context
59+
nextConsumer consumer.TraceConsumer
5760
start sync.Once
5861
maxNumTraces uint64
5962
policies []*Policy
@@ -69,40 +72,33 @@ const (
6972
sourceFormat = "tail-sampling"
7073
)
7174

72-
var _ consumer.TraceConsumer = (*tailSamplingSpanProcessor)(nil)
75+
var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)
7376

74-
// NewTailSamplingSpanProcessor creates a TailSamplingSpanProcessor with the given policies.
75-
// It will keep maxNumTraces on memory and will attempt to wait until decisionWait before evaluating if
76-
// a trace should be sampled or not. Providing expectedNewTracesPerSec helps with allocating data structures
77-
// with closer to actual usage size.
78-
func NewTailSamplingSpanProcessor(
79-
policies []*Policy,
80-
maxNumTraces, expectedNewTracesPerSec uint64,
81-
decisionWait time.Duration,
82-
logger *zap.Logger) (consumer.TraceConsumer, error) {
77+
// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
78+
// configuration.
79+
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) {
80+
if nextConsumer == nil {
81+
return nil, errors.New("nextConsumer is nil")
82+
}
8383

84-
numDecisionBatches := uint64(decisionWait.Seconds())
85-
inBatcher, err := idbatcher.New(numDecisionBatches, expectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
84+
numDecisionBatches := uint64(cfg.DecisionWait.Seconds())
85+
inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
8686
if err != nil {
8787
return nil, err
8888
}
89+
8990
tsp := &tailSamplingSpanProcessor{
9091
ctx: context.Background(),
91-
maxNumTraces: maxNumTraces,
92-
policies: policies,
92+
nextConsumer: nextConsumer,
93+
maxNumTraces: cfg.NumTraces,
9394
logger: logger,
9495
decisionBatcher: inBatcher,
9596
}
9697

97-
for _, policy := range policies {
98-
policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
99-
if err != nil {
100-
return nil, err
101-
}
102-
policy.ctx = policyCtx
103-
}
98+
// TODO(#146): add policies to TailBasedCfg
10499
tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick}
105-
tsp.deleteChan = make(chan traceKey, maxNumTraces)
100+
tsp.deleteChan = make(chan traceKey, cfg.NumTraces)
101+
106102
return tsp, nil
107103
}
108104

internal/collector/processor/tailsampling/processor_test.go renamed to processor/tailsampling/processor_test.go

+35-15
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ import (
2020
"testing"
2121
"time"
2222

23+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
24+
"go.uber.org/zap"
25+
2326
"github.com/open-telemetry/opentelemetry-service/consumer"
2427
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
28+
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
2529
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
2630
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
31+
"github.com/open-telemetry/opentelemetry-service/service/builder"
2732
tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace"
28-
29-
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
30-
"go.uber.org/zap"
3133
)
3234

3335
const (
@@ -36,7 +38,12 @@ const (
3638

3739
func TestSequentialTraceArrival(t *testing.T) {
3840
traceIds, batches := generateIdsAndBatches(128)
39-
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
41+
cfg := builder.TailBasedCfg{
42+
DecisionWait: defaultTestDecisionWait,
43+
NumTraces: uint64(2 * len(traceIds)),
44+
ExpectedNewTracesPerSec: 64,
45+
}
46+
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
4047
tsp := sp.(*tailSamplingSpanProcessor)
4148
for _, batch := range batches {
4249
tsp.ConsumeTraceData(context.Background(), batch)
@@ -57,7 +64,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
5764
traceIds, batches := generateIdsAndBatches(128)
5865

5966
var wg sync.WaitGroup
60-
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
67+
cfg := builder.TailBasedCfg{
68+
DecisionWait: defaultTestDecisionWait,
69+
NumTraces: uint64(2 * len(traceIds)),
70+
ExpectedNewTracesPerSec: 64,
71+
}
72+
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
6173
tsp := sp.(*tailSamplingSpanProcessor)
6274
for _, batch := range batches {
6375
// Add the same traceId twice.
@@ -90,7 +102,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
90102
func TestSequentialTraceMapSize(t *testing.T) {
91103
traceIds, batches := generateIdsAndBatches(210)
92104
const maxSize = 100
93-
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
105+
cfg := builder.TailBasedCfg{
106+
DecisionWait: defaultTestDecisionWait,
107+
NumTraces: uint64(maxSize),
108+
ExpectedNewTracesPerSec: 64,
109+
}
110+
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
94111
tsp := sp.(*tailSamplingSpanProcessor)
95112
for _, batch := range batches {
96113
tsp.ConsumeTraceData(context.Background(), batch)
@@ -108,7 +125,12 @@ func TestConcurrentTraceMapSize(t *testing.T) {
108125
_, batches := generateIdsAndBatches(210)
109126
const maxSize = 100
110127
var wg sync.WaitGroup
111-
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
128+
cfg := builder.TailBasedCfg{
129+
DecisionWait: defaultTestDecisionWait,
130+
NumTraces: uint64(maxSize),
131+
ExpectedNewTracesPerSec: 64,
132+
}
133+
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
112134
tsp := sp.(*tailSamplingSpanProcessor)
113135
for _, batch := range batches {
114136
wg.Add(1)
@@ -133,19 +155,17 @@ func TestConcurrentTraceMapSize(t *testing.T) {
133155
}
134156

135157
func TestSamplingPolicyTypicalPath(t *testing.T) {
158+
t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test")
136159
const maxSize = 100
137160
const decisionWaitSeconds = 5
138-
decisionWait := time.Second * decisionWaitSeconds
139161
msp := &mockSpanProcessor{}
140162
mpe := &mockPolicyEvaluator{}
141-
testPolicy := []*Policy{
142-
{
143-
Name: "test",
144-
Evaluator: mpe,
145-
Destination: msp,
146-
},
163+
cfg := builder.TailBasedCfg{
164+
DecisionWait: defaultTestDecisionWait,
165+
NumTraces: uint64(maxSize),
166+
ExpectedNewTracesPerSec: 64,
147167
}
148-
sp, _ := NewTailSamplingSpanProcessor(testPolicy, maxSize, 64, decisionWait, zap.NewNop())
168+
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
149169
tsp := sp.(*tailSamplingSpanProcessor)
150170

151171
// For this test explicitly control the timer calls and batcher.

service/builder/builder_test.go

-16
Original file line numberDiff line numberDiff line change
@@ -138,22 +138,6 @@ func TestTailSamplingPoliciesConfiguration(t *testing.T) {
138138
}
139139
}
140140

141-
func TestTailSamplingConfig(t *testing.T) {
142-
v, err := loadViperFromFile("./testdata/sampling_config.yaml")
143-
if err != nil {
144-
t.Fatalf("Failed to load viper from test file: %v", err)
145-
}
146-
147-
wCfg := NewDefaultTailBasedCfg()
148-
wCfg.DecisionWait = 31 * time.Second
149-
wCfg.NumTraces = 20001
150-
151-
gCfg := NewDefaultTailBasedCfg().InitFromViper(v)
152-
if !reflect.DeepEqual(gCfg, wCfg) {
153-
t.Fatalf("Wanted %+v but got %+v", *wCfg, *gCfg)
154-
}
155-
}
156-
157141
func loadViperFromFile(file string) (*viper.Viper, error) {
158142
v := viper.New()
159143
v.SetConfigFile(file)

service/builder/sampling_builder.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"time"
1919

2020
"github.com/spf13/viper"
21+
22+
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
2123
)
2224

2325
const (
@@ -159,20 +161,16 @@ func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg {
159161

160162
// TailBasedCfg holds the configuration for tail-based sampling.
161163
type TailBasedCfg struct {
164+
configmodels.ProcessorSettings `mapstructure:",squash"`
162165
// DecisionWait is the desired wait time from the arrival of the first span of
163166
// trace until the decision about sampling it or not is evaluated.
164167
DecisionWait time.Duration `mapstructure:"decision-wait"`
165168
// NumTraces is the number of traces kept on memory. Typically most of the data
166169
// of a trace is released after a sampling decision is taken.
167170
NumTraces uint64 `mapstructure:"num-traces"`
168-
}
169-
170-
// NewDefaultTailBasedCfg creates a TailBasedCfg with the default values.
171-
func NewDefaultTailBasedCfg() *TailBasedCfg {
172-
return &TailBasedCfg{
173-
DecisionWait: 30 * time.Second,
174-
NumTraces: 50000,
175-
}
171+
// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
172+
// per second. This helps with allocating data structures with closer to actual usage size.
173+
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
176174
}
177175

178176
// InitFromViper initializes TailBasedCfg with properties from viper.

0 commit comments

Comments
 (0)