Skip to content

[connector/spanmetrics] Fix spanmetrics for child spans #36718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 13, 2025
Merged
22 changes: 22 additions & 0 deletions .chloggen/fix_spanmetrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/spanmetrics
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move the start timestamp (and last seen timestamp) from the resourceMetrics level to the individual metrics level. This ensures that each metric has its own accurate start and last seen timestamps, regardless of its relationship to other spans.
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35994]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
30 changes: 13 additions & 17 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type resourceMetrics struct {
sums metrics.SumMetrics
events metrics.SumMetrics
attributes pcommon.Map
// startTimestamp captures when the first data points for this resource are recorded.
startTimestamp pcommon.Timestamp
// lastSeen captures when the last data points for this resource were recorded.
lastSeen time.Time
}
Expand Down Expand Up @@ -281,8 +279,7 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
timeStampGenerator := func(mk metrics.Key, startTime pcommon.Timestamp) pcommon.Timestamp {
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
Expand All @@ -301,21 +298,21 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameCalls))
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, timestamp, timeStampGenerator, p.config.GetAggregationTemporality())

if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, timestamp, timeStampGenerator, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameEvents))
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, timestamp, timeStampGenerator, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
Expand Down Expand Up @@ -379,7 +376,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
continue
}

rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp)
rm := p.getOrCreateResourceMetrics(resourceAttr)
sums := rm.sums
histograms := rm.histograms
events := rm.events
Expand Down Expand Up @@ -408,12 +405,12 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes, startTimestamp)
p.addExemplar(span, duration, h)
h.Observe(duration)
}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand All @@ -437,7 +434,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e := events.GetOrCreate(eKey, eAttributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand Down Expand Up @@ -475,16 +472,15 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
return pdatautil.MapHash(m)
}

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics {
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
key := p.createResourceKey(attr)
v, ok := p.resourceMetrics.Get(key)
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
attributes: attr,
startTimestamp: startTimestamp,
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
attributes: attr,
}
p.resourceMetrics.Add(key, v)
}
Expand Down
55 changes: 32 additions & 23 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type Key string

type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality)
GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, func(Key, pcommon.Timestamp) pcommon.Timestamp, pmetric.AggregationTemporality)
ClearExemplars()
}

Expand Down Expand Up @@ -47,6 +47,8 @@ type explicitHistogram struct {
bounds []float64

maxExemplarCount *int

startTimestamp pcommon.Timestamp
}

type exponentialHistogram struct {
Expand All @@ -56,9 +58,9 @@ type exponentialHistogram struct {
histogram *structure.Histogram[float64]

maxExemplarCount *int
}

type generateStartTimestamp = func(Key) pcommon.Timestamp
startTimestamp pcommon.Timestamp
}

func NewExponentialHistogramMetrics(maxSize int32, maxExemplarCount *int) HistogramMetrics {
return &exponentialHistogramMetrics{
Expand All @@ -76,7 +78,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
Expand All @@ -85,25 +87,26 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map)
bounds: m.bounds,
bucketCounts: make([]uint64, len(m.bounds)+1),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
}
m.metrics[key] = h
}

return h
}

func (m *explicitHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
startTimeStampGenerator func(Key, pcommon.Timestamp) pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dps := metric.Histogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, h := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
startTimestamp := startTimeStampGenerator(k, h.startTimestamp)
dp.SetStartTimestamp(startTimestamp)
dp.SetTimestamp(timestamp)
dp.ExplicitBounds().FromRaw(h.bounds)
dp.BucketCounts().FromRaw(h.bucketCounts)
Expand All @@ -123,7 +126,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() {
}
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
histogram := new(structure.Histogram[float64])
Expand All @@ -137,32 +140,33 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimeStamp,
}
m.metrics[key] = h
}

return h
}

func (m *exponentialHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
startTimeStampGenerator func(Key, pcommon.Timestamp) pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality)
dps := metric.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, m := range m.metrics {
for k, e := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
startTimestamp := startTimeStampGenerator(k, e.startTimestamp)
dp.SetStartTimestamp(startTimestamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(m.histogram, dp)
for i := 0; i < m.exemplars.Len(); i++ {
m.exemplars.At(i).SetTimestamp(timestamp)
expoHistToExponentialDataPoint(e.histogram, dp)
for i := 0; i < e.exemplars.Len(); i++ {
e.exemplars.At(i).SetTimestamp(timestamp)
}
m.exemplars.CopyTo(dp.Exemplars())
m.attributes.CopyTo(dp.Attributes())
e.exemplars.CopyTo(dp.Exemplars())
e.attributes.CopyTo(dp.Attributes())
}
}

Expand Down Expand Up @@ -237,10 +241,13 @@ func (h *exponentialHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcomm
}

type Sum struct {
attributes pcommon.Map
count uint64
attributes pcommon.Map
count uint64

exemplars pmetric.ExemplarSlice
maxExemplarCount *int

startTimestamp pcommon.Timestamp
// isFirst is used to track if this datapoint is new to the Sum. This
// is used to ensure that new Sum metrics begin with 0, and then are incremented
// to the desired value. This avoids Prometheus throwing away the first
Expand All @@ -264,13 +271,14 @@ type SumMetrics struct {
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
isFirst: true,
}
m.metrics[key] = s
Expand All @@ -290,8 +298,8 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value

func (m *SumMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
startTimeStampGenerator func(Key, pcommon.Timestamp) pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptySum().SetIsMonotonic(true)
Expand All @@ -301,7 +309,8 @@ func (m *SumMetrics) BuildMetrics(
dps.EnsureCapacity(len(m.metrics))
for k, s := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
startTimeStamp := startTimeStampGenerator(k, s.startTimestamp)
dp.SetStartTimestamp(startTimeStamp)
dp.SetTimestamp(timestamp)
if s.isFirst {
dp.SetIntValue(0)
Expand Down
Loading