Skip to content

Commit 95ff5f2

Browse files
authored
[processor/transform] introduce aggregate_on_attribute_value function for metrics (#33423)
**Link to tracking Issue:** #16224 **Changes:** - implemented `aggregate_on_attribute_value` function - tests - documentation **Depends on** #33669 --------- Signed-off-by: odubajDT <[email protected]>
1 parent 25cb194 commit 95ff5f2

7 files changed

+855
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: transformprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Support aggregating metrics based on their attribute values and substituting the values with a new value."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [16224]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/transformprocessor/README.md

+48-1
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ In addition to OTTL functions, the processor defines its own functions to help w
220220
- [copy_metric](#copy_metric)
221221
- [scale_metric](#scale_metric)
222222
- [aggregate_on_attributes](#aggregate_on_attributes)
223+
- [aggregate_on_attribute_value](#aggregate_on_attribute_value)
223224

224225
### convert_sum_to_gauge
225226

@@ -374,10 +375,12 @@ Examples:
374375

375376
`aggregate_on_attributes(function, Optional[attributes])`
376377

377-
The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon.
378+
The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys of type string to aggregate upon.
378379

379380
`aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list).
380381

382+
**NOTE:** This function is supported only in `metric` context.
383+
381384
The following metric types can be aggregated:
382385

383386
- sum
@@ -415,6 +418,50 @@ statements:
415418

416419
To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.
417420

421+
### aggregate_on_attribute_value
422+
423+
`aggregate_on_attribute_value(function, attribute, values, newValue)`
424+
425+
The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function.
426+
427+
**NOTE:** This function is supported only in `metric` context.
428+
429+
The following metric types can be aggregated:
430+
431+
- sum
432+
- gauge
433+
- histogram
434+
- exponential histogram
435+
436+
Supported aggregation functions are:
437+
438+
- sum
439+
- max
440+
- min
441+
- mean
442+
- median
443+
- count
444+
445+
**NOTE:** Only the `sum` agregation function is supported for histogram and exponential histogram datatypes.
446+
447+
Examples:
448+
449+
- `aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"`
450+
451+
The `aggregate_on_attribute_value` function can also be used in conjunction with
452+
[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or
453+
[delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys).
454+
455+
For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence:
456+
457+
```yaml
458+
statements:
459+
- delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage"
460+
- aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"
461+
```
462+
463+
To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.
464+
418465
## Examples
419466

420467
### Perform transformation if field does not exist
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"go.opentelemetry.io/collector/pdata/pcommon"
11+
"go.opentelemetry.io/collector/pdata/pmetric"
12+
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
16+
)
17+
18+
type aggregateOnAttributeValueArguments struct {
19+
AggregationFunction string
20+
Attribute string
21+
Values []string
22+
NewValue string
23+
}
24+
25+
func newAggregateOnAttributeValueFactory() ottl.Factory[ottlmetric.TransformContext] {
26+
return ottl.NewFactory("aggregate_on_attribute_value", &aggregateOnAttributeValueArguments{}, createAggregateOnAttributeValueFunction)
27+
}
28+
29+
func createAggregateOnAttributeValueFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
30+
args, ok := oArgs.(*aggregateOnAttributeValueArguments)
31+
32+
if !ok {
33+
return nil, fmt.Errorf("AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments")
34+
}
35+
36+
t, err := aggregateutil.ConvertToAggregationFunction(args.AggregationFunction)
37+
if err != nil {
38+
return nil, fmt.Errorf("invalid aggregation function: '%s', valid options: %s", err.Error(), aggregateutil.GetSupportedAggregationFunctionsList())
39+
}
40+
41+
return AggregateOnAttributeValue(t, args.Attribute, args.Values, args.NewValue)
42+
}
43+
44+
func AggregateOnAttributeValue(aggregationType aggregateutil.AggregationType, attribute string, values []string, newValue string) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
45+
return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) {
46+
metric := tCtx.GetMetric()
47+
48+
aggregateutil.RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool {
49+
val, ok := attrs.Get(attribute)
50+
if !ok {
51+
return true
52+
}
53+
54+
for _, v := range values {
55+
if val.Str() == v {
56+
val.SetStr(newValue)
57+
}
58+
}
59+
return true
60+
})
61+
ag := aggregateutil.AggGroups{}
62+
newMetric := pmetric.NewMetric()
63+
aggregateutil.CopyMetricDetails(metric, newMetric)
64+
aggregateutil.GroupDataPoints(metric, &ag)
65+
aggregateutil.MergeDataPoints(newMetric, aggregationType, ag)
66+
newMetric.MoveTo(metric)
67+
68+
return nil, nil
69+
}, nil
70+
}

0 commit comments

Comments
 (0)