Commit 9395f36

[connector/spanmetrics] Produce delta temporality span metrics with timestamps representing an uninterrupted series (#31780)
Closes #31671

**Description:**

Currently, delta temporality span metrics are produced with (StartTimestamp, Timestamp) pairs of `(T1, T2), (T3, T4) ...`. However, the [specification](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality) says that the correct pattern for an uninterrupted delta series is `(T1, T2), (T2, T3) ...`

This misalignment with the spec can confuse downstream components' conversion from delta temporality to cumulative temporality, causing each data point to be viewed as a cumulative counter "reset". An example of this is in `prometheusexporter`. The conversion issue forces you to generate cumulative span metrics, which use significantly more memory to cache the cumulative counts.

At work, I applied this patch to our collectors and switched to producing delta temporality metrics for `prometheusexporter` to then convert to cumulative. That caused a significant drop in memory usage:

![image](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/17691679/804d0792-1085-400e-a4e3-d64fb865cd4f)

**Testing:**

- Unit tests asserting the timestamps
- Manual testing with `prometheusexporter` to make sure counter values are cumulative and no longer being reset after receiving each delta data point
1 parent d9ebb84 commit 9395f36
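
To make the spec's two timestamp patterns concrete, here is a minimal standalone sketch (mine, not code from this commit) that emits a delta sum whose data points chain `(T1, T2), (T2, T3) ...` using the collector's `pdata` API. The metric name and the one-minute spacing are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	metric := pmetric.NewMetric()
	metric.SetName("calls")
	sum := metric.SetEmptySum()
	sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

	// Each flush: the new point starts exactly where the previous one ended,
	// yielding (T1, T2), (T2, T3), (T3, T4) ... for an uninterrupted series.
	lastFlush := pcommon.NewTimestampFromTime(time.Now()) // T1
	for i := 1; i <= 3; i++ {
		now := pcommon.NewTimestampFromTime(time.Now().Add(time.Duration(i) * time.Minute))
		dp := sum.DataPoints().AppendEmpty()
		dp.SetStartTimestamp(lastFlush)
		dp.SetTimestamp(now)
		dp.SetIntValue(1)
		lastFlush = now
	}

	first := sum.DataPoints().At(0)
	second := sum.DataPoints().At(1)
	fmt.Println(first.Timestamp() == second.StartTimestamp()) // true: the series is uninterrupted
}
```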

12 files changed, +371 −39 lines changed
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31671, 30688]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This allows producing delta span metrics instead of the more memory-intensive cumulative metrics, specifically when a downstream component can convert the delta metrics to cumulative.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

connector/spanmetricsconnector/README.md
+2 −1

@@ -114,8 +114,9 @@ The following settings can be optionally configured:
 - `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
 - `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
 - `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
+- `metric_timestamp_cache_size` (default `1000`): Only relevant for delta temporality span metrics. Controls the size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0.
 - `exemplars`: Use to configure how to attach exemplars to metrics.
-- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
+  - `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
 - `events`: Use to configure the events metric.
   - `enabled`: (default: `false`): enabling will add the events metric.
   - `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
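The reset behavior described for `metric_timestamp_cache_size` is easiest to see in a toy delta-to-cumulative conversion. The following is a hand-rolled sketch, not `prometheusexporter` code: a delta point whose start timestamp does not equal the previous point's timestamp is treated as a series reset, and the cumulative counter starts over.

```go
package main

import "fmt"

// deltaPoint is a toy stand-in for a delta sum data point.
type deltaPoint struct {
	start, ts uint64
	value     float64
}

func main() {
	points := []deltaPoint{
		{start: 1, ts: 2, value: 5},  // (T1, T2)
		{start: 2, ts: 3, value: 4},  // (T2, T3): contiguous, accumulate
		{start: 9, ts: 10, value: 2}, // gap: key was evicted upstream, treat as reset
	}

	var cumulative float64
	var lastTS uint64
	for _, p := range points {
		if lastTS != 0 && p.start != lastTS {
			cumulative = 0 // reset: restart the counter, as a downstream converter might
		}
		cumulative += p.value
		lastTS = p.ts
		fmt.Println(cumulative) // 5, 9, 2
	}
}
```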

connector/spanmetricsconnector/config.go
+19 −0

@@ -23,6 +23,8 @@ var defaultHistogramBucketsMs = []float64{
 	2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
 }

+var defaultDeltaTimestampCacheSize = 1000
+
 // Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
 type Dimension struct {
 	Name string `mapstructure:"name"`
@@ -71,6 +73,9 @@ type Config struct {
 	// Default value (0) means that the metrics will never expire.
 	MetricsExpiration time.Duration `mapstructure:"metrics_expiration"`

+	// TimestampCacheSize controls the size of the cache used to keep track of delta metrics' TimestampUnixNano the last time it was flushed
+	TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
+
 	// Namespace is the namespace of the metrics emitted by the connector.
 	Namespace string `mapstructure:"namespace"`

@@ -139,6 +144,13 @@ func (c Config) Validate() error {
 		return fmt.Errorf("invalid metrics_expiration: %v, the duration should be positive", c.MetricsExpiration)
 	}

+	if c.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta && c.GetDeltaTimestampCacheSize() <= 0 {
+		return fmt.Errorf(
+			"invalid delta timestamp cache size: %v, the maximum number of the items in the cache should be positive",
+			c.GetDeltaTimestampCacheSize(),
+		)
+	}
+
 	return nil
 }

@@ -151,6 +163,13 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
 	return pmetric.AggregationTemporalityCumulative
 }

+func (c Config) GetDeltaTimestampCacheSize() int {
+	if c.TimestampCacheSize != nil {
+		return *c.TimestampCacheSize
+	}
+	return defaultDeltaTimestampCacheSize
+}
+
 // validateDimensions checks duplicates for reserved dimensions and additional dimensions.
 func validateDimensions(dimensions []Dimension) error {
 	labelNames := make(map[string]struct{})
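
Why `TimestampCacheSize` is a `*int` rather than an `int`: with a pointer, a key that is absent from the YAML stays `nil` and falls back to the default of 1000, while an explicit `0` is preserved so `Validate` can reject it. Here is a self-contained sketch of just that logic, with the collector plumbing stripped away:

```go
package main

import "fmt"

var defaultDeltaTimestampCacheSize = 1000

// Config mirrors only the field in question.
type Config struct {
	TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
}

func (c Config) GetDeltaTimestampCacheSize() int {
	if c.TimestampCacheSize != nil {
		return *c.TimestampCacheSize // user set a value, even an invalid one
	}
	return defaultDeltaTimestampCacheSize // key absent from config: use default
}

func main() {
	var unset Config // nothing in the YAML
	zero := 0
	explicitZero := Config{TimestampCacheSize: &zero}

	fmt.Println(unset.GetDeltaTimestampCacheSize())        // 1000
	fmt.Println(explicitZero.GetDeltaTimestampCacheSize()) // 0 (Validate rejects this)
}
```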

connector/spanmetricsconnector/config_test.go
+36 −3

@@ -27,10 +27,12 @@ func TestLoadConfig(t *testing.T) {

 	defaultMethod := "GET"
 	defaultMaxPerDatapoint := 5
+	customTimestampCacheSize := 123
 	tests := []struct {
-		id           component.ID
-		expected     component.Config
-		errorMessage string
+		id              component.ID
+		expected        component.Config
+		errorMessage    string
+		extraAssertions func(config *Config)
 	}{
 		{
 			id: component.NewIDWithName(metadata.Type, "default"),
@@ -125,6 +127,34 @@ func TestLoadConfig(t *testing.T) {
 				Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
 			},
 		},
+		{
+			id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
+			expected: &Config{
+				AggregationTemporality:   "AGGREGATION_TEMPORALITY_DELTA",
+				TimestampCacheSize:       &customTimestampCacheSize,
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     60 * time.Second,
+				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+			},
+		},
+		{
+			id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size"),
+			expected: &Config{
+				AggregationTemporality:   "AGGREGATION_TEMPORALITY_DELTA",
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     60 * time.Second,
+				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+			},
+			extraAssertions: func(config *Config) {
+				assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
+			},
+		},
+		{
+			id:           component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
+			errorMessage: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",
+		},
 	}

 	for _, tt := range tests {
@@ -143,6 +173,9 @@ func TestLoadConfig(t *testing.T) {
 			}
 			assert.NoError(t, component.ValidateConfig(cfg))
 			assert.Equal(t, tt.expected, cfg)
+			if tt.extraAssertions != nil {
+				tt.extraAssertions(cfg.(*Config))
+			}
 		})
 	}
 }

connector/spanmetricsconnector/connector.go
+46 −6

@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"

+	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"github.com/lightstep/go-expohisto/structure"
 	"github.com/tilinna/clock"
 	"go.opentelemetry.io/collector/component"
@@ -72,6 +73,9 @@ type connectorImp struct {
 	eDimensions []dimension

 	events EventsConfig
+
+	// Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics.
+	lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
 }

 type resourceMetrics struct {
@@ -125,6 +129,16 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
 		resourceMetricsKeyAttributes[attr] = s
 	}

+	var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
+	if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
+		lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, _ pcommon.Timestamp) {
+			logger.Info("Evicting cached delta timestamp", zap.String("key", string(k)))
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return &connectorImp{
 		logger: logger,
 		config: *cfg,
@@ -133,6 +147,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
 		dimensions:            newDimensions(cfg.Dimensions),
 		keyBuf:                bytes.NewBuffer(make([]byte, 0, 1024)),
 		metricKeyToDimensions: metricKeyToDimensionsCache,
+		lastDeltaTimestamps:   lastDeltaTimestamps,
 		ticker:                ticker,
 		done:                  make(chan struct{}),
 		eDimensions:           newDimensions(cfg.Events.Dimensions),
@@ -251,6 +266,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
 // buildMetrics collects the computed raw metrics data and builds OTLP metrics.
 func (p *connectorImp) buildMetrics() pmetric.Metrics {
 	m := pmetric.NewMetrics()
+	timestamp := pcommon.NewTimestampFromTime(time.Now())

 	p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
 		rm := m.ResourceMetrics().AppendEmpty()
@@ -259,23 +275,46 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
 		sm := rm.ScopeMetrics().AppendEmpty()
 		sm.Scope().SetName("spanmetricsconnector")

+		/**
+		 * To represent an uninterrupted stream of metrics as per the spec, the (StartTimestamp, Timestamp)'s of successive data points should be:
+		 * - For cumulative metrics: (T1, T2), (T1, T3), (T1, T4) ...
+		 * - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
+		 */
+		deltaMetricKeys := make(map[metrics.Key]bool)
+		startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
+			startTime := rawMetrics.startTimestamp
+			if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
+				if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
+					startTime = lastTimestamp
+				}
+				// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
+				deltaMetricKeys[mk] = true
+			}
+			return startTime
+		}

 		sums := rawMetrics.sums
 		metric := sm.Metrics().AppendEmpty()
 		metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
-		sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
+		sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
 		if !p.config.Histogram.Disable {
 			histograms := rawMetrics.histograms
 			metric = sm.Metrics().AppendEmpty()
 			metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
 			metric.SetUnit(p.config.Histogram.Unit.String())
-			histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
+			histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
 		}

 		events := rawMetrics.events
 		if p.events.Enabled {
 			metric = sm.Metrics().AppendEmpty()
 			metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
-			events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
+			events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
+		}
+
+		for mk := range deltaMetricKeys {
+			// For delta metrics, cache the current data point's timestamp, which will be the start timestamp for the next data points in the series
+			p.lastDeltaTimestamps.Add(mk, timestamp)
 		}
 	})

@@ -326,6 +365,7 @@ func (p *connectorImp) resetState() {
 // and span metadata such as name, kind, status_code and any additional
 // dimensions the user has configured.
 func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
+	startTimestamp := pcommon.NewTimestampFromTime(time.Now())
 	for i := 0; i < traces.ResourceSpans().Len(); i++ {
 		rspans := traces.ResourceSpans().At(i)
 		resourceAttr := rspans.Resource().Attributes()
@@ -334,7 +374,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
 			continue
 		}

-		rm := p.getOrCreateResourceMetrics(resourceAttr)
+		rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp)
 		sums := rm.sums
 		histograms := rm.histograms
 		events := rm.events
@@ -431,7 +471,7 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
 	return pdatautil.MapHash(m)
 }

-func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
+func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics {
 	key := p.createResourceKey(attr)
 	v, ok := p.resourceMetrics.Get(key)
 	if !ok {
@@ -440,7 +480,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
 			sums:           metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
 			events:         metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
 			attributes:     attr,
-			startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
+			startTimestamp: startTimestamp,
 		}
 		p.resourceMetrics.Add(key, v)
 	}
