Skip to content

Commit ae204d9

Browse files
authored
[prometheusremotewritereceiver] add exponential histograms datapoints (#39864)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR partially implements the native histograms from prometheus v2 to the Otel exponential histograms. For now, it's missing some implementations that I take care on a next PR. Like: - [x] Deal with scale - [x] Deal with positive and negative spans - [ ] Maybe exemplars (I'm think that we can deal with exemplars in a next PR) - [x] Deal with positive and negative buckets <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Related to #37277
1 parent 47d322d commit ae204d9

File tree

3 files changed

+630
-13
lines changed

3 files changed

+630
-13
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: prometheusremotewritereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add exponential histograms datapoints to the prometheusremotewritereceiver
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37277]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 179 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,11 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
353353
sum.SetIsMonotonic(true)
354354
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
355355
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
356-
metric.SetEmptyHistogram()
356+
// Histograms that comes with samples are considered as classic histograms and are not supported.
357+
if len(ts.Samples) == 0 {
358+
hist := metric.SetEmptyExponentialHistogram()
359+
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
360+
}
357361
case writev2.Metadata_METRIC_TYPE_SUMMARY:
358362
metric.SetEmptySummary()
359363
}
@@ -374,7 +378,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
374378
case writev2.Metadata_METRIC_TYPE_COUNTER:
375379
addNumberDatapoints(metric.Sum().DataPoints(), ls, ts, &stats)
376380
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
377-
addHistogramDatapoints(metric.Histogram().DataPoints(), ls, ts)
381+
// Histograms that comes with samples are considered as classic histograms and are not supported.
382+
if len(ts.Samples) == 0 {
383+
addExponentialHistogramDatapoints(metric.ExponentialHistogram().DataPoints(), ls, ts, &stats)
384+
}
378385
case writev2.Metadata_METRIC_TYPE_SUMMARY:
379386
addSummaryDatapoints(metric.Summary().DataPoints(), ls, ts)
380387
default:
@@ -413,24 +420,183 @@ func addNumberDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labe
413420
dp.SetDoubleValue(sample.Value)
414421

415422
attributes := dp.Attributes()
416-
for _, l := range ls {
417-
if l.Name == "instance" || l.Name == "job" || // Become resource attributes
418-
l.Name == labels.MetricName || // Becomes metric name
419-
l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version
420-
continue
421-
}
422-
attributes.PutStr(l.Name, l.Value)
423-
}
424-
stats.Samples++
423+
extractAttributes(ls).CopyTo(attributes)
425424
}
425+
stats.Samples += len(ts.Samples)
426426
}
427427

428428
func addSummaryDatapoints(_ pmetric.SummaryDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) {
429429
// TODO: Implement this function
430430
}
431431

432-
func addHistogramDatapoints(_ pmetric.HistogramDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) {
433-
// TODO: Implement this function
432+
func addExponentialHistogramDatapoints(datapoints pmetric.ExponentialHistogramDataPointSlice, ls labels.Labels, ts writev2.TimeSeries, stats *promremote.WriteResponseStats) {
433+
for _, histogram := range ts.Histograms {
434+
// Drop histograms with RESET_HINT_GAUGE or negative counts.
435+
if histogram.ResetHint == writev2.Histogram_RESET_HINT_GAUGE || hasNegativeCounts(histogram) {
436+
continue
437+
}
438+
439+
// If we reach here, the histogram passed validation - proceed with conversion
440+
dp := datapoints.AppendEmpty()
441+
dp.SetStartTimestamp(pcommon.Timestamp(ts.CreatedTimestamp * int64(time.Millisecond)))
442+
dp.SetTimestamp(pcommon.Timestamp(histogram.Timestamp * int64(time.Millisecond)))
443+
444+
// The difference between float and integer histograms is that float histograms are stored as absolute counts
445+
// while integer histograms are stored as deltas.
446+
if histogram.IsFloatHistogram() {
447+
// Float histograms
448+
if len(histogram.PositiveSpans) > 0 {
449+
dp.Positive().SetOffset(histogram.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
450+
convertAbsoluteBuckets(histogram.PositiveSpans, histogram.PositiveCounts, dp.Positive().BucketCounts())
451+
}
452+
if len(histogram.NegativeSpans) > 0 {
453+
dp.Negative().SetOffset(histogram.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
454+
convertAbsoluteBuckets(histogram.NegativeSpans, histogram.NegativeCounts, dp.Negative().BucketCounts())
455+
}
456+
457+
dp.SetScale(histogram.Schema)
458+
dp.SetZeroThreshold(histogram.ZeroThreshold)
459+
zeroCountFloat := histogram.GetZeroCountFloat()
460+
dp.SetZeroCount(uint64(zeroCountFloat))
461+
dp.SetSum(histogram.Sum)
462+
countFloat := histogram.GetCountFloat()
463+
dp.SetCount(uint64(countFloat))
464+
} else {
465+
// Integer histograms
466+
if len(histogram.PositiveSpans) > 0 {
467+
dp.Positive().SetOffset(histogram.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
468+
convertDeltaBuckets(histogram.PositiveSpans, histogram.PositiveDeltas, dp.Positive().BucketCounts())
469+
}
470+
if len(histogram.NegativeSpans) > 0 {
471+
dp.Negative().SetOffset(histogram.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
472+
convertDeltaBuckets(histogram.NegativeSpans, histogram.NegativeDeltas, dp.Negative().BucketCounts())
473+
}
474+
475+
dp.SetScale(histogram.Schema)
476+
dp.SetZeroThreshold(histogram.ZeroThreshold)
477+
zeroCountInt := histogram.GetZeroCountInt()
478+
dp.SetZeroCount(zeroCountInt)
479+
dp.SetSum(histogram.Sum)
480+
countInt := histogram.GetCountInt()
481+
dp.SetCount(countInt)
482+
}
483+
484+
attributes := dp.Attributes()
485+
stats.Histograms++
486+
extractAttributes(ls).CopyTo(attributes)
487+
}
488+
}
489+
490+
// hasNegativeCounts checks if a histogram has any negative counts
491+
func hasNegativeCounts(histogram writev2.Histogram) bool {
492+
if histogram.IsFloatHistogram() {
493+
// Check overall count
494+
if histogram.GetCountFloat() < 0 {
495+
return true
496+
}
497+
498+
// Check zero count
499+
if histogram.GetZeroCountFloat() < 0 {
500+
return true
501+
}
502+
503+
// Check positive bucket counts
504+
for _, count := range histogram.PositiveCounts {
505+
if count < 0 {
506+
return true
507+
}
508+
}
509+
510+
// Check negative bucket counts
511+
for _, count := range histogram.NegativeCounts {
512+
if count < 0 {
513+
return true
514+
}
515+
}
516+
} else {
517+
// Integer histograms
518+
var absolute int64
519+
for _, delta := range histogram.NegativeDeltas {
520+
absolute += delta
521+
if absolute < 0 {
522+
return true
523+
}
524+
}
525+
526+
absolute = 0
527+
for _, delta := range histogram.PositiveDeltas {
528+
absolute += delta
529+
if absolute < 0 {
530+
return true
531+
}
532+
}
533+
}
534+
535+
return false
536+
}
537+
538+
// convertDeltaBuckets converts Prometheus native histogram spans and deltas to OpenTelemetry bucket counts
539+
// For integer buckets, the values are deltas between the buckets. i.e a bucket list of 1,2,-2 would correspond to a bucket count of 1,3,1
540+
func convertDeltaBuckets(spans []writev2.BucketSpan, deltas []int64, buckets pcommon.UInt64Slice) {
541+
// The total capacity is the sum of the deltas and the offsets of the spans.
542+
totalCapacity := len(deltas)
543+
for _, span := range spans {
544+
totalCapacity += int(span.Offset)
545+
}
546+
buckets.EnsureCapacity(totalCapacity)
547+
548+
bucketIdx := 0
549+
bucketCount := int64(0)
550+
for spanIdx, span := range spans {
551+
if spanIdx > 0 {
552+
for i := int32(0); i < span.Offset; i++ {
553+
buckets.Append(uint64(0))
554+
}
555+
}
556+
for i := uint32(0); i < span.Length; i++ {
557+
bucketCount += deltas[bucketIdx]
558+
bucketIdx++
559+
buckets.Append(uint64(bucketCount))
560+
}
561+
}
562+
}
563+
564+
// convertAbsoluteBuckets converts Prometheus native histogram spans and absolute counts to OpenTelemetry bucket counts
565+
// For float buckets, the values are absolute counts, and must be 0 or positive.
566+
func convertAbsoluteBuckets(spans []writev2.BucketSpan, counts []float64, buckets pcommon.UInt64Slice) {
567+
// The total capacity is the sum of the counts and the offsets of the spans.
568+
totalCapacity := len(counts)
569+
for _, span := range spans {
570+
totalCapacity += int(span.Offset)
571+
}
572+
buckets.EnsureCapacity(totalCapacity)
573+
574+
bucketIdx := 0
575+
for spanIdx, span := range spans {
576+
if spanIdx > 0 {
577+
for i := int32(0); i < span.Offset; i++ {
578+
buckets.Append(uint64(0))
579+
}
580+
}
581+
for i := uint32(0); i < span.Length; i++ {
582+
buckets.Append(uint64(counts[bucketIdx]))
583+
bucketIdx++
584+
}
585+
}
586+
}
587+
588+
// extractAttributes return all attributes different from job, instance, metric name and scope name/version
589+
func extractAttributes(ls labels.Labels) pcommon.Map {
590+
attrs := pcommon.NewMap()
591+
for _, l := range ls {
592+
if l.Name == "instance" || l.Name == "job" || // Become resource attributes
593+
l.Name == labels.MetricName || // Becomes metric name
594+
l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version
595+
continue
596+
}
597+
attrs.PutStr(l.Name, l.Value)
598+
}
599+
return attrs
434600
}
435601

436602
// extractScopeInfo extracts the scope name and version from the labels. If the labels do not contain the scope name/version,

0 commit comments

Comments
 (0)