Skip to content

Commit b591926

Browse files
committed
Fix data race condition, concurrent writes to the err variable, causes UB
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2d25853 commit b591926

File tree

7 files changed

+131
-12
lines changed

7 files changed

+131
-12
lines changed

.chloggen/fix-ub-proc-helper.yaml

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processorhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix data race condition, concurrent writes to the err variable, causes UB
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [11350]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
# Default: '[user]'
20+
change_logs: [user]

processor/processorhelper/logs.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewLogs(
5151
span.AddEvent("Start processing.", eventOptions)
5252
recordsIn := ld.LogRecordCount()
5353

54-
ld, err = logsFunc(ctx, ld)
54+
var errFunc error
55+
ld, errFunc = logsFunc(ctx, ld)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
recordsOut := ld.LogRecordCount()
6364
obs.recordInOut(ctx, recordsIn, recordsOut)

processor/processorhelper/logs_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,37 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
7071
}
7172
}
7273

74+
func TestLogsConcurrency(t *testing.T) {
75+
logsFunc := func(_ context.Context, ld plog.Logs) (plog.Logs, error) {
76+
return ld, nil
77+
}
78+
79+
incomingLogs := plog.NewLogs()
80+
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
81+
82+
// Add 3 records to the incoming
83+
incomingLogRecords.AppendEmpty()
84+
incomingLogRecords.AppendEmpty()
85+
incomingLogRecords.AppendEmpty()
86+
87+
lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), logsFunc)
88+
require.NoError(t, err)
89+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
90+
91+
var wg sync.WaitGroup
92+
for i := 0; i < 10; i++ {
93+
wg.Add(1)
94+
go func() {
95+
defer wg.Done()
96+
for j := 0; j < 10000; j++ {
97+
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
98+
}
99+
}()
100+
}
101+
wg.Wait()
102+
assert.NoError(t, lp.Shutdown(context.Background()))
103+
}
104+
73105
func TestLogs_RecordInOut(t *testing.T) {
74106
// Regardless of how many logs are ingested, emit just one
75107
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {

processor/processorhelper/metrics.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewMetrics(
5151
span.AddEvent("Start processing.", eventOptions)
5252
pointsIn := md.DataPointCount()
5353

54-
md, err = metricsFunc(ctx, md)
54+
var errFunc error
55+
md, errFunc = metricsFunc(ctx, md)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
pointsOut := md.DataPointCount()
6364
obs.recordInOut(ctx, pointsIn, pointsOut)

processor/processorhelper/metrics_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,36 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
7071
}
7172
}
7273

74+
func TestMetricsConcurrency(t *testing.T) {
75+
metricsFunc := func(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
76+
return md, nil
77+
}
78+
79+
incomingMetrics := pmetric.NewMetrics()
80+
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()
81+
82+
// Add 2 data points to the incoming
83+
dps.AppendEmpty()
84+
dps.AppendEmpty()
85+
86+
mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), metricsFunc)
87+
require.NoError(t, err)
88+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
89+
90+
var wg sync.WaitGroup
91+
for i := 0; i < 10; i++ {
92+
wg.Add(1)
93+
go func() {
94+
defer wg.Done()
95+
for j := 0; j < 10000; j++ {
96+
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
97+
}
98+
}()
99+
}
100+
wg.Wait()
101+
assert.NoError(t, mp.Shutdown(context.Background()))
102+
}
103+
73104
func TestMetrics_RecordInOut(t *testing.T) {
74105
// Regardless of how many data points are ingested, emit 3
75106
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {

processor/processorhelper/traces.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ func NewTraces(
5151
span.AddEvent("Start processing.", eventOptions)
5252
spansIn := td.SpanCount()
5353

54-
td, err = tracesFunc(ctx, td)
54+
var errFunc error
55+
td, errFunc = tracesFunc(ctx, td)
5556
span.AddEvent("End processing.", eventOptions)
56-
if err != nil {
57-
if errors.Is(err, ErrSkipProcessingData) {
57+
if errFunc != nil {
58+
if errors.Is(errFunc, ErrSkipProcessingData) {
5859
return nil
5960
}
60-
return err
61+
return errFunc
6162
}
6263
spansOut := td.SpanCount()
6364
obs.recordInOut(ctx, spansIn, spansOut)

processor/processorhelper/traces_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"sync"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -70,6 +71,38 @@ func newTestTProcessor(retError error) ProcessTracesFunc {
7071
}
7172
}
7273

74+
func TestTracesConcurrency(t *testing.T) {
75+
tracesFunc := func(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
76+
return td, nil
77+
}
78+
79+
incomingTraces := ptrace.NewTraces()
80+
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
81+
82+
// Add 4 records to the incoming
83+
incomingSpans.AppendEmpty()
84+
incomingSpans.AppendEmpty()
85+
incomingSpans.AppendEmpty()
86+
incomingSpans.AppendEmpty()
87+
88+
mp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), tracesFunc)
89+
require.NoError(t, err)
90+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
91+
92+
var wg sync.WaitGroup
93+
for i := 0; i < 10; i++ {
94+
wg.Add(1)
95+
go func() {
96+
defer wg.Done()
97+
for j := 0; j < 10000; j++ {
98+
assert.NoError(t, mp.ConsumeTraces(context.Background(), incomingTraces))
99+
}
100+
}()
101+
}
102+
wg.Wait()
103+
assert.NoError(t, mp.Shutdown(context.Background()))
104+
}
105+
73106
func TestTraces_RecordInOut(t *testing.T) {
74107
// Regardless of how many spans are ingested, emit just one
75108
mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {

0 commit comments

Comments
 (0)