Skip to content

Commit de80d20

Browse files
bogdandrutujackgopack4
authored andcommitted
Fix data race condition, concurrent writes to the err variable, causes Undefined Behavior (open-telemetry#11349)
The main issue is that after open-telemetry#10910 the err variable is shared between requests because it uses the same address as the err defined outside the func. This is an UB because we are overwriting memory and will cause crashes like open-telemetry#11335, where the check for not nil happens then gets overwrite with nil and crashes. Fixes open-telemetry#11350 --------- Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 8580444 commit de80d20

File tree

7 files changed

+131
-12
lines changed

7 files changed

+131
-12
lines changed

.chloggen/fix-ub-proc-helper.yaml

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processorhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix data race condition, concurrent writes to the err variable, causes UB (Undefined Behavior)
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [11350]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
# Default: '[user]'
20+
change_logs: [user]

processor/processorhelper/logs.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewLogs(
5151
span.AddEvent("Start processing.", eventOptions)
5252
recordsIn := ld.LogRecordCount()
5353

54-
ld, err = logsFunc(ctx, ld)
54+
var errFunc error
55+
ld, errFunc = logsFunc(ctx, ld)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
recordsOut := ld.LogRecordCount()
6364
obs.recordInOut(ctx, recordsIn, recordsOut)

processor/processorhelper/logs_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,37 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
7071
}
7172
}
7273

74+
func TestLogsConcurrency(t *testing.T) {
75+
logsFunc := func(_ context.Context, ld plog.Logs) (plog.Logs, error) {
76+
return ld, nil
77+
}
78+
79+
incomingLogs := plog.NewLogs()
80+
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
81+
82+
// Add 3 records to the incoming
83+
incomingLogRecords.AppendEmpty()
84+
incomingLogRecords.AppendEmpty()
85+
incomingLogRecords.AppendEmpty()
86+
87+
lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), logsFunc)
88+
require.NoError(t, err)
89+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
90+
91+
var wg sync.WaitGroup
92+
for i := 0; i < 10; i++ {
93+
wg.Add(1)
94+
go func() {
95+
defer wg.Done()
96+
for j := 0; j < 10000; j++ {
97+
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
98+
}
99+
}()
100+
}
101+
wg.Wait()
102+
assert.NoError(t, lp.Shutdown(context.Background()))
103+
}
104+
73105
func TestLogs_RecordInOut(t *testing.T) {
74106
// Regardless of how many logs are ingested, emit just one
75107
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {

processor/processorhelper/metrics.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewMetrics(
5151
span.AddEvent("Start processing.", eventOptions)
5252
pointsIn := md.DataPointCount()
5353

54-
md, err = metricsFunc(ctx, md)
54+
var errFunc error
55+
md, errFunc = metricsFunc(ctx, md)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
pointsOut := md.DataPointCount()
6364
obs.recordInOut(ctx, pointsIn, pointsOut)

processor/processorhelper/metrics_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,36 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
7071
}
7172
}
7273

74+
func TestMetricsConcurrency(t *testing.T) {
75+
metricsFunc := func(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
76+
return md, nil
77+
}
78+
79+
incomingMetrics := pmetric.NewMetrics()
80+
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()
81+
82+
// Add 2 data points to the incoming
83+
dps.AppendEmpty()
84+
dps.AppendEmpty()
85+
86+
mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), metricsFunc)
87+
require.NoError(t, err)
88+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
89+
90+
var wg sync.WaitGroup
91+
for i := 0; i < 10; i++ {
92+
wg.Add(1)
93+
go func() {
94+
defer wg.Done()
95+
for j := 0; j < 10000; j++ {
96+
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
97+
}
98+
}()
99+
}
100+
wg.Wait()
101+
assert.NoError(t, mp.Shutdown(context.Background()))
102+
}
103+
73104
func TestMetrics_RecordInOut(t *testing.T) {
74105
// Regardless of how many data points are ingested, emit 3
75106
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {

processor/processorhelper/traces.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewTraces(
5151
span.AddEvent("Start processing.", eventOptions)
5252
spansIn := td.SpanCount()
5353

54-
td, err = tracesFunc(ctx, td)
54+
var errFunc error
55+
td, errFunc = tracesFunc(ctx, td)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
spansOut := td.SpanCount()
6364
obs.recordInOut(ctx, spansIn, spansOut)

processor/processorhelper/traces_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,38 @@ func newTestTProcessor(retError error) ProcessTracesFunc {
7071
}
7172
}
7273

74+
func TestTracesConcurrency(t *testing.T) {
75+
tracesFunc := func(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
76+
return td, nil
77+
}
78+
79+
incomingTraces := ptrace.NewTraces()
80+
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
81+
82+
// Add 4 records to the incoming
83+
incomingSpans.AppendEmpty()
84+
incomingSpans.AppendEmpty()
85+
incomingSpans.AppendEmpty()
86+
incomingSpans.AppendEmpty()
87+
88+
mp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), tracesFunc)
89+
require.NoError(t, err)
90+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
91+
92+
var wg sync.WaitGroup
93+
for i := 0; i < 10; i++ {
94+
wg.Add(1)
95+
go func() {
96+
defer wg.Done()
97+
for j := 0; j < 10000; j++ {
98+
assert.NoError(t, mp.ConsumeTraces(context.Background(), incomingTraces))
99+
}
100+
}()
101+
}
102+
wg.Wait()
103+
assert.NoError(t, mp.Shutdown(context.Background()))
104+
}
105+
73106
func TestTraces_RecordInOut(t *testing.T) {
74107
// Regardless of how many spans are ingested, emit just one
75108
mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {

0 commit comments

Comments
 (0)