
Commit 9e5fcdd

bacherfl authored and andrzej-stencel committed
[pkg/stanza] make log emitter and entry conversion in adapter synchronous (open-telemetry#35669)
#### Description

This PR changes the `LogEmitter` to accept a synchronous consumer callback function for processing a batch of log entries, as an alternative to sending log entry batches to a channel. The components that use the `LogEmitter` (adapter and parser) have been adapted accordingly. In the adapter's case, log entries are now converted directly, rather than being sent over a channel to the converter and the converted results being received back over a different channel.

#### Link to tracking issue

Fixes open-telemetry#35453
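As a rough sketch of the new wiring from a caller's perspective (the `NewLogEmitter` callback signature, `ConvertEntries` helper, and `WithMaxBatchSize` option are taken from this diff; the nop telemetry settings and test sink are illustrative stand-ins, not part of this change):

```go
package main

import (
	"context"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/consumertest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

func main() {
	set := componenttest.NewNopTelemetrySettings()
	sink := new(consumertest.LogsSink)

	// Instead of draining emitter.OutChannel() in a separate goroutine, the
	// caller now passes a callback that the emitter invokes synchronously
	// whenever a batch is flushed.
	emitter := helper.NewLogEmitter(set, func(ctx context.Context, entries []*entry.Entry) {
		// Conversion and consumption happen inline; no converter goroutines.
		_ = sink.ConsumeLogs(ctx, adapter.ConvertEntries(entries))
	}, helper.WithMaxBatchSize(100))

	_ = emitter // wired as the stanza pipeline's default output in factory.go
}
```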
#### Testing

I did some initial performance tests using the `TestLogLargeFiles` load test to see how this change affects performance. Below are the results.

**Before the change (i.e. with async log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 09:02:53 | Sent:17,769,795 logs (179,507/sec) | Received:17,755,188 items (179,346/sec)
=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 09:06:29 | Sent:42,857,755 logs (216,465/sec) | Received:42,851,987 items (216,424/sec)

Test                                          |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime |PASS  |    100s|    73.1|    78.4|        106|        118|  18249451|      18249451|
LogLargeFiles/filelog-largefiles-6GB-lifetime |PASS  |    200s|    87.5|    98.1|        110|        116|  44358460|      44358460|
```

**After the change (i.e. with sync log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 10:09:51 Agent RAM (RES): 139 MiB, CPU:71.7% | Sent:17,802,561 logs (179,836/sec) | Received:17,788,273 items (179,680/sec)
=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 10:05:15 Agent RAM (RES): 140 MiB, CPU:95.6% | Sent:42,912,030 logs (216,744/sec) | Received:42,904,306 items (216,689/sec)

Test                                          |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime |PASS  |    100s|    74.8|    78.9|        127|        139|  17984687|      17984687|
LogLargeFiles/filelog-largefiles-6GB-lifetime |PASS  |    200s|    89.3|   100.9|        134|        140|  43376210|      43376210|
```

These results indicate comparable throughput, but with increased resource consumption, especially in terms of memory.

I also ran a test comparing the performance of the synchronous and asynchronous log emitters, using the same methodology as in open-telemetry#35454. The results were the following, and indicate an increase in the time it takes to read the generated log file (see open-telemetry#35454 for details on how the file is generated and how the test is executed):

- Async Log Emitter: ~8s
- Sync Log Emitter: ~12s

<details>
<summary>output-async.log</summary>

=== Step 3: Thu Oct 10 08:54:23 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 2.209674e+06
=== Step 4: Thu Oct 10 08:54:25 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 5.428103e+06
=== Step 5: Thu Oct 10 08:54:26 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 7.337017e+06
=== Step 6: Thu Oct 10 08:54:27 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 9.258843e+06
=== Step 7: Thu Oct 10 08:54:29 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.3082428e+07
=== Step 8: Thu Oct 10 08:54:31 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.6519068e+07

</details>

<details>
<summary>output-sync.log</summary>

=== Step 2: Thu Oct 10 08:51:27 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.580891e+06
=== Step 3: Thu Oct 10 08:51:28 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 3.01034e+06
=== Step 4: Thu Oct 10 08:51:29 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 4.434627e+06
=== Step 5: Thu Oct 10 08:51:31 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 7.416612e+06
=== Step 6: Thu Oct 10 08:51:34 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.0496072e+07
=== Step 7: Thu Oct 10 08:51:36 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.3523882e+07
=== Step 8: Thu Oct 10 08:51:37 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.4929707e+07
=== Step 9: Thu Oct 10 08:51:39 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.6519105e+07

</details>

---------

Signed-off-by: Florian Bacher <[email protected]>
Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 8703946 commit 9e5fcdd

File tree

12 files changed: +283 -377 lines changed
+27
```diff
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Synchronous handling of entries passed from the log emitter to the receiver adapter
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35453]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
```

pkg/stanza/adapter/converter.go

+28 -26

```diff
@@ -155,44 +155,46 @@ func (c *Converter) workerLoop() {
 	defer c.wg.Done()
 
 	for entries := range c.workerChan {
+		// Send plogs directly to flushChan
+		c.flushChan <- ConvertEntries(entries)
+	}
+}
 
-		resourceHashToIdx := make(map[uint64]int)
-		scopeIdxByResource := make(map[uint64]map[string]int)
+func ConvertEntries(entries []*entry.Entry) plog.Logs {
+	resourceHashToIdx := make(map[uint64]int)
+	scopeIdxByResource := make(map[uint64]map[string]int)
 
-		pLogs := plog.NewLogs()
-		var sl plog.ScopeLogs
+	pLogs := plog.NewLogs()
+	var sl plog.ScopeLogs
 
-		for _, e := range entries {
-			resourceID := HashResource(e.Resource)
-			var rl plog.ResourceLogs
+	for _, e := range entries {
+		resourceID := HashResource(e.Resource)
+		var rl plog.ResourceLogs
 
-			resourceIdx, ok := resourceHashToIdx[resourceID]
-			if !ok {
-				resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
+		resourceIdx, ok := resourceHashToIdx[resourceID]
+		if !ok {
+			resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
 
-				rl = pLogs.ResourceLogs().AppendEmpty()
-				upsertToMap(e.Resource, rl.Resource().Attributes())
+			rl = pLogs.ResourceLogs().AppendEmpty()
+			upsertToMap(e.Resource, rl.Resource().Attributes())
 
-				scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
+			scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
+			sl = rl.ScopeLogs().AppendEmpty()
+			sl.Scope().SetName(e.ScopeName)
+		} else {
+			rl = pLogs.ResourceLogs().At(resourceIdx)
+			scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
+			if !ok {
+				scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
 				sl = rl.ScopeLogs().AppendEmpty()
 				sl.Scope().SetName(e.ScopeName)
 			} else {
-				rl = pLogs.ResourceLogs().At(resourceIdx)
-				scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
-				if !ok {
-					scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
-					sl = rl.ScopeLogs().AppendEmpty()
-					sl.Scope().SetName(e.ScopeName)
-				} else {
-					sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
-				}
+				sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
 			}
-			convertInto(e, sl.LogRecords().AppendEmpty())
 		}
-
-		// Send plogs directly to flushChan
-		c.flushChan <- pLogs
+		convertInto(e, sl.LogRecords().AppendEmpty())
 	}
+	return pLogs
 }
 
 func (c *Converter) flushLoop() {
```
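Since the conversion logic now lives in the exported, side-effect-free `ConvertEntries`, it can be called directly without starting a `Converter`. A minimal sketch of the resource/scope grouping it performs (import paths follow the repo's module path; the expected output is an assumption based on the hashing logic above, not a verified run):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

func main() {
	// Two entries with the same (empty) resource and scope name.
	e1, e2 := entry.New(), entry.New()
	e1.Body, e2.Body = "first line", "second line"

	pLogs := adapter.ConvertEntries([]*entry.Entry{e1, e2})

	// Same resource hash and scope name, so both records should land under
	// a single ResourceLogs/ScopeLogs pair.
	fmt.Println(pLogs.ResourceLogs().Len(), pLogs.LogRecordCount()) // expected: 1 2
}
```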

pkg/stanza/adapter/factory.go

+21 -23

```diff
@@ -46,14 +46,30 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
 
 		operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)
 
+		obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
+			ReceiverID:             params.ID,
+			ReceiverCreateSettings: params,
+		})
+		if err != nil {
+			return nil, err
+		}
+		rcv := &receiver{
+			set:       params.TelemetrySettings,
+			id:        params.ID,
+			consumer:  consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
+			obsrecv:   obsrecv,
+			storageID: baseCfg.StorageID,
+		}
+
 		var emitterOpts []helper.EmitterOption
 		if baseCfg.maxBatchSize > 0 {
 			emitterOpts = append(emitterOpts, helper.WithMaxBatchSize(baseCfg.maxBatchSize))
 		}
 		if baseCfg.flushInterval > 0 {
 			emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
 		}
-		emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)
+
+		emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
 		pipe, err := pipeline.Config{
 			Operators:     operators,
 			DefaultOutput: emitter,
@@ -62,27 +78,9 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
 			return nil, err
 		}
 
-		var converterOpts []converterOption
-		if baseCfg.numWorkers > 0 {
-			converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
-		}
-		converter := NewConverter(params.TelemetrySettings, converterOpts...)
-		obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
-			ReceiverID:             params.ID,
-			ReceiverCreateSettings: params,
-		})
-		if err != nil {
-			return nil, err
-		}
-		return &receiver{
-			set:       params.TelemetrySettings,
-			id:        params.ID,
-			pipe:      pipe,
-			emitter:   emitter,
-			consumer:  consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
-			converter: converter,
-			obsrecv:   obsrecv,
-			storageID: baseCfg.StorageID,
-		}, nil
+		rcv.emitter = emitter
+		rcv.pipe = pipe
+
+		return rcv, nil
 	}
 }
```

pkg/stanza/adapter/integration_test.go

+13 -10

```diff
@@ -27,7 +27,7 @@ import (
 func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
 	set := componenttest.NewNopTelemetrySettings()
 	set.Logger = zap.NewNop()
-	emitter := helper.NewLogEmitter(set)
+
 	pipe, err := pipeline.Config{
 		Operators: []operator.Config{
 			{
@@ -48,15 +48,18 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
 		return nil, err
 	}
 
-	return &receiver{
-		set:       set,
-		id:        component.MustNewID("testReceiver"),
-		pipe:      pipe,
-		emitter:   emitter,
-		consumer:  nextConsumer,
-		converter: NewConverter(componenttest.NewNopTelemetrySettings()),
-		obsrecv:   obsrecv,
-	}, nil
+	rcv := &receiver{
+		set:      set,
+		id:       component.MustNewID("testReceiver"),
+		pipe:     pipe,
+		consumer: nextConsumer,
+		obsrecv:  obsrecv,
+	}
+
+	emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
+
+	rcv.emitter = emitter
+	return rcv, nil
 }
 
 // BenchmarkEmitterToConsumer serves as a benchmark for entries going from the emitter to consumer,
```

pkg/stanza/adapter/receiver.go

+16 -82

```diff
@@ -6,7 +6,6 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
@@ -16,22 +15,19 @@ import (
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
 )
 
 type receiver struct {
-	set       component.TelemetrySettings
-	id        component.ID
-	emitWg    sync.WaitGroup
-	consumeWg sync.WaitGroup
-	cancel    context.CancelFunc
-
-	pipe      pipeline.Pipeline
-	emitter   *helper.LogEmitter
-	consumer  consumer.Logs
-	converter *Converter
-	obsrecv   *receiverhelper.ObsReport
+	set component.TelemetrySettings
+	id  component.ID
+
+	pipe     pipeline.Pipeline
+	emitter  *helper.LogEmitter
+	consumer consumer.Logs
+	obsrecv  *receiverhelper.ObsReport
 
 	storageID     *component.ID
 	storageClient storage.Client
@@ -42,8 +38,6 @@ var _ rcvr.Logs = (*receiver)(nil)
 
 // Start tells the receiver to start
 func (r *receiver) Start(ctx context.Context, host component.Host) error {
-	rctx, cancel := context.WithCancel(ctx)
-	r.cancel = cancel
 	r.set.Logger.Info("Starting stanza receiver")
 
 	if err := r.setStorageClient(ctx, host); err != nil {
@@ -54,86 +48,26 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {
 		return fmt.Errorf("start stanza: %w", err)
 	}
 
-	r.converter.Start()
-
-	// Below we're starting 2 loops:
-	// * one which reads all the logs produced by the emitter and then forwards
-	//   them to converter
-	// ...
-	r.emitWg.Add(1)
-	go r.emitterLoop()
-
-	// ...
-	// * second one which reads all the logs produced by the converter
-	//   (aggregated by Resource) and then calls consumer to consume them.
-	r.consumeWg.Add(1)
-	go r.consumerLoop(rctx)
-
-	// Those 2 loops are started in separate goroutines because batching in
-	// the emitter loop can cause a flush, caused by either reaching the max
-	// flush size or by the configurable ticker which would in turn cause
-	// a set of log entries to be available for reading in converter's out
-	// channel. In order to prevent backpressure, reading from the converter
-	// channel and batching are done in those 2 goroutines.
-
 	return nil
 }
 
-// emitterLoop reads the log entries produced by the emitter and batches them
-// in converter.
-func (r *receiver) emitterLoop() {
-	defer r.emitWg.Done()
-
-	// Don't create done channel on every iteration.
-	// emitter.OutChannel is closed on ctx.Done(), no need to handle ctx here
-	// instead we should drain and process the channel to let emitter cancel properly
-	for e := range r.emitter.OutChannel() {
-		if err := r.converter.Batch(e); err != nil {
-			r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
-		}
-	}
+func (r *receiver) consumeEntries(ctx context.Context, entries []*entry.Entry) {
+	obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
+	pLogs := ConvertEntries(entries)
+	logRecordCount := pLogs.LogRecordCount()
 
-	r.set.Logger.Debug("Emitter loop stopped")
-}
-
-// consumerLoop reads converter log entries and calls the consumer to consumer them.
-func (r *receiver) consumerLoop(ctx context.Context) {
-	defer r.consumeWg.Done()
-
-	// Don't create done channel on every iteration.
-	// converter.OutChannel is closed on Shutdown before context is cancelled.
-	// Drain the channel and process events before exiting
-	for pLogs := range r.converter.OutChannel() {
-		obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
-		logRecordCount := pLogs.LogRecordCount()
-
-		cErr := r.consumer.ConsumeLogs(ctx, pLogs)
-		if cErr != nil {
-			r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
-		}
-		r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
+	cErr := r.consumer.ConsumeLogs(ctx, pLogs)
+	if cErr != nil {
+		r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
 	}
-
-	r.set.Logger.Debug("Consumer loop stopped")
+	r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
 }
 
 // Shutdown is invoked during service shutdown
 func (r *receiver) Shutdown(ctx context.Context) error {
-	if r.cancel == nil {
-		return nil
-	}
-
 	r.set.Logger.Info("Stopping stanza receiver")
 	pipelineErr := r.pipe.Stop()
 
-	// wait for emitter to finish batching and let consumers catch up
-	r.emitWg.Wait()
-
-	r.converter.Stop()
-	r.cancel()
-	// wait for consumers to catch up
-	r.consumeWg.Wait()
-
 	if r.storageClient != nil {
 		clientErr := r.storageClient.Close(ctx)
 		return multierr.Combine(pipelineErr, clientErr)
```
