Skip to content

signaltometricsconnector: support gauges #40113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/signaltometrics-gauge-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signaltometricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for parsing gauge metrics from any signal types

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37093]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
40 changes: 39 additions & 1 deletion connector/signaltometricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ signaltometrics:

`signaltometrics` produces a variety of metric types by utilizing [OTTL](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md)
to extract the relevant data for a metric type from the incoming data. The
component can produce the following metric types for each signal types:
component can produce the following metric types for each signal type:

- [Sum](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums)
- [Gauge](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#gauge)
- [Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram)
- [Exponential Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram)

Expand All @@ -91,6 +92,43 @@ sum:
[OTTL converters](https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs#readme-converters)
can be used to transform the data.

#### Gauge

Gauge metrics aggregate the last value of a signal and have the following configuration:

```yaml
gauge:
value: <ottl_value_expression>
```

- [**Required**] `value`represents an OTTL expression to extract a numeric value from
the signal. Only OTTL expressions that return a value are accepted. The returned
value determines the value type of the `gauge` metric (`int` or `double`).
- For logs: Use e.g. `ExtractGrokPatterns` with a single key selector (see below).
- For other signals: Use a field such as `value_int`, `value_double`, or a valid OTTL expression.

**Examples:**

_Logs (with Grok pattern):_
```yaml
signaltometrics:
logs:
- name: logs.memory_mb
description: Extract memory_mb from log records
gauge:
value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:int}MB")["memory_mb"]
```

_Traces:_
```yaml
signaltometrics:
spans:
- name: span.duration.gauge
description: Span duration as gauge
gauge:
value: Int(Seconds(end_time - start_time))
```

#### Histogram

Histogram metrics have the following configurations:
Expand Down
35 changes: 35 additions & 0 deletions connector/signaltometricsconnector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package config // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"errors"
"fmt"
"regexp"
"strings"

"github.com/lightstep/go-expohisto/structure"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -35,6 +37,9 @@ var defaultHistogramBuckets = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}

// Regex for [key] selector after ExtractGrokPatterns
var grokPatternKey = regexp.MustCompile(`ExtractGrokPatterns\([^)]*\)\s*\[[^\]]+\]`)

var _ confmap.Unmarshaler = (*Config)(nil)

// Config for the connector. Each configuration field describes the metrics
Expand Down Expand Up @@ -163,6 +168,10 @@ type Sum struct {
Value string `mapstructure:"value"`
}

type Gauge struct {
Value string `mapstructure:"value"`
}

// MetricInfo defines the structure of the metric produced by the connector.
type MetricInfo struct {
Name string `mapstructure:"name"`
Expand All @@ -181,6 +190,7 @@ type MetricInfo struct {
Histogram *Histogram `mapstructure:"histogram"`
ExponentialHistogram *ExponentialHistogram `mapstructure:"exponential_histogram"`
Sum *Sum `mapstructure:"sum"`
Gauge *Gauge `mapstructure:"gauge"`
// prevent unkeyed literal initialization
_ struct{}
}
Expand Down Expand Up @@ -251,6 +261,15 @@ func (mi *MetricInfo) validateSum() error {
return nil
}

func (mi *MetricInfo) validateGauge() error {
if mi.Gauge != nil {
if mi.Gauge.Value == "" {
return errors.New("value must be defined for gauge metrics")
}
}
return nil
}

// validateMetricInfo is an utility method validate all supported metric
// types defined for the metric info including any ottl expressions.
func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
Expand All @@ -266,6 +285,9 @@ func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
if err := mi.validateSum(); err != nil {
return fmt.Errorf("sum validation failed: %w", err)
}
if err := mi.validateGauge(); err != nil {
return fmt.Errorf("gauge validation failed: %w", err)
}

// Exactly one metric should be defined. Also, validate OTTL expressions,
// note that, here we only evaluate if statements are valid. Check for
Expand Down Expand Up @@ -299,6 +321,19 @@ func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
return fmt.Errorf("failed to parse value OTTL expression for summary: %w", err)
}
}
if mi.Gauge != nil {
metricsDefinedCount++
if _, err := parser.ParseValueExpression(mi.Gauge.Value); err != nil {
return fmt.Errorf("failed to parse value OTTL expression for gauge: %w", err)
}
// if ExtractGrokPatterns is used, validate the key selector
if strings.Contains(mi.Gauge.Value, "ExtractGrokPatterns") {
// Ensure a [key] selector is present after ExtractGrokPatterns
if !grokPatternKey.MatchString(mi.Gauge.Value) {
return errors.New("ExtractGrokPatterns: a single key selector[key] is required for signal to gauge")
}
}
}
if metricsDefinedCount != 1 {
return fmt.Errorf("exactly one of the metrics must be defined, %d found", metricsDefinedCount)
}
Expand Down
6 changes: 6 additions & 0 deletions connector/signaltometricsconnector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func TestConfig(t *testing.T) {
fullErrorForSignal(t, "profiles", "exactly one of the metrics must be defined"),
},
},
{
path: "invalid_grok_type_map",
errorMsgs: []string{
fullErrorForSignal(t, "logs", "ExtractGrokPatterns: a single key selector[key] is required for signal to gauge"),
},
},
{
path: "invalid_ottl_value_expression",
errorMsgs: []string{
Expand Down
3 changes: 3 additions & 0 deletions connector/signaltometricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestConnectorWithTraces(t *testing.T) {
"histograms",
"exponential_histograms",
"metric_identity",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -71,6 +72,7 @@ func TestConnectorWithMetrics(t *testing.T) {
"sum",
"histograms",
"exponential_histograms",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -104,6 +106,7 @@ func TestConnectorWithLogs(t *testing.T) {
"histograms",
"exponential_histograms",
"metric_identity",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package aggregator // import "github.com/open-telemetry/opentelemetry-collector-
import (
"context"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -27,6 +28,7 @@ type Aggregator[K any] struct {
smLookup map[[16]byte]pmetric.ScopeMetrics
valueCounts map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP
sums map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP
gauges map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP
timestamp time.Time
}

Expand All @@ -37,6 +39,7 @@ func NewAggregator[K any](metrics pmetric.Metrics) *Aggregator[K] {
smLookup: make(map[[16]byte]pmetric.ScopeMetrics),
valueCounts: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP),
sums: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP),
gauges: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP),
timestamp: time.Now(),
}
}
Expand Down Expand Up @@ -87,6 +90,27 @@ func (a *Aggregator[K]) Aggregate(
v, v,
)
}
case pmetric.MetricTypeGauge:
raw, err := md.Gauge.Value.Eval(ctx, tCtx)
if err != nil {
if strings.Contains(err.Error(), "key not found in map") {
// Gracefully skip missing keys in ExtractGrokPatterns
return nil
}
return fmt.Errorf("failed to execute OTTL value for gauge: %w", err)
}
if raw == nil {
return nil
}
switch v := raw.(type) {
case int64, float64:
return a.aggregateGauge(md, resAttrs, srcAttrs, v)
default:
return fmt.Errorf(
"failed to parse gauge OTTL value of type %T into int64 or float64: %v",
v, v,
)
}
}
return nil
}
Expand Down Expand Up @@ -145,11 +169,27 @@ func (a *Aggregator[K]) Finalize(mds []model.MetricDef[K]) {
dp.Copy(a.timestamp, destCounter.DataPoints().AppendEmpty())
}
}
for resID, dpMap := range a.gauges[md.Key] {
if md.Gauge == nil {
continue
}
metrics := a.smLookup[resID].Metrics()
destMetric := metrics.AppendEmpty()
destMetric.SetName(md.Key.Name)
destMetric.SetUnit(md.Key.Unit)
destMetric.SetDescription(md.Key.Description)
destGauge := destMetric.SetEmptyGauge()
destGauge.DataPoints().EnsureCapacity(len(dpMap))
for _, dp := range dpMap {
dp.Copy(a.timestamp, destGauge.DataPoints().AppendEmpty())
}
}
// If there are two metric defined with the same key required by metricKey
// then they will be aggregated within the same metric and produced
// together. Deleting the key ensures this while preventing duplicates.
delete(a.valueCounts, md.Key)
delete(a.sums, md.Key)
delete(a.gauges, md.Key)
}
}

Expand Down Expand Up @@ -193,6 +233,26 @@ func (a *Aggregator[K]) aggregateDouble(
return nil
}

func (a *Aggregator[K]) aggregateGauge(
md model.MetricDef[K],
resAttrs, srcAttrs pcommon.Map,
v any,
) error {
resID := a.getResourceID(resAttrs)
attrID := pdatautil.MapHash(srcAttrs)
if _, ok := a.gauges[md.Key]; !ok {
a.gauges[md.Key] = make(map[[16]byte]map[[16]byte]*gaugeDP)
}
if _, ok := a.gauges[md.Key][resID]; !ok {
a.gauges[md.Key][resID] = make(map[[16]byte]*gaugeDP)
}
if _, ok := a.gauges[md.Key][resID][attrID]; !ok {
a.gauges[md.Key][resID][attrID] = newGaugeDP(srcAttrs)
}
a.gauges[md.Key][resID][attrID].Aggregate(v)
return nil
}

func (a *Aggregator[K]) aggregateValueCount(
md model.MetricDef[K],
resAttrs, srcAttrs pcommon.Map,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator"

import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// gaugeDP is a data point for gauge metrics.
type gaugeDP struct {
attrs pcommon.Map
val any
}

func newGaugeDP(attrs pcommon.Map) *gaugeDP {
return &gaugeDP{
attrs: attrs,
}
}

func (dp *gaugeDP) Aggregate(v any) {
switch v := v.(type) {
case float64, int64:
dp.val = v
default:
panic("unexpected usage of gauge datapoint, only double or int value expected")
}
}

// Copy copies the gauge data point to the destination number data point
func (dp *gaugeDP) Copy(
timestamp time.Time,
dest pmetric.NumberDataPoint,
) {
dp.attrs.CopyTo(dest.Attributes())
switch v := dp.val.(type) {
case float64:
dest.SetDoubleValue(v)
case int64:
dest.SetIntValue(v)
}
// TODO determine appropriate start time
dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
}
Loading
Loading