Skip to content

Commit 68f3d7d

Browse files
[sumconnector] implement summing logic (#34797)
**Description:** - Adds connector and summing logic - Adds testing of traces and spans summing Note: testing and test data makes up the bulk of the lines in this PR **Link to tracking Issue:** 32669 **Testing:** - condition, attribute, default attribute, multiple condition, multiple attributes, multiple metrics for traces / spans telemetry types **Documentation:** No new docs --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent c5aa92a commit 68f3d7d

15 files changed

+2375
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'sumconnector'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "adds connector and summing logic along with tests"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [32669]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

connector/sumconnector/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ The `sum` connector can be used to sum attribute values from spans, span events,
2727

2828
If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README](https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md).
2929

30-
### Configuration
31-
32-
#### Basic configuration
30+
### Basic configuration
3331

3432
This configuration will sum numerical values found within the attribute `attribute.with.numerical.value` of any log telemetry routed to the connector. It will then output a metric time series with the name `my.example.metric.name` with those summed values.
3533

34+
Note: Values found within an attribute will be converted into a float regardless of their original type before being summed and output as a metric value. Non-convertible strings will be dropped and not included.
35+
3636
```yaml
3737
receivers:
3838
foo:
@@ -58,15 +58,15 @@ service:
5858
5959
The sum connector has three required configuration settings and numerous optional settings
6060
61-
- Telemetry type: Nested below the `sum:` connector declaration. Declared as `logs:` in the [Basic Example](#basic-configuration).
61+
- Telemetry type: Nested below the `sum:` connector declaration. Declared as `logs:` in the [Basic Example](#basic-configuration).
6262
- Can be any of `spans`, `spanevents`, `metrics`, `datapoints`, or `logs`.
6363
- Metric name: Nested below the telemetry type; this is the metric name the sum connector will output summed values to. Declared as `my.example.metric.name` in the [Basic Example](#basic-configuration)
6464
- `source_attribute`: A specific attribute to search for within the source telemetry being fed to the connector. This attribute is where the connector will look for numerical values to sum into the output metric value. Declared as `attribute.with.numerical.value` in the [Basic Example](#basic-configuration)
6565

6666
#### Optional Settings
6767

68-
- `conditions`: [OTTL syntax](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md) can be used to provide conditions for processing incoming telemetry. Conditions are ORed together, so if any condition is met the attribute's value will be included in the resulting sum.
69-
- `attributes`: Declaration of attributes to include. Any of these attributes found will generate a separate sum for each set of unique combination of attribute values and output as its own datapoint in the metric time series.
68+
- `conditions`: [OTTL syntax](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md) can be used to provide conditions for processing incoming telemetry. Conditions are ORed together, so if any condition is met the attribute's value will be included in the resulting sum.
69+
- `attributes`: Declaration of attributes to include. Any of these attributes found will generate a separate sum for each set of unique combination of attribute values and output as its own datapoint in the metric time series.
7070
- `key`: (required for `attributes`) the attribute name to match against
7171
- `default_value`: (optional for `attributes`) a default value for the attribute when no matches are found. The `default_value` value can be of type string, integer, or float.
7272

connector/sumconnector/connector.go

+139
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ package sumconnector // import "github.com/open-telemetry/opentelemetry-collecto
55

66
import (
77
"context"
8+
"errors"
9+
"fmt"
810

911
"go.opentelemetry.io/collector/component"
1012
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
1114
"go.opentelemetry.io/collector/pdata/plog"
1215
"go.opentelemetry.io/collector/pdata/pmetric"
1316
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -38,22 +41,158 @@ func (c *sum) Capabilities() consumer.Capabilities {
3841
}
3942

4043
func (c *sum) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
44+
var multiError error
4145
sumMetrics := pmetric.NewMetrics()
4246
sumMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len())
47+
for i := 0; i < td.ResourceSpans().Len(); i++ {
48+
resourceSpan := td.ResourceSpans().At(i)
49+
spansSummer := newSummer[ottlspan.TransformContext](c.spansMetricDefs)
50+
spanEventsSummer := newSummer[ottlspanevent.TransformContext](c.spanEventsMetricDefs)
4351

52+
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
53+
scopeSpan := resourceSpan.ScopeSpans().At(j)
54+
55+
for k := 0; k < scopeSpan.Spans().Len(); k++ {
56+
span := scopeSpan.Spans().At(k)
57+
sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
58+
multiError = errors.Join(multiError, spansSummer.update(ctx, span.Attributes(), sCtx))
59+
60+
for l := 0; l < span.Events().Len(); l++ {
61+
event := span.Events().At(l)
62+
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
63+
multiError = errors.Join(multiError, spanEventsSummer.update(ctx, event.Attributes(), eCtx))
64+
}
65+
}
66+
}
67+
68+
if len(spansSummer.sums)+len(spanEventsSummer.sums) == 0 {
69+
continue // don't add an empty resource
70+
}
71+
72+
sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
73+
resourceSpan.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())
74+
75+
sumResource.ScopeMetrics().EnsureCapacity(resourceSpan.ScopeSpans().Len())
76+
sumScope := sumResource.ScopeMetrics().AppendEmpty()
77+
78+
spansSummer.appendMetricsTo(sumScope.Metrics())
79+
spanEventsSummer.appendMetricsTo(sumScope.Metrics())
80+
}
81+
if multiError != nil {
82+
return multiError
83+
}
4484
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
4585
}
4686

4787
func (c *sum) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
88+
var multiError error
4889
sumMetrics := pmetric.NewMetrics()
4990
sumMetrics.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len())
91+
for i := 0; i < md.ResourceMetrics().Len(); i++ {
92+
resourceMetric := md.ResourceMetrics().At(i)
93+
metricsSummer := newSummer[ottlmetric.TransformContext](c.metricsMetricDefs)
94+
dataPointsSummer := newSummer[ottldatapoint.TransformContext](c.dataPointsMetricDefs)
95+
96+
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
97+
scopeMetrics := resourceMetric.ScopeMetrics().At(j)
98+
99+
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
100+
metric := scopeMetrics.Metrics().At(k)
101+
mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
102+
multiError = errors.Join(multiError, metricsSummer.update(ctx, pcommon.NewMap(), mCtx))
103+
104+
//exhaustive:enforce
105+
// For metric types each must be handled in exactly the same way
106+
// Switch case required because each type calls DataPoints() differently
107+
switch metric.Type() {
108+
case pmetric.MetricTypeGauge:
109+
dps := metric.Gauge().DataPoints()
110+
for i := 0; i < dps.Len(); i++ {
111+
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
112+
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
113+
}
114+
case pmetric.MetricTypeSum:
115+
dps := metric.Sum().DataPoints()
116+
for i := 0; i < dps.Len(); i++ {
117+
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
118+
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
119+
}
120+
case pmetric.MetricTypeSummary:
121+
dps := metric.Summary().DataPoints()
122+
for i := 0; i < dps.Len(); i++ {
123+
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
124+
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
125+
}
126+
case pmetric.MetricTypeHistogram:
127+
dps := metric.Histogram().DataPoints()
128+
for i := 0; i < dps.Len(); i++ {
129+
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
130+
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
131+
}
132+
case pmetric.MetricTypeExponentialHistogram:
133+
dps := metric.ExponentialHistogram().DataPoints()
134+
for i := 0; i < dps.Len(); i++ {
135+
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
136+
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
137+
}
138+
case pmetric.MetricTypeEmpty:
139+
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
140+
}
141+
}
142+
}
50143

144+
if len(metricsSummer.sums)+len(dataPointsSummer.sums) == 0 {
145+
continue // don't add an empty resource
146+
}
147+
148+
sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
149+
resourceMetric.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())
150+
151+
sumResource.ScopeMetrics().EnsureCapacity(resourceMetric.ScopeMetrics().Len())
152+
sumScope := sumResource.ScopeMetrics().AppendEmpty()
153+
154+
metricsSummer.appendMetricsTo(sumScope.Metrics())
155+
dataPointsSummer.appendMetricsTo(sumScope.Metrics())
156+
}
157+
if multiError != nil {
158+
return multiError
159+
}
51160
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
52161
}
53162

54163
func (c *sum) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
164+
var multiError error
55165
sumMetrics := pmetric.NewMetrics()
56166
sumMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len())
167+
for i := 0; i < ld.ResourceLogs().Len(); i++ {
168+
resourceLog := ld.ResourceLogs().At(i)
169+
summer := newSummer[ottllog.TransformContext](c.logsMetricDefs)
170+
171+
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
172+
scopeLogs := resourceLog.ScopeLogs().At(j)
173+
174+
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
175+
logRecord := scopeLogs.LogRecords().At(k)
176+
177+
lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
178+
multiError = errors.Join(multiError, summer.update(ctx, logRecord.Attributes(), lCtx))
179+
}
180+
}
181+
182+
if len(summer.sums) == 0 {
183+
continue // don't add an empty resource
184+
}
185+
186+
sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
187+
resourceLog.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())
188+
189+
sumResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len())
190+
sumScope := sumResource.ScopeMetrics().AppendEmpty()
57191

192+
summer.appendMetricsTo(sumScope.Metrics())
193+
}
194+
if multiError != nil {
195+
return multiError
196+
}
58197
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
59198
}

0 commit comments

Comments
 (0)