Skip to content

Commit 2a3fbd0

Browse files
[connector/otlpjson]: Do not emit empty batches (#35827)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The connector now does not emit empty batches for invalid otlp payload and throws an error instead. Approach discussed here #35738 (comment) <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #35738 and #35739 <!--Describe what testing was performed and which tests were added.--> #### Testing Manual Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent d1dcee9 commit 2a3fbd0

File tree

6 files changed

+138
-24
lines changed

6 files changed

+138
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: connector/otlpjson
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Throw error on invalid otlp payload.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35738, 35739]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/otlpjsonconnector/connector_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,49 @@ func TestLogsToTraces(t *testing.T) {
178178
})
179179
}
180180
}
181+
182+
// This benchmark looks at how performance is affected when all three connectors are consuming logs (at the same time)
183+
func BenchmarkConsumeLogs(b *testing.B) {
184+
inputlogs := "input-log.yaml"
185+
inputTraces := "input-trace.yaml"
186+
inputMetrics := "input-metric.yaml"
187+
188+
factory := NewFactory()
189+
// initialize log -> log connector
190+
logsink := &consumertest.LogsSink{}
191+
logscon, _ := factory.CreateLogsToLogs(context.Background(),
192+
connectortest.NewNopSettings(), createDefaultConfig(), logsink)
193+
194+
require.NoError(b, logscon.Start(context.Background(), componenttest.NewNopHost()))
195+
defer func() {
196+
assert.NoError(b, logscon.Shutdown(context.Background()))
197+
}()
198+
199+
// initialize log -> traces connector
200+
tracesink := &consumertest.TracesSink{}
201+
traceconn, _ := factory.CreateLogsToTraces(context.Background(),
202+
connectortest.NewNopSettings(), createDefaultConfig(), tracesink)
203+
require.NoError(b, traceconn.Start(context.Background(), componenttest.NewNopHost()))
204+
defer func() {
205+
assert.NoError(b, traceconn.Shutdown(context.Background()))
206+
}()
207+
208+
// initialize log -> metric connector
209+
metricsink := &consumertest.MetricsSink{}
210+
metricconn, _ := factory.CreateLogsToMetrics(context.Background(),
211+
connectortest.NewNopSettings(), createDefaultConfig(), metricsink)
212+
require.NoError(b, metricconn.Start(context.Background(), componenttest.NewNopHost()))
213+
defer func() {
214+
assert.NoError(b, metricconn.Shutdown(context.Background()))
215+
}()
216+
217+
testLogs, _ := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", inputlogs))
218+
testTraces, _ := golden.ReadLogs(filepath.Join("testdata", "logsToTraces", inputTraces))
219+
testMetrics, _ := golden.ReadLogs(filepath.Join("testdata", "logsToMetrics", inputMetrics))
220+
221+
for i := 0; i < b.N; i++ {
222+
assert.NoError(b, logscon.ConsumeLogs(context.Background(), testLogs))
223+
assert.NoError(b, traceconn.ConsumeLogs(context.Background(), testTraces))
224+
assert.NoError(b, metricconn.ConsumeLogs(context.Background(), testMetrics))
225+
}
226+
}

connector/otlpjsonconnector/factory.go

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8+
"regexp"
89

910
"go.opentelemetry.io/collector/component"
1011
"go.opentelemetry.io/collector/connector"
@@ -13,6 +14,10 @@ import (
1314
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector/internal/metadata"
1415
)
1516

17+
var logRegex = regexp.MustCompile(`^\{\s*"resourceLogs"\s*:\s*\[`)
18+
var metricRegex = regexp.MustCompile(`^\{\s*"resourceMetrics"\s*:\s*\[`)
19+
var traceRegex = regexp.MustCompile(`^\{\s*"resourceSpans"\s*:\s*\[`)
20+
1621
// NewFactory returns a ConnectorFactory.
1722
func NewFactory() connector.Factory {
1823
return connector.NewFactory(

connector/otlpjsonconnector/logs.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,29 @@ func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
5050
for k := 0; k < logRecord.LogRecords().Len(); k++ {
5151
lRecord := logRecord.LogRecords().At(k)
5252
token := lRecord.Body()
53-
var l plog.Logs
54-
l, err := logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
55-
if err != nil {
56-
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
53+
54+
// Check if the "resourceLogs" key exists in the JSON data
55+
value := token.AsString()
56+
switch {
57+
case logRegex.MatchString(value):
58+
var l plog.Logs
59+
l, err := logsUnmarshaler.UnmarshalLogs([]byte(value))
60+
if err != nil {
61+
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
62+
continue
63+
}
64+
err = c.logsConsumer.ConsumeLogs(ctx, l)
65+
if err != nil {
66+
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
67+
}
68+
case metricRegex.MatchString(value), traceRegex.MatchString(value):
69+
// If it's a metric or trace payload, simply continue
5770
continue
71+
default:
72+
// If no regex matches, log the invalid payload
73+
c.logger.Error("Invalid otlp payload")
5874
}
59-
err = c.logsConsumer.ConsumeLogs(ctx, l)
60-
if err != nil {
61-
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
62-
}
75+
6376
}
6477
}
6578
}

connector/otlpjsonconnector/metrics.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,28 @@ func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error
5151
for k := 0; k < logRecord.LogRecords().Len(); k++ {
5252
lRecord := logRecord.LogRecords().At(k)
5353
token := lRecord.Body()
54-
var m pmetric.Metrics
55-
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(token.AsString()))
56-
if err != nil {
57-
c.logger.Error("could extract metrics from otlp json", zap.Error(err))
54+
55+
value := token.AsString()
56+
switch {
57+
case metricRegex.MatchString(value):
58+
var m pmetric.Metrics
59+
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(value))
60+
if err != nil {
61+
c.logger.Error("could not extract metrics from otlp json", zap.Error(err))
62+
continue
63+
}
64+
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
65+
if err != nil {
66+
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
67+
}
68+
case logRegex.MatchString(value), traceRegex.MatchString(value):
69+
// If it's a log or trace payload, simply continue
5870
continue
71+
default:
72+
// If no regex matches, log the invalid payload
73+
c.logger.Error("Invalid otlp payload")
5974
}
60-
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
61-
if err != nil {
62-
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
63-
}
75+
6476
}
6577
}
6678
}

connector/otlpjsonconnector/traces.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,26 @@ func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
5151
for k := 0; k < logRecord.LogRecords().Len(); k++ {
5252
lRecord := logRecord.LogRecords().At(k)
5353
token := lRecord.Body()
54-
var t ptrace.Traces
55-
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString()))
56-
if err != nil {
57-
c.logger.Error("could extract traces from otlp json", zap.Error(err))
54+
55+
value := token.AsString()
56+
switch {
57+
case traceRegex.MatchString(value):
58+
var t ptrace.Traces
59+
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(value))
60+
if err != nil {
61+
c.logger.Error("could not extract traces from otlp json", zap.Error(err))
62+
continue
63+
}
64+
err = c.tracesConsumer.ConsumeTraces(ctx, t)
65+
if err != nil {
66+
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
67+
}
68+
case metricRegex.MatchString(value), logRegex.MatchString(value):
69+
// If it's a metric or log payload, continue to the next iteration
5870
continue
59-
}
60-
err = c.tracesConsumer.ConsumeTraces(ctx, t)
61-
if err != nil {
62-
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
71+
default:
72+
// If no regex matches, log the invalid payload
73+
c.logger.Error("Invalid otlp payload")
6374
}
6475
}
6576
}

0 commit comments

Comments
 (0)