From 6f2862b291b1cae93e7d6edfd47ca89b023eb186 Mon Sep 17 00:00:00 2001 From: Sean Porter Date: Fri, 25 Apr 2025 12:48:10 -0700 Subject: [PATCH 1/2] drop policy type Signed-off-by: Sean Porter --- processor/tailsamplingprocessor/config.go | 9 ++ .../tailsamplingprocessor/drop_helper.go | 28 +++++ .../tailsamplingprocessor/drop_helper_test.go | 51 ++++++++ .../internal/sampling/and.go | 6 - .../internal/sampling/composite.go | 18 --- .../internal/sampling/drop.go | 44 +++++++ .../internal/sampling/drop_test.go | 114 ++++++++++++++++++ .../internal/sampling/policy.go | 4 +- processor/tailsamplingprocessor/processor.go | 14 ++- 9 files changed, 260 insertions(+), 28 deletions(-) create mode 100644 processor/tailsamplingprocessor/drop_helper.go create mode 100644 processor/tailsamplingprocessor/drop_helper_test.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/drop.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/drop_test.go diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 2342e8fbe997..b4c884c38659 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -33,6 +33,8 @@ const ( Composite PolicyType = "composite" // And allows defining a And policy, combining the other policies in one And PolicyType = "and" + // Drop allows defining a Drop policy, combining one or more policies to drop traces. + Drop PolicyType = "drop" // SpanCount sample traces that are have more spans per Trace than a given threshold. SpanCount PolicyType = "span_count" // TraceState sample traces with specified values by the given key @@ -100,6 +102,11 @@ type AndCfg struct { SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"` } +// DropCfg holds the common configuration to all policies under drop policy. 
+type DropCfg struct { + SubPolicyCfg []AndSubPolicyCfg `mapstructure:"drop_sub_policy"` +} + // CompositeCfg holds the configurable settings to create a composite // sampling policy evaluator. type CompositeCfg struct { @@ -123,6 +130,8 @@ type PolicyCfg struct { CompositeCfg CompositeCfg `mapstructure:"composite"` // Configs for defining and policy AndCfg AndCfg `mapstructure:"and"` + // Configs for defining drop policy + DropCfg DropCfg `mapstructure:"drop"` } // LatencyCfg holds the configurable settings to create a latency filter sampling policy diff --git a/processor/tailsamplingprocessor/drop_helper.go b/processor/tailsamplingprocessor/drop_helper.go new file mode 100644 index 000000000000..5fbd0f7f00b7 --- /dev/null +++ b/processor/tailsamplingprocessor/drop_helper.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" + +import ( + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sampling.PolicyEvaluator, error) { + subPolicyEvaluators := make([]sampling.PolicyEvaluator, len(config.SubPolicyCfg)) + for i := range config.SubPolicyCfg { + policyCfg := &config.SubPolicyCfg[i] + policy, err := getDropSubPolicyEvaluator(settings, policyCfg) + if err != nil { + return nil, err + } + subPolicyEvaluators[i] = policy + } + return sampling.NewDrop(settings.Logger, subPolicyEvaluators), nil +} + +// Return instance of drop sub-policy +func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (sampling.PolicyEvaluator, error) { + return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg) +} diff --git a/processor/tailsamplingprocessor/drop_helper_test.go 
b/processor/tailsamplingprocessor/drop_helper_test.go new file mode 100644 index 000000000000..898824dbcee1 --- /dev/null +++ b/processor/tailsamplingprocessor/drop_helper_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tailsamplingprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +func TestDropHelper(t *testing.T) { + t.Run("valid", func(t *testing.T) { + actual, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{ + SubPolicyCfg: []AndSubPolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-and-policy-1", + Type: Latency, + LatencyCfg: LatencyCfg{ThresholdMs: 100}, + }, + }, + }, + }) + require.NoError(t, err) + + expected := sampling.NewDrop(zap.NewNop(), []sampling.PolicyEvaluator{ + sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), + }) + assert.Equal(t, expected, actual) + }) + + t.Run("unsupported sampling policy type", func(t *testing.T) { + _, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{ + SubPolicyCfg: []AndSubPolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-and-policy-2", + Type: Drop, // nested drop is not allowed + }, + }, + }, + }) + require.EqualError(t, err, "unknown sampling policy type drop") + }) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index 0be2a52e60f7..98a910c47d64 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -41,9 +41,3 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *Trac } return Sampled, nil } - -// 
OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (c *And) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) { - return Sampled, nil -} diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index a88d32e3df31..c28c1935864e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -132,21 +132,3 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return NotSampled, nil } - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (c *Composite) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) { - // Here we have a number of possible solutions: - // 1. Random sample traces based on maxTotalSPS. - // 2. Perform full composite sampling logic by calling Composite.Evaluate(), essentially - // using partial trace data for sampling. - // 3. Sample everything. - // - // It seems that #2 may be the best choice from end user perspective, but - // it is not certain and it is also additional performance penalty when we are - // already under a memory (and possibly CPU) pressure situation. - // - // For now we are playing safe and go with #3. Investigating alternate options - // should be a future task. 
- return Sampled, nil -} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go new file mode 100644 index 000000000000..6af8a20ad67f --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/zap" +) + +type Drop struct { + // the subpolicy evaluators + subpolicies []PolicyEvaluator + logger *zap.Logger +} + +func NewDrop( + logger *zap.Logger, + subpolicies []PolicyEvaluator, +) PolicyEvaluator { + return &Drop{ + subpolicies: subpolicies, + logger: logger, + } +} + +// Evaluate looks at the trace data and returns a corresponding SamplingDecision. +func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) { + // The policy iterates over all sub-policies and returns NotSampled as soon + // as one of them returns NotSampled or InvertNotSampled. If no sub-policy + // does so, it returns Dropped. 
+ for _, sub := range c.subpolicies { + decision, err := sub.Evaluate(ctx, traceID, trace) + if err != nil { + return Unspecified, err + } + if decision == NotSampled || decision == InvertNotSampled { + return NotSampled, nil + } + } + return Dropped, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop_test.go b/processor/tailsamplingprocessor/internal/sampling/drop_test.go new file mode 100644 index 000000000000..39e50f8f6ee7 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/drop_test.go @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +func TestDropEvaluatorNotSampled(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "name", []string{"value"}, false, 0, false) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, NotSampled, decision) +} + +func TestDropEvaluatorSampled(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 
0, false) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, Dropped, decision) +} + +func TestDropEvaluatorStringInvertMatch(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, Dropped, decision) +} + +func TestDropEvaluatorStringInvertNotMatch(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", 
[]string{"attribute_value"}, false, 0, true) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, NotSampled, decision) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/policy.go b/processor/tailsamplingprocessor/internal/sampling/policy.go index 9441e8cfb6f0..d9d98f4744aa 100644 --- a/processor/tailsamplingprocessor/internal/sampling/policy.go +++ b/processor/tailsamplingprocessor/internal/sampling/policy.go @@ -42,8 +42,8 @@ const ( // NotSampled is used to indicate that the decision was already taken // to not sample the data. NotSampled - // Dropped is used when data needs to be purged before the sampling policy - // had a chance to evaluate it. + // Dropped is used to indicate that a trace should be dropped regardless of + // all other decisions. Dropped // Error is used to indicate that policy evaluation was not succeeded. 
Error diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index fa0d06e1814f..ccfc3a522499 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -82,6 +82,7 @@ var ( sampling.NotSampled: attrSampledFalse, sampling.InvertNotSampled: attrSampledFalse, sampling.InvertSampled: attrSampledTrue, + sampling.Dropped: attrSampledFalse, } ) @@ -201,6 +202,8 @@ func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (s return getNewCompositePolicy(settings, &cfg.CompositeCfg) case And: return getNewAndPolicy(settings, &cfg.AndCfg) + case Drop: + return getNewDropPolicy(settings, &cfg.DropCfg) default: return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg) } @@ -391,6 +394,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa sampling.NotSampled: nil, sampling.InvertSampled: nil, sampling.InvertNotSampled: nil, + sampling.Dropped: nil, } ctx := context.Background() @@ -421,13 +425,19 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa if samplingDecisions[decision] == nil { samplingDecisions[decision] = p } + + // Break early if dropped. This can drastically reduce tick/decision latency. 
+ if decision == sampling.Dropped { + break + } } var sampledPolicy *policy - // InvertNotSampled takes precedence over any other decision switch { - case samplingDecisions[sampling.InvertNotSampled] != nil: + case samplingDecisions[sampling.Dropped] != nil: // Dropped takes precedence + finalDecision = sampling.NotSampled + case samplingDecisions[sampling.InvertNotSampled] != nil: // Then InvertNotSampled finalDecision = sampling.NotSampled case samplingDecisions[sampling.Sampled] != nil: finalDecision = sampling.Sampled From 6507e43c519d06fa0dc69e83515282cc4f87111f Mon Sep 17 00:00:00 2001 From: Sean Porter Date: Fri, 25 Apr 2025 13:11:18 -0700 Subject: [PATCH 2/2] drop policy changelog entry Signed-off-by: Sean Porter --- ...amplingprocessor-new-drop-policy-type.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/tailsamplingprocessor-new-drop-policy-type.yaml diff --git a/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml b/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml new file mode 100644 index 000000000000..29868c849bff --- /dev/null +++ b/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New policy type to explicitly drop traces regardless of other policy decisions. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [39668] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: []