Skip to content

Commit 10f6f07

Browse files
authored
Support exponential histogram in OTLP ingestion (#6071)
* support native histogram in OTLP ingestion Signed-off-by: Ben Ye <[email protected]> * update changelog Signed-off-by: Ben Ye <[email protected]> * fix lint Signed-off-by: Ben Ye <[email protected]> * enable native histograms in OTLP test Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent 779dcf4 commit 10f6f07

File tree

5 files changed

+90
-15
lines changed

5 files changed

+90
-15
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056
77
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
88
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
9+
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
910
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
1011
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
1112
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916

integration/e2e/util.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,8 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
150150
return
151151
}
152152

153-
func GenerateHistogramSeries(name string, ts time.Time, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
153+
func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
154154
tsMillis := TimeToMilliseconds(ts)
155-
i := rand.Uint32()
156155

157156
lbls := append(
158157
[]prompb.Label{

integration/e2ecortex/client.go

+47-3
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
159159
return metricName, attributes
160160
}
161161

162-
func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
162+
func createDataPointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
163163
newMetric.SetEmptyGauge()
164164
for _, sample := range samples {
165165
datapoint := newMetric.Gauge().DataPoints().AppendEmpty()
@@ -172,6 +172,47 @@ func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any,
172172
}
173173
}
174174

175+
func createDataPointsExponentialHistogram(newMetric pmetric.Metric, attributes map[string]any, histograms []prompb.Histogram) {
176+
newMetric.SetEmptyExponentialHistogram()
177+
for _, h := range histograms {
178+
datapoint := newMetric.ExponentialHistogram().DataPoints().AppendEmpty()
179+
datapoint.SetTimestamp(pcommon.Timestamp(h.Timestamp * time.Millisecond.Nanoseconds()))
180+
datapoint.SetCount(h.GetCountInt())
181+
datapoint.SetSum(h.GetSum())
182+
datapoint.SetScale(h.GetSchema())
183+
datapoint.SetZeroCount(h.GetZeroCountInt())
184+
datapoint.SetZeroThreshold(h.GetZeroThreshold())
185+
convertBucketLayout(datapoint.Positive(), h.PositiveSpans, h.PositiveDeltas)
186+
convertBucketLayout(datapoint.Negative(), h.NegativeSpans, h.NegativeDeltas)
187+
err := datapoint.Attributes().FromRaw(attributes)
188+
if err != nil {
189+
panic(err)
190+
}
191+
}
192+
}
193+
194+
// convertBucketLayout converts Prometheus remote write bucket layout to Exponential Histogram bucket layout.
195+
func convertBucketLayout(bucket pmetric.ExponentialHistogramDataPointBuckets, spans []prompb.BucketSpan, deltas []int64) {
196+
vals := make([]uint64, 0)
197+
iDelta := 0
198+
count := int64(0)
199+
for i, span := range spans {
200+
if i == 0 {
201+
bucket.SetOffset(span.GetOffset() - 1)
202+
} else {
203+
for j := 0; j < int(span.GetOffset()); j++ {
204+
vals = append(vals, 0)
205+
}
206+
}
207+
for j := 0; j < int(span.Length); j++ {
208+
count += deltas[iDelta]
209+
vals = append(vals, uint64(count))
210+
iDelta++
211+
}
212+
}
213+
bucket.BucketCounts().FromRaw(vals)
214+
}
215+
175216
// Convert Timeseries to Metrics
176217
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
177218
metrics := pmetric.NewMetrics()
@@ -181,8 +222,11 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics
181222
newMetric.SetName(metricName)
182223
//TODO Set description for new metric
183224
//TODO Set unit for new metric
184-
createDatapointsGauge(newMetric, attributes, ts.Samples)
185-
//TODO(friedrichg): Add support for histograms
225+
if len(ts.Samples) > 0 {
226+
createDataPointsGauge(newMetric, attributes, ts.Samples)
227+
} else if len(ts.Histograms) > 0 {
228+
createDataPointsExponentialHistogram(newMetric, attributes, ts.Histograms)
229+
}
186230
}
187231
return metrics
188232
}

integration/native_histogram_test.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
package integration
55

66
import (
7+
"math/rand"
78
"testing"
89
"time"
910

1011
"github.com/prometheus/common/model"
1112
"github.com/prometheus/prometheus/prompb"
13+
"github.com/prometheus/prometheus/tsdb/tsdbutil"
1214
"github.com/stretchr/testify/require"
1315

1416
"github.com/cortexproject/cortex/integration/e2e"
@@ -51,14 +53,16 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
5153

5254
seriesTimestamp := time.Now()
5355
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
54-
series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
55-
series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
56+
histogramIdx1 := rand.Uint32()
57+
series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
58+
series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
5659
res, err := c.Push(append(series1, series1Float...))
5760
require.NoError(t, err)
5861
require.Equal(t, 200, res.StatusCode)
5962

60-
series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
61-
series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
63+
histogramIdx2 := rand.Uint32()
64+
series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
65+
series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
6266
res, err = c.Push(append(series2, series2Float...))
6367
require.NoError(t, err)
6468
require.Equal(t, 200, res.StatusCode)
@@ -96,6 +100,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
96100
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
97101
require.NoError(t, err)
98102

103+
expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1))
104+
expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2))
99105
result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
100106
require.NoError(t, err)
101107
require.Equal(t, model.ValMatrix, result.Type())
@@ -106,6 +112,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
106112
require.NotEmpty(t, ss.Histograms)
107113
for _, h := range ss.Histograms {
108114
require.NotEmpty(t, h)
115+
require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count))
116+
require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum))
109117
}
110118
}
111119

@@ -119,6 +127,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
119127
require.NotEmpty(t, ss.Histograms)
120128
for _, h := range ss.Histograms {
121129
require.NotEmpty(t, h)
130+
require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count))
131+
require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum))
122132
}
123133
}
124134

@@ -129,6 +139,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
129139
require.Equal(t, 2, v.Len())
130140
for _, s := range v {
131141
require.NotNil(t, s.Histogram)
142+
require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count))
143+
require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum))
132144
}
133145

134146
result, err = c.Query(`series_2`, series2Timestamp)
@@ -138,5 +150,7 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) {
138150
require.Equal(t, 2, v.Len())
139151
for _, s := range v {
140152
require.NotNil(t, s.Histogram)
153+
require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count))
154+
require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum))
141155
}
142156
}

integration/otlp_test.go

+23-6
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ package integration
55

66
import (
77
"fmt"
8+
"math/rand"
89
"testing"
910
"time"
1011

1112
"github.com/prometheus/common/model"
1213
"github.com/prometheus/prometheus/prompb"
14+
"github.com/prometheus/prometheus/tsdb/tsdbutil"
1315
"github.com/stretchr/testify/assert"
1416
"github.com/stretchr/testify/require"
1517

@@ -33,11 +35,12 @@ func TestOTLP(t *testing.T) {
3335
// Start Cortex in single binary mode, reading the config from file and overwriting
3436
// the backend config to make it work with Minio.
3537
flags := map[string]string{
36-
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
37-
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
38-
"-blocks-storage.s3.bucket-name": bucketName,
39-
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
40-
"-blocks-storage.s3.insecure": "true",
38+
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
39+
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
40+
"-blocks-storage.s3.bucket-name": bucketName,
41+
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
42+
"-blocks-storage.s3.insecure": "true",
43+
"-blocks-storage.tsdb.enable-native-histograms": "true",
4144
}
4245

4346
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095)
@@ -72,5 +75,19 @@ func TestOTLP(t *testing.T) {
7275
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
7376
require.NoError(t, err)
7477

75-
//TODO(friedrichg): test histograms
78+
i := rand.Uint32()
79+
histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"})
80+
res, err = c.Push(histogramSeries)
81+
require.NoError(t, err)
82+
require.Equal(t, 200, res.StatusCode)
83+
84+
result, err = c.Query(`histogram_series`, now)
85+
require.NoError(t, err)
86+
require.Equal(t, model.ValVector, result.Type())
87+
v := result.(model.Vector)
88+
require.Equal(t, 1, v.Len())
89+
expectedHistogram := tsdbutil.GenerateTestHistogram(int(i))
90+
require.NotNil(t, v[0].Histogram)
91+
require.Equal(t, float64(expectedHistogram.Count), float64(v[0].Histogram.Count))
92+
require.Equal(t, expectedHistogram.Sum, float64(v[0].Histogram.Sum))
7693
}

0 commit comments

Comments
 (0)