Skip to content

[pkg/stanza] make log emitter and entry conversion in adapter synchronous #35669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e39f7d6
[pkg/stanza] make log emitter and entry conversion synchronous
bacherfl Oct 8, 2024
0229024
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 9, 2024
9d7f616
adapt container parser and log transformer to work synchronously with…
bacherfl Oct 9, 2024
f2a4063
adapt container parser and log transformer to work synchronously with…
bacherfl Oct 9, 2024
04709ed
trigger CI checks
bacherfl Oct 9, 2024
602a01d
adapt tests
bacherfl Oct 9, 2024
827cc94
fix linting
bacherfl Oct 9, 2024
6e5fac5
remove unused fields and adapt integration tests
bacherfl Oct 10, 2024
0dd265d
remove unused fields, fix linting
bacherfl Oct 10, 2024
5d957f1
fix data race in unit test
bacherfl Oct 10, 2024
7430d6c
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 10, 2024
404cc31
add changelog entry
bacherfl Oct 10, 2024
9d69cb7
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 10, 2024
3c92b32
address comments from PR review
bacherfl Oct 16, 2024
4f1624d
send converted entries to flushChan directly
bacherfl Oct 17, 2024
e10cf01
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 28, 2024
2d19d41
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 28, 2024
0672398
adapt the receiver benchmark test to the changes introduced in this PR
bacherfl Oct 28, 2024
e17f428
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Oct 29, 2024
6907735
fix linting
bacherfl Oct 29, 2024
38c34ef
Update pkg/stanza/operator/helper/emitter.go
bacherfl Nov 4, 2024
58ec352
fix previous benchmark tests
bacherfl Nov 4, 2024
85bd70e
fix linting
bacherfl Nov 4, 2024
2eeacc3
fix linting
bacherfl Nov 4, 2024
2546832
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Nov 4, 2024
b5a211a
Merge branch 'main' into feat/35453/synchronous-log-emitter
bacherfl Nov 5, 2024
dd6adce
revert import of file input package
bacherfl Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/stanza-sync-log-emitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Synchronous handling of entries passed from the log emitter to the receiver adapter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35453]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
57 changes: 31 additions & 26 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,43 +156,48 @@ func (c *Converter) workerLoop() {

for entries := range c.workerChan {

resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)
pLogs := ConvertEntries(entries)

pLogs := plog.NewLogs()
var sl plog.ScopeLogs
// Send plogs directly to flushChan
c.flushChan <- pLogs
}
}

for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs
func ConvertEntries(entries []*entry.Entry) plog.Logs {
resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)

resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
pLogs := plog.NewLogs()
var sl plog.ScopeLogs

for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs

rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())
resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()

scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())

scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
c.flushChan <- pLogs
convertInto(e, sl.LogRecords().AppendEmpty())
}
return pLogs
}

func (c *Converter) flushLoop() {
Expand Down
44 changes: 21 additions & 23 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,30 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {

operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
})
if err != nil {
return nil, err
}
rcv := &receiver{
set: params.TelemetrySettings,
id: params.ID,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}

var emitterOpts []helper.EmitterOption
if baseCfg.maxBatchSize > 0 {
emitterOpts = append(emitterOpts, helper.WithMaxBatchSize(baseCfg.maxBatchSize))
}
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}
emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)

emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
Expand All @@ -62,27 +78,9 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
return nil, err
}

var converterOpts []converterOption
if baseCfg.numWorkers > 0 {
converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
}
converter := NewConverter(params.TelemetrySettings, converterOpts...)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
})
if err != nil {
return nil, err
}
return &receiver{
set: params.TelemetrySettings,
id: params.ID,
pipe: pipe,
emitter: emitter,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}, nil
rcv.emitter = emitter
rcv.pipe = pipe

return rcv, nil
}
}
23 changes: 13 additions & 10 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
set := componenttest.NewNopTelemetrySettings()
set.Logger = zap.NewNop()
emitter := helper.NewLogEmitter(set)

pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Expand All @@ -48,15 +48,18 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
return nil, err
}

return &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
converter: NewConverter(componenttest.NewNopTelemetrySettings()),
obsrecv: obsrecv,
}, nil
rcv := &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
consumer: nextConsumer,
obsrecv: obsrecv,
}

emitter := helper.NewLogEmitter(set, rcv.consumeEntries)

rcv.emitter = emitter
return rcv, nil
}

// BenchmarkEmitterToConsumer serves as a benchmark for entries going from the emitter to consumer,
Expand Down
93 changes: 18 additions & 75 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -16,22 +15,20 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

type receiver struct {
set component.TelemetrySettings
id component.ID
emitWg sync.WaitGroup
consumeWg sync.WaitGroup
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
obsrecv *receiverhelper.ObsReport
set component.TelemetrySettings
id component.ID
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *helper.LogEmitter
consumer consumer.Logs
obsrecv *receiverhelper.ObsReport

storageID *component.ID
storageClient storage.Client
Expand All @@ -42,7 +39,7 @@ var _ rcvr.Logs = (*receiver)(nil)

// Start tells the receiver to start
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.set.Logger.Info("Starting stanza receiver")

Expand All @@ -54,67 +51,19 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("start stanza: %w", err)
}

r.converter.Start()

// Below we're starting 2 loops:
// * one which reads all the logs produced by the emitter and then forwards
// them to converter
// ...
r.emitWg.Add(1)
go r.emitterLoop()

// ...
// * second one which reads all the logs produced by the converter
// (aggregated by Resource) and then calls consumer to consume them.
r.consumeWg.Add(1)
go r.consumerLoop(rctx)

// Those 2 loops are started in separate goroutines because batching in
// the emitter loop can cause a flush, caused by either reaching the max
// flush size or by the configurable ticker which would in turn cause
// a set of log entries to be available for reading in converter's out
// channel. In order to prevent backpressure, reading from the converter
// channel and batching are done in those 2 goroutines.

return nil
}

// emitterLoop reads the log entries produced by the emitter and batches them
// in converter.
func (r *receiver) emitterLoop() {
defer r.emitWg.Done()

// Don't create done channel on every iteration.
// emitter.OutChannel is closed on ctx.Done(), no need to handle ctx here
// instead we should drain and process the channel to let emitter cancel properly
for e := range r.emitter.OutChannel() {
if err := r.converter.Batch(e); err != nil {
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}

r.set.Logger.Debug("Emitter loop stopped")
}
func (r *receiver) consumeEntries(ctx context.Context, entries []*entry.Entry) {
pLogs := ConvertEntries(entries)
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()

// consumerLoop reads converter log entries and calls the consumer to consume them.
func (r *receiver) consumerLoop(ctx context.Context) {
defer r.consumeWg.Done()

// Don't create done channel on every iteration.
// converter.OutChannel is closed on Shutdown before context is cancelled.
// Drain the channel and process events before exiting
for pLogs := range r.converter.OutChannel() {
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()

cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}

r.set.Logger.Debug("Consumer loop stopped")
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}

// Shutdown is invoked during service shutdown
Expand All @@ -126,13 +75,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {
r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()

// wait for emitter to finish batching and let consumers catch up
r.emitWg.Wait()

r.converter.Stop()
r.cancel()
// wait for consumers to catch up
r.consumeWg.Wait()

if r.storageClient != nil {
clientErr := r.storageClient.Close(ctx)
Expand Down
Loading
Loading