Skip to content

Commit c7d17bb

Browse files
authored
Merge pull request #1 from hkfgo/chore/rebase-nabam-delta-prom
Chore/rebase nabam delta prom
2 parents 1979cbf + b2e18ca commit c7d17bb

File tree

2 files changed

+84
-14
lines changed

2 files changed

+84
-14
lines changed

exporter/prometheusexporter/accumulator.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -241,14 +241,15 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
241241
for i := 0; i < dps.Len(); i++ {
242242
ip := dps.At(i)
243243

244-
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
244+
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for
245245
if ip.Flags().NoRecordedValue() {
246246
a.registeredMetrics.Delete(signature)
247247
return 0
248248
}
249249

250-
v, ok := a.registeredMetrics.Load(signature)
250+
v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series
251251
if !ok {
252+
// first data point
252253
m := copyMetricMetadata(metric)
253254
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
254255
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
@@ -263,7 +264,19 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
263264

264265
switch histogram.AggregationTemporality() {
265266
case pmetric.AggregationTemporalityDelta:
266-
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
267+
if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() {
268+
// treat misalgnment as restart and reset or violation of single-writer principle and drop
269+
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
270+
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
271+
} else {
272+
a.logger.With(
273+
zap.String("time_series", signature),
274+
).Warn("Dropped misaligned histogram datapoint")
275+
continue
276+
}
277+
} else {
278+
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
279+
}
267280
case pmetric.AggregationTemporalityCumulative:
268281
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
269282
// only keep datapoint with latest timestamp
@@ -336,11 +349,7 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {
336349
}
337350

338351
func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
339-
if current.StartTimestamp().AsTime().Before(prev.StartTimestamp().AsTime()) {
340-
dest.SetStartTimestamp(current.StartTimestamp())
341-
} else {
342-
dest.SetStartTimestamp(prev.StartTimestamp())
343-
}
352+
dest.SetStartTimestamp(prev.StartTimestamp())
344353

345354
older := prev
346355
newer := current
@@ -352,9 +361,10 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
352361
newer.Attributes().CopyTo(dest.Attributes())
353362
dest.SetTimestamp(newer.Timestamp())
354363

364+
// checking for bucket boundary alignment, optionally re-aggregate on newer boundaries
355365
match := true
356366
if older.ExplicitBounds().Len() == newer.ExplicitBounds().Len() {
357-
for i := 0; i < newer.BucketCounts().Len(); i++ {
367+
for i := 0; i < newer.ExplicitBounds().Len(); i++ {
358368
if older.ExplicitBounds().At(i) != newer.ExplicitBounds().At(i) {
359369
match = false
360370
break

exporter/prometheusexporter/accumulator_test.go

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
func TestAccumulateHistogram(t *testing.T) {
30-
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
30+
appendHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
3131
metric := metrics.AppendEmpty()
3232
metric.SetName("test_metric")
3333
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
@@ -40,20 +40,26 @@ func TestAccumulateHistogram(t *testing.T) {
4040
dp.Attributes().PutStr("label_1", "1")
4141
dp.Attributes().PutStr("label_2", "2")
4242
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
43+
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs))
4344
}
4445

46+
startTs1 := time.Now().Add(-6 * time.Second)
47+
startTs2 := time.Now().Add(-5 * time.Second)
48+
startTs3 := time.Now()
4549
ts1 := time.Now().Add(-4 * time.Second)
4650
ts2 := time.Now().Add(-3 * time.Second)
4751
ts3 := time.Now().Add(-2 * time.Second)
4852
ts4 := time.Now().Add(-1 * time.Second)
53+
ts5 := time.Now().Add(1 * time.Second)
4954

5055
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
5156

5257
resourceMetrics1 := pmetric.NewResourceMetrics()
5358
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
5459
ilm1.Scope().SetName("test")
55-
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
56-
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
60+
// counts is one more than explicit bounds to account for the one implicit count/bucket for <=inf
61+
appendHistogram(startTs2, ts3, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
62+
appendHistogram(startTs2, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
5763

5864
m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
5965
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
@@ -63,16 +69,30 @@ func TestAccumulateHistogram(t *testing.T) {
6369
resourceMetrics2 := pmetric.NewResourceMetrics()
6470
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
6571
ilm2.Scope().SetName("test")
66-
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())
72+
appendHistogram(startTs2, ts1, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())
6773

6874
// add extra buckets
6975
resourceMetrics3 := pmetric.NewResourceMetrics()
7076
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
7177
ilm3.Scope().SetName("test")
72-
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())
78+
appendHistogram(startTs2, ts4, 7, 5, []uint64{3, 1, 1, 0, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())
7379

7480
m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)
7581

82+
// misaligned start timestamp, drop
83+
resourceMetrics4 := pmetric.NewResourceMetrics()
84+
ilm4 := resourceMetrics4.ScopeMetrics().AppendEmpty()
85+
ilm4.Scope().SetName("test")
86+
appendHistogram(startTs1, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())
87+
appendHistogram(ts3, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())
88+
89+
// misaligned start timestamp, treat as restart
90+
resourceMetrics5 := pmetric.NewResourceMetrics()
91+
ilm5 := resourceMetrics5.ScopeMetrics().AppendEmpty()
92+
ilm5.Scope().SetName("test")
93+
appendHistogram(startTs3, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm5.Metrics())
94+
m5 := ilm5.Metrics().At(0).Histogram().DataPoints().At(0)
95+
7696
t.Run("Accumulate", func(t *testing.T) {
7797
n := a.Accumulate(resourceMetrics1)
7898
require.Equal(t, 2, n)
@@ -133,6 +153,46 @@ func TestAccumulateHistogram(t *testing.T) {
133153
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
134154
}
135155
})
156+
t.Run("MisalignedTimestamps/Drop", func(t *testing.T) {
157+
// should reset when different buckets arrive
158+
n := a.Accumulate(resourceMetrics4)
159+
require.Equal(t, 0, n)
160+
161+
m, ok := a.registeredMetrics.Load(signature)
162+
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
163+
require.True(t, ok)
164+
165+
require.Equal(t, m4.Sum(), v.Sum())
166+
require.Equal(t, m4.Count(), v.Count())
167+
168+
for i := 0; i < v.BucketCounts().Len(); i++ {
169+
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
170+
}
171+
172+
for i := 0; i < v.ExplicitBounds().Len(); i++ {
173+
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
174+
}
175+
})
176+
t.Run("MisalignedTimestamps/Reset", func(t *testing.T) {
177+
// reset when start timestamp skips ahead
178+
n := a.Accumulate(resourceMetrics5)
179+
require.Equal(t, 1, n)
180+
181+
m, ok := a.registeredMetrics.Load(signature)
182+
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
183+
require.True(t, ok)
184+
185+
require.Equal(t, m5.Sum(), v.Sum())
186+
require.Equal(t, m5.Count(), v.Count())
187+
188+
for i := 0; i < v.BucketCounts().Len(); i++ {
189+
require.Equal(t, m5.BucketCounts().At(i), v.BucketCounts().At(i))
190+
}
191+
192+
for i := 0; i < v.ExplicitBounds().Len(); i++ {
193+
require.Equal(t, m5.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
194+
}
195+
})
136196
}
137197

138198
func TestAccumulateMetrics(t *testing.T) {

0 commit comments

Comments
 (0)