Skip to content

Commit f574d98

Browse files
committed
Add ingoing and outgoing counts to processorhelper
1 parent d2ed276 commit f574d98

File tree

11 files changed

+466
-3
lines changed

11 files changed

+466
-3
lines changed
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add incoming and outgoing counts for processors using processorhelper.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10910]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Any processor using the processorhelper package (this is most processors) will automatically report
20+
incoming and outgoing item counts. The new metrics are:
21+
- otelcol_processor_incoming_spans
22+
- otelcol_processor_outgoing_spans
23+
- otelcol_processor_incoming_metric_points
24+
- otelcol_processor_outgoing_metric_points
25+
- otelcol_processor_incoming_log_records
26+
- otelcol_processor_outgoing_log_records
27+
28+
# Optional: The change log or logs in which this entry should be included.
29+
# e.g. '[user]' or '[user, api]'
30+
# Include 'user' if the change is relevant to end users.
31+
# Include 'api' if there is a change to a library API.
32+
# Default: '[user]'
33+
change_logs: []

processor/processorhelper/documentation.md

+48
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ Number of spans that were dropped.
5454
| ---- | ----------- | ---------- | --------- |
5555
| {spans} | Sum | Int | true |
5656

57+
### otelcol_processor_incoming_log_records
58+
59+
Number of log records passed to the processor.
60+
61+
| Unit | Metric Type | Value Type | Monotonic |
62+
| ---- | ----------- | ---------- | --------- |
63+
| {records} | Sum | Int | true |
64+
65+
### otelcol_processor_incoming_metric_points
66+
67+
Number of metric points passed to the processor.
68+
69+
| Unit | Metric Type | Value Type | Monotonic |
70+
| ---- | ----------- | ---------- | --------- |
71+
| {datapoints} | Sum | Int | true |
72+
73+
### otelcol_processor_incoming_spans
74+
75+
Number of spans passed to the processor.
76+
77+
| Unit | Metric Type | Value Type | Monotonic |
78+
| ---- | ----------- | ---------- | --------- |
79+
| {spans} | Sum | Int | true |
80+
5781
### otelcol_processor_inserted_log_records
5882

5983
Number of log records that were inserted.
@@ -78,6 +102,30 @@ Number of spans that were inserted.
78102
| ---- | ----------- | ---------- | --------- |
79103
| {spans} | Sum | Int | true |
80104

105+
### otelcol_processor_outgoing_log_records
106+
107+
Number of log records emitted from the processor.
108+
109+
| Unit | Metric Type | Value Type | Monotonic |
110+
| ---- | ----------- | ---------- | --------- |
111+
| {records} | Sum | Int | true |
112+
113+
### otelcol_processor_outgoing_metric_points
114+
115+
Number of metric points emitted from the processor.
116+
117+
| Unit | Metric Type | Value Type | Monotonic |
118+
| ---- | ----------- | ---------- | --------- |
119+
| {datapoints} | Sum | Int | true |
120+
121+
### otelcol_processor_outgoing_spans
122+
123+
Number of spans emitted from the processor.
124+
125+
| Unit | Metric Type | Value Type | Monotonic |
126+
| ---- | ----------- | ---------- | --------- |
127+
| {spans} | Sum | Int | true |
128+
81129
### otelcol_processor_refused_log_records
82130

83131
Number of log records that were rejected by the next component in the pipeline.

processor/processorhelper/internal/metadata/generated_telemetry.go

+42
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/logs.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99

10+
"go.opentelemetry.io/otel/metric/noop"
1011
"go.opentelemetry.io/otel/trace"
1112

1213
"go.opentelemetry.io/collector/component"
@@ -39,12 +40,25 @@ func NewLogsProcessor(
3940
return nil, errors.New("nil logsFunc")
4041
}
4142

43+
if set.MeterProvider == nil {
44+
set.MeterProvider = noop.NewMeterProvider()
45+
}
46+
47+
obs, err := newObsReport(ObsReportSettings{
48+
ProcessorID: set.ID,
49+
ProcessorCreateSettings: set,
50+
})
51+
if err != nil {
52+
return nil, err
53+
}
54+
4255
eventOptions := spanAttributes(set.ID)
4356
bs := fromOptions(options)
4457
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
4558
span := trace.SpanFromContext(ctx)
4659
span.AddEvent("Start processing.", eventOptions)
47-
var err error
60+
recordsIn := ld.LogRecordCount()
61+
4862
ld, err = logsFunc(ctx, ld)
4963
span.AddEvent("End processing.", eventOptions)
5064
if err != nil {
@@ -53,6 +67,8 @@ func NewLogsProcessor(
5367
}
5468
return err
5569
}
70+
recordsOut := ld.LogRecordCount()
71+
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
5672
return nextConsumer.ConsumeLogs(ctx, ld)
5773
}, bs.consumerOptions...)
5874
if err != nil {

processor/processorhelper/logs_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"strings"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/otel/attribute"
15+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
16+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
17+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1318

1419
"go.opentelemetry.io/collector/component"
1520
"go.opentelemetry.io/collector/component/componenttest"
21+
"go.opentelemetry.io/collector/config/configtelemetry"
1622
"go.opentelemetry.io/collector/consumer"
1723
"go.opentelemetry.io/collector/consumer/consumertest"
1824
"go.opentelemetry.io/collector/pdata/plog"
@@ -67,3 +73,72 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
6773
return ld, retError
6874
}
6975
}
76+
77+
func TestLogsProcessor_RecordInOut(t *testing.T) {
78+
// Regardless of how many logs are ingested, emit just one
79+
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
80+
ld := plog.NewLogs()
81+
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
82+
return ld, nil
83+
}
84+
85+
incomingLogs := plog.NewLogs()
86+
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
87+
88+
// Add 3 records to the incoming
89+
incomingLogRecords.AppendEmpty()
90+
incomingLogRecords.AppendEmpty()
91+
incomingLogRecords.AppendEmpty()
92+
93+
metricReader := sdkmetric.NewManualReader()
94+
set := processortest.NewNopSettings()
95+
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
96+
set.TelemetrySettings.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
97+
98+
lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
99+
require.NoError(t, err)
100+
101+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
102+
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
103+
assert.NoError(t, lp.Shutdown(context.Background()))
104+
105+
ownMetrics := new(metricdata.ResourceMetrics)
106+
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))
107+
108+
require.Len(t, ownMetrics.ScopeMetrics, 1)
109+
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)
110+
111+
inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
112+
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
113+
if strings.Contains(inMetric.Name, "outgoing") {
114+
inMetric, outMetric = outMetric, inMetric
115+
}
116+
117+
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
118+
Temporality: metricdata.CumulativeTemporality,
119+
IsMonotonic: true,
120+
DataPoints: []metricdata.DataPoint[int64]{
121+
{
122+
Attributes: attribute.NewSet(attribute.KeyValue{
123+
Key: attribute.Key("processor"),
124+
Value: attribute.StringValue(set.ID.String()),
125+
}),
126+
Value: 3,
127+
},
128+
},
129+
}, inMetric.Data, metricdatatest.IgnoreTimestamp())
130+
131+
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
132+
Temporality: metricdata.CumulativeTemporality,
133+
IsMonotonic: true,
134+
DataPoints: []metricdata.DataPoint[int64]{
135+
{
136+
Attributes: attribute.NewSet(attribute.KeyValue{
137+
Key: attribute.Key("processor"),
138+
Value: attribute.StringValue(set.ID.String()),
139+
}),
140+
Value: 1,
141+
},
142+
},
143+
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
144+
}

processor/processorhelper/metadata.yaml

+49
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,55 @@ status:
99

1010
telemetry:
1111
metrics:
12+
13+
processor_incoming_spans:
14+
enabled: true
15+
description: Number of spans passed to the processor.
16+
unit: "{spans}"
17+
sum:
18+
value_type: int
19+
monotonic: true
20+
21+
processor_outgoing_spans:
22+
enabled: true
23+
description: Number of spans emitted from the processor.
24+
unit: "{spans}"
25+
sum:
26+
value_type: int
27+
monotonic: true
28+
29+
processor_incoming_metric_points:
30+
enabled: true
31+
description: Number of metric points passed to the processor.
32+
unit: "{datapoints}"
33+
sum:
34+
value_type: int
35+
monotonic: true
36+
37+
processor_outgoing_metric_points:
38+
enabled: true
39+
description: Number of metric points emitted from the processor.
40+
unit: "{datapoints}"
41+
sum:
42+
value_type: int
43+
monotonic: true
44+
45+
processor_incoming_log_records:
46+
enabled: true
47+
description: Number of log records passed to the processor.
48+
unit: "{records}"
49+
sum:
50+
value_type: int
51+
monotonic: true
52+
53+
processor_outgoing_log_records:
54+
enabled: true
55+
description: Number of log records emitted from the processor.
56+
unit: "{records}"
57+
sum:
58+
value_type: int
59+
monotonic: true
60+
1261
processor_accepted_spans:
1362
enabled: true
1463
description: Number of spans successfully pushed into the next component in the pipeline.

processor/processorhelper/metrics.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99

10+
"go.opentelemetry.io/otel/metric/noop"
1011
"go.opentelemetry.io/otel/trace"
1112

1213
"go.opentelemetry.io/collector/component"
@@ -39,12 +40,25 @@ func NewMetricsProcessor(
3940
return nil, errors.New("nil metricsFunc")
4041
}
4142

43+
if set.MeterProvider == nil {
44+
set.MeterProvider = noop.NewMeterProvider()
45+
}
46+
47+
obs, err := newObsReport(ObsReportSettings{
48+
ProcessorID: set.ID,
49+
ProcessorCreateSettings: set,
50+
})
51+
if err != nil {
52+
return nil, err
53+
}
54+
4255
eventOptions := spanAttributes(set.ID)
4356
bs := fromOptions(options)
4457
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
4558
span := trace.SpanFromContext(ctx)
4659
span.AddEvent("Start processing.", eventOptions)
47-
var err error
60+
pointsIn := md.DataPointCount()
61+
4862
md, err = metricsFunc(ctx, md)
4963
span.AddEvent("End processing.", eventOptions)
5064
if err != nil {
@@ -53,6 +67,8 @@ func NewMetricsProcessor(
5367
}
5468
return err
5569
}
70+
pointsOut := md.DataPointCount()
71+
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
5672
return nextConsumer.ConsumeMetrics(ctx, md)
5773
}, bs.consumerOptions...)
5874
if err != nil {

0 commit comments

Comments
 (0)