Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax adaptive sampling dependency on sampler tags #6692

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions internal/sampling/samplingstrategy/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (a *aggregator) saveThroughput() {
a.storage.InsertThroughput(throughputSlice)
}

func (a *aggregator) RecordThroughput(service, operation string, samplerType span_model.SamplerType, probability float64) {
func (a *aggregator) RecordThroughput(service, operation string, _ span_model.SamplerType, probability float64) {
a.Lock()
defer a.Unlock()
if _, ok := a.currentThroughput[service]; !ok {
Expand All @@ -113,16 +113,11 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}
a.currentThroughput[service][operation] = throughput
}
probStr := TruncateFloat(probability)
throughput.Count++
probStr := truncateFloat(probability)
if len(throughput.Probabilities) != maxProbabilities {
throughput.Probabilities[probStr] = struct{}{}
}
// Only if we see probabilistically sampled root spans do we increment the throughput counter,
// for lowerbound sampled spans, we don't increment at all but we still save a count of 0 as
// the throughput so that the adaptive sampling processor is made aware of the endpoint.
if samplerType == span_model.SamplerTypeProbabilistic {
throughput.Count++
}
}

func (a *aggregator) Start() {
Expand Down Expand Up @@ -152,9 +147,6 @@ func (a *aggregator) HandleRootSpan(span *span_model.Span) {
return
}
samplerType, samplerParam := getSamplerParams(span, a.postAggregator.logger)
if samplerType == span_model.SamplerTypeUnrecognized {
return
}
a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
}

Expand Down
56 changes: 8 additions & 48 deletions internal/sampling/samplingstrategy/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestLowerboundThroughput(t *testing.T) {
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", http.MethodGet, model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"][http.MethodGet].Count)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"][http.MethodGet].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"][http.MethodGet].Probabilities["0.001000"])
}

Expand All @@ -116,76 +116,36 @@ func TestRecordThroughput(t *testing.T) {
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
agg, err := NewAggregator(testOpts, zap.NewNop(), metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a := agg.(*aggregator)

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)
require.Empty(t, a.currentThroughput)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "A",
}
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)
require.Empty(t, a.currentThroughput)

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = http.MethodGet
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)
assert.EqualValues(t, 1, a.currentThroughput["A"][http.MethodGet].Count)
delete(a.currentThroughput, "A")

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"][http.MethodGet].Count)
}

func TestRecordThroughputFunc(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
logger := zap.NewNop()
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}

a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "A",
}
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = http.MethodGet
a.HandleRootSpan(span)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"][http.MethodGet].Count)
assert.EqualValues(t, 1, a.currentThroughput["A"][http.MethodGet].Count)
}

func TestGetSamplerParams(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions internal/sampling/samplingstrategy/adaptive/floatutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"strconv"
)

// TruncateFloat truncates float to 6 decimal positions and converts to string.
func TruncateFloat(v float64) string {
// truncateFloat renders v in fixed-point notation with exactly six digits
// after the decimal point (rounding as needed) and returns it as a string.
func truncateFloat(v float64) string {
	const decimals = 6 // probabilities are compared at 6-decimal granularity
	return strconv.FormatFloat(v, 'f', decimals, 64)
}

// FloatEquals compares two floats with 10 decimal positions precision.
func FloatEquals(a, b float64) bool {
// floatEquals reports whether a and b are equal to within 10 decimal
// positions of precision (absolute difference below 1e-10).
func floatEquals(a, b float64) bool {
	const tolerance = 1e-10
	delta := math.Abs(a - b)
	return delta < tolerance
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestTruncateFloat(t *testing.T) {
{prob: 0.10404450002098709, expected: "0.104045"},
}
for _, test := range tests {
assert.Equal(t, test.expected, TruncateFloat(test.prob))
assert.Equal(t, test.expected, truncateFloat(test.prob))
}
}

Expand All @@ -36,6 +36,6 @@ func TestFloatEquals(t *testing.T) {
{f1: 0.123456780000, f2: 0.123456781111, equal: false},
}
for _, test := range tests {
assert.Equal(t, test.equal, FloatEquals(test.f1, test.f2))
assert.Equal(t, test.equal, floatEquals(test.f1, test.f2))
}
}
20 changes: 20 additions & 0 deletions internal/sampling/samplingstrategy/adaptive/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ type Options struct {
// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
// (ie. failed to gain the leader lock).
FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"`

// Use at your own risk! When this setting is enabled, the engine will not attempt
// to infer the actual sampling probability used in the SDKs and may cause a spike
// of trace volume under the conditions explained below.
//
// The original adaptive sampling logic was built to work with legacy Jaeger SDK
// which used to report via span tag when the probabilistic sampler was used and
// with which probability value. The sampler implementation in the OpenTelemetry
// SDKs do not include such span tags, which makes it impossible for the engine
// to verify if the adaptive sampling rates are being respected / used by the sampler.
// However, this validation is not critical to the engine's operation, as it was
// done as a protection measure against a situation when a non-adaptive sampler
// is used in the SDK with a very low probability, and the engine keeps trying
// to increase this probability and not seeing an expected change in the trace
// volume (aka throughput), which will eventually result in the calculated
// probability reaching 100%. This could present a danger if the SDK is then
// switched to respect adaptive sampling rate since it will drastically increase
// the volume of traces sampled and the engine will take a few minutes to react
// to that.
IgnoreSamplerTags bool
}

func DefaultOptions() Options {
Expand Down
19 changes: 11 additions & 8 deletions internal/sampling/samplingstrategy/adaptive/post_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
)

const (
maxSamplingProbability = 1.0

getThroughputErrMsg = "failed to get throughput from storage"

// The number of past entries for samplingCache the leader keeps in memory
Expand Down Expand Up @@ -330,8 +328,8 @@
}

func (p *PostAggregator) calculateProbability(service, operation string, qps float64) float64 {
// TODO: is this method overly expensive to run in a loop?
oldProbability := p.InitialSamplingProbability
// TODO: is this loop overly expensive?
p.RLock()
if opProbabilities, ok := p.probabilities[service]; ok {
if probability, ok := opProbabilities[operation]; ok {
Expand All @@ -353,14 +351,14 @@
return oldProbability
}
var newProbability float64
if FloatEquals(qps, 0) {
if floatEquals(qps, 0) {
// Edge case; we double the sampling probability if the QPS is 0 so that we force the service
// to at least sample one span probabilistically.
newProbability = oldProbability * 2.0
} else {
newProbability = p.probabilityCalculator.Calculate(p.TargetSamplesPerSecond, qps, oldProbability)
}
return math.Min(maxSamplingProbability, math.Max(p.MinSamplingProbability, newProbability))
return math.Min(1.0, math.Max(p.MinSamplingProbability, newProbability))
}

// is actual value within p.DeltaTolerance percentage of expected value.
Expand All @@ -382,14 +380,19 @@
operation string,
throughput serviceOperationThroughput,
) bool {
if FloatEquals(probability, p.InitialSamplingProbability) {
if p.IgnoreSamplerTags {
return true
}

Check warning on line 385 in internal/sampling/samplingstrategy/adaptive/post_aggregator.go

View check run for this annotation

Codecov / codecov/patch

internal/sampling/samplingstrategy/adaptive/post_aggregator.go#L384-L385

Added lines #L384 - L385 were not covered by tests
if floatEquals(probability, p.InitialSamplingProbability) {
// If the service is seen for the first time, assume it's using adaptive sampling (ie prob == initialProb).
// Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal
// the initialProb so the logic will fall through.
return true
}
// if the previous probability can be found in the set of observed values
// in the latest time bucket then assume service is respecting adaptive sampling.
if opThroughput, ok := throughput.get(service, operation); ok {
f := TruncateFloat(probability)
f := truncateFloat(probability)
_, ok := opThroughput.Probabilities[f]
return ok
}
Expand All @@ -398,7 +401,7 @@
// before.
if len(p.serviceCache) > 1 {
if e := p.serviceCache[1].Get(service, operation); e != nil {
return e.UsingAdaptive && !FloatEquals(e.Probability, p.InitialSamplingProbability)
return e.UsingAdaptive && !floatEquals(e.Probability, p.InitialSamplingProbability)
}
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adaptive

import (
"errors"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -61,10 +60,6 @@ func testThroughputBuckets() []*throughputBucket {
}
}

func errTestStorage() error {
return errors.New("storage error")
}

func testCalculator() calculationstrategy.ProbabilityCalculator {
return calculationstrategy.CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 {
factor := targetQPS / qps
Expand Down Expand Up @@ -131,7 +126,7 @@ func TestInitializeThroughput(t *testing.T) {
func TestInitializeThroughputFailure(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)).
Return(nil, errTestStorage())
Return(nil, assert.AnError)
p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

Expand Down Expand Up @@ -326,10 +321,10 @@ func TestRunCalculationLoop(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(testThroughputs(), nil)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, assert.AnError)
mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"),
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage())
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage())
mock.AnythingOfType("model.ServiceOperationQPS")).Return(assert.AnError)
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(assert.AnError)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
Expand Down Expand Up @@ -372,11 +367,11 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
logger, logBuffer := testutils.NewLogger()
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(nil, errTestStorage())
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
Return(nil, assert.AnError)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, assert.AnError)
mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"),
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage())
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage())
mock.AnythingOfType("model.ServiceOperationQPS")).Return(assert.AnError)
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(assert.AnError)

mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion internal/sampling/samplingstrategy/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Aggregator interface {
io.Closer

// The HandleRootSpan function processes a span, checking if it's a root span.
// If it is, it extracts sampler parameters, then calls RecordThroughput.
// If it is, it records the throughput.
HandleRootSpan(span *model.Span)

// RecordThroughput records throughput for an operation for aggregation.
Expand Down
Loading