Skip to content

Commit 9b1ba23

Browse files
authored
Add support for native histograms in querier protobuf codec (#6368)
1 parent 784578a commit 9b1ba23

File tree

4 files changed

+727
-147
lines changed

4 files changed

+727
-147
lines changed

integration/native_histogram_test.go

+143-129
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package integration
55

66
import (
7+
"fmt"
78
"math/rand"
89
"testing"
910
"time"
@@ -21,136 +22,149 @@ import (
2122
func TestNativeHistogramIngestionAndQuery(t *testing.T) {
2223
const blockRangePeriod = 5 * time.Second
2324

24-
s, err := e2e.NewScenario(networkName)
25-
require.NoError(t, err)
26-
defer s.Close()
27-
28-
// Configure the blocks storage to frequently compact TSDB head
29-
// and ship blocks to the storage.
30-
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
31-
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
32-
"-blocks-storage.tsdb.ship-interval": "1s",
33-
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
34-
"-blocks-storage.tsdb.enable-native-histograms": "true",
35-
})
36-
37-
// Start dependencies.
38-
consul := e2edb.NewConsul()
39-
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
40-
require.NoError(t, s.StartAndWaitReady(consul, minio))
41-
42-
// Start Cortex components for the write path.
43-
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
44-
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
45-
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
46-
47-
// Wait until the distributor has updated the ring.
48-
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
49-
50-
// Push some series to Cortex.
51-
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
52-
require.NoError(t, err)
53-
54-
seriesTimestamp := time.Now()
55-
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
56-
histogramIdx1 := rand.Uint32()
57-
series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
58-
series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
59-
res, err := c.Push(append(series1, series1Float...))
60-
require.NoError(t, err)
61-
require.Equal(t, 200, res.StatusCode)
62-
63-
histogramIdx2 := rand.Uint32()
64-
series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
65-
series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
66-
res, err = c.Push(append(series2, series2Float...))
67-
require.NoError(t, err)
68-
require.Equal(t, 200, res.StatusCode)
69-
70-
// Wait until the TSDB head is compacted and shipped to the storage.
71-
// The shipped block contains the 2 series from `series_1` and `series_2` will be in head.
72-
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
73-
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total"))
74-
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
75-
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series"))
76-
77-
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
78-
require.NoError(t, s.Start(queryFrontend))
79-
80-
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
81-
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
82-
"-blocks-storage.bucket-store.sync-interval": "5s",
83-
}), "")
84-
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
85-
"-blocks-storage.bucket-store.sync-interval": "1s",
86-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
87-
}), "")
88-
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
89-
90-
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
91-
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
92-
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
93-
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
94-
95-
// Sleep 3 * bucket sync interval to make sure consistency checker
96-
// doesn't consider block is uploaded recently.
97-
time.Sleep(3 * time.Second)
98-
99-
// Query back the series.
100-
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
101-
require.NoError(t, err)
102-
103-
expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1))
104-
expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2))
105-
result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
106-
require.NoError(t, err)
107-
require.Equal(t, model.ValMatrix, result.Type())
108-
m := result.(model.Matrix)
109-
require.Equal(t, 2, m.Len())
110-
for _, ss := range m {
111-
require.Empty(t, ss.Values)
112-
require.NotEmpty(t, ss.Histograms)
113-
for _, h := range ss.Histograms {
114-
require.NotEmpty(t, h)
115-
require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count))
116-
require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum))
117-
}
25+
configs := []map[string]string{
26+
{
27+
"-api.querier-default-codec": "json",
28+
},
29+
{
30+
"-api.querier-default-codec": "protobuf",
31+
},
11832
}
11933

120-
result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
121-
require.NoError(t, err)
122-
require.Equal(t, model.ValMatrix, result.Type())
123-
m = result.(model.Matrix)
124-
require.Equal(t, 2, m.Len())
125-
for _, ss := range m {
126-
require.Empty(t, ss.Values)
127-
require.NotEmpty(t, ss.Histograms)
128-
for _, h := range ss.Histograms {
129-
require.NotEmpty(t, h)
130-
require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count))
131-
require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum))
132-
}
133-
}
134-
135-
result, err = c.Query(`series_1`, series2Timestamp)
136-
require.NoError(t, err)
137-
require.Equal(t, model.ValVector, result.Type())
138-
v := result.(model.Vector)
139-
require.Equal(t, 2, v.Len())
140-
for _, s := range v {
141-
require.NotNil(t, s.Histogram)
142-
require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count))
143-
require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum))
144-
}
145-
146-
result, err = c.Query(`series_2`, series2Timestamp)
147-
require.NoError(t, err)
148-
require.Equal(t, model.ValVector, result.Type())
149-
v = result.(model.Vector)
150-
require.Equal(t, 2, v.Len())
151-
for _, s := range v {
152-
require.NotNil(t, s.Histogram)
153-
require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count))
154-
require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum))
34+
for _, config := range configs {
35+
t.Run(fmt.Sprintf("native histograms with %s codec", config["-api.querier-default-codec"]), func(t *testing.T) {
36+
s, err := e2e.NewScenario(networkName)
37+
require.NoError(t, err)
38+
defer s.Close()
39+
40+
// Configure the blocks storage to frequently compact TSDB head
41+
// and ship blocks to the storage.
42+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
43+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
44+
"-blocks-storage.tsdb.ship-interval": "1s",
45+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
46+
"-blocks-storage.tsdb.enable-native-histograms": "true",
47+
})
48+
49+
// Start dependencies.
50+
consul := e2edb.NewConsul()
51+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
52+
require.NoError(t, s.StartAndWaitReady(consul, minio))
53+
54+
// Start Cortex components for the write path.
55+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
56+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
57+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
58+
59+
// Wait until the distributor has updated the ring.
60+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
61+
62+
// Push some series to Cortex.
63+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
64+
require.NoError(t, err)
65+
66+
seriesTimestamp := time.Now()
67+
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
68+
histogramIdx1 := rand.Uint32()
69+
series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
70+
series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
71+
res, err := c.Push(append(series1, series1Float...))
72+
require.NoError(t, err)
73+
require.Equal(t, 200, res.StatusCode)
74+
75+
histogramIdx2 := rand.Uint32()
76+
series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
77+
series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
78+
res, err = c.Push(append(series2, series2Float...))
79+
require.NoError(t, err)
80+
require.Equal(t, 200, res.StatusCode)
81+
82+
// Wait until the TSDB head is compacted and shipped to the storage.
83+
// The shipped block contains the 2 series from `series_1` and `series_2` will be in head.
84+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
85+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total"))
86+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
87+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series"))
88+
89+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", mergeFlags(flags, config), "")
90+
require.NoError(t, s.Start(queryFrontend))
91+
92+
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
93+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
94+
"-blocks-storage.bucket-store.sync-interval": "5s",
95+
}), "")
96+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
97+
"-blocks-storage.bucket-store.sync-interval": "1s",
98+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
99+
}), "")
100+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
101+
102+
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
103+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
104+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
105+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
106+
107+
// Sleep 3 * bucket sync interval to make sure consistency checker
108+
// doesn't consider block is uploaded recently.
109+
time.Sleep(3 * time.Second)
110+
111+
// Query back the series.
112+
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
113+
require.NoError(t, err)
114+
115+
expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1))
116+
expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2))
117+
result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
118+
require.NoError(t, err)
119+
require.Equal(t, model.ValMatrix, result.Type())
120+
m := result.(model.Matrix)
121+
require.Equal(t, 2, m.Len())
122+
for _, ss := range m {
123+
require.Empty(t, ss.Values)
124+
require.NotEmpty(t, ss.Histograms)
125+
for _, h := range ss.Histograms {
126+
require.NotEmpty(t, h)
127+
require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count))
128+
require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum))
129+
}
130+
}
131+
132+
result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
133+
require.NoError(t, err)
134+
require.Equal(t, model.ValMatrix, result.Type())
135+
m = result.(model.Matrix)
136+
require.Equal(t, 2, m.Len())
137+
for _, ss := range m {
138+
require.Empty(t, ss.Values)
139+
require.NotEmpty(t, ss.Histograms)
140+
for _, h := range ss.Histograms {
141+
require.NotEmpty(t, h)
142+
require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count))
143+
require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum))
144+
}
145+
}
146+
147+
result, err = c.Query(`series_1`, series2Timestamp)
148+
require.NoError(t, err)
149+
require.Equal(t, model.ValVector, result.Type())
150+
v := result.(model.Vector)
151+
require.Equal(t, 2, v.Len())
152+
for _, s := range v {
153+
require.NotNil(t, s.Histogram)
154+
require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count))
155+
require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum))
156+
}
157+
158+
result, err = c.Query(`series_2`, series2Timestamp)
159+
require.NoError(t, err)
160+
require.Equal(t, model.ValVector, result.Type())
161+
v = result.(model.Vector)
162+
require.Equal(t, 2, v.Len())
163+
for _, s := range v {
164+
require.NotNil(t, s.Histogram)
165+
require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count))
166+
require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum))
167+
}
168+
})
155169
}
156170
}

0 commit comments

Comments
 (0)