Skip to content

Commit afea4c7

Browse files
committed
Add new query stats metrics to track prometheus querystats
Signed-off-by: SungJin1212 <[email protected]>
1 parent cccfd73 commit afea4c7

File tree

10 files changed

+349
-56
lines changed

10 files changed

+349
-56
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
77
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
88
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
9+
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_total_queryable_samples_total` and `cortex_query_peak_samples` to track totalQueryableSamples and peakSample per user. #6228
910
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1011
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1112
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

pkg/api/handlers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func NewQuerierHandler(
226226
// This is used for the stats API which we should not support. Or find other ways to.
227227
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
228228
reg,
229-
nil,
229+
querier.StatsRenderer,
230230
false,
231231
nil,
232232
false,

pkg/frontend/transport/handler.go

+29-7
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,15 @@ type Handler struct {
8989
roundTripper http.RoundTripper
9090

9191
// Metrics.
92-
querySeconds *prometheus.CounterVec
93-
querySeries *prometheus.CounterVec
94-
querySamples *prometheus.CounterVec
95-
queryChunkBytes *prometheus.CounterVec
96-
queryDataBytes *prometheus.CounterVec
97-
rejectedQueries *prometheus.CounterVec
98-
activeUsers *util.ActiveUsersCleanupService
92+
querySeconds *prometheus.CounterVec
93+
querySeries *prometheus.CounterVec
94+
querySamples *prometheus.CounterVec
95+
queryProcessedSamples *prometheus.CounterVec
96+
queryPeakSamples *prometheus.HistogramVec
97+
queryChunkBytes *prometheus.CounterVec
98+
queryDataBytes *prometheus.CounterVec
99+
rejectedQueries *prometheus.CounterVec
100+
activeUsers *util.ActiveUsersCleanupService
99101
}
100102

101103
// NewHandler creates a new frontend handler.
@@ -122,6 +124,20 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
122124
Help: "Number of samples fetched to execute a query.",
123125
}, []string{"user"})
124126

127+
// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
128+
h.queryProcessedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
129+
Name: "cortex_query_samples_processed_total",
130+
Help: "Number of samples processed to execute a query.",
131+
}, []string{"user"})
132+
133+
h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
134+
Name: "cortex_query_peak_samples",
135+
Help: "Highest count of samples considered to execute a query.",
136+
NativeHistogramBucketFactor: 1.1,
137+
NativeHistogramMaxBucketNumber: 100,
138+
NativeHistogramMinResetDuration: 1 * time.Hour,
139+
}, []string{"user"})
140+
125141
h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
126142
Name: "cortex_query_fetched_chunks_bytes_total",
127143
Help: "Size of all chunks fetched to execute a query in bytes.",
@@ -144,6 +160,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
144160
h.querySeconds.DeleteLabelValues(user)
145161
h.querySeries.DeleteLabelValues(user)
146162
h.querySamples.DeleteLabelValues(user)
163+
h.queryProcessedSamples.DeleteLabelValues(user)
164+
h.queryPeakSamples.DeleteLabelValues(user)
147165
h.queryChunkBytes.DeleteLabelValues(user)
148166
h.queryDataBytes.DeleteLabelValues(user)
149167
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
@@ -301,6 +319,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
301319
numSeries := stats.LoadFetchedSeries()
302320
numChunks := stats.LoadFetchedChunks()
303321
numSamples := stats.LoadFetchedSamples()
322+
numProcessedSamples := stats.LoadProcessedSamples()
323+
numPeakSamples := stats.LoadPeakSamples()
304324
numChunkBytes := stats.LoadFetchedChunkBytes()
305325
numDataBytes := stats.LoadFetchedDataBytes()
306326
numStoreGatewayTouchedPostings := stats.LoadStoreGatewayTouchedPostings()
@@ -313,6 +333,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
313333
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
314334
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
315335
f.querySamples.WithLabelValues(userID).Add(float64(numSamples))
336+
f.queryProcessedSamples.WithLabelValues(userID).Add(float64(numProcessedSamples))
337+
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
316338
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
317339
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
318340
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

pkg/frontend/transport/handler_test.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
188188
{
189189
name: "test handler with stats enabled",
190190
cfg: HandlerConfig{QueryStatsEnabled: true},
191-
expectedMetrics: 4,
191+
expectedMetrics: 6,
192192
roundTripperFunc: roundTripper,
193193
expectedStatusCode: http.StatusOK,
194194
},
@@ -202,7 +202,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
202202
{
203203
name: "test handler with reasonResponseTooLarge",
204204
cfg: HandlerConfig{QueryStatsEnabled: true},
205-
expectedMetrics: 4,
205+
expectedMetrics: 6,
206206
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
207207
return &http.Response{
208208
StatusCode: http.StatusRequestEntityTooLarge,
@@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
218218
{
219219
name: "test handler with reasonTooManyRequests",
220220
cfg: HandlerConfig{QueryStatsEnabled: true},
221-
expectedMetrics: 4,
221+
expectedMetrics: 6,
222222
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
223223
return &http.Response{
224224
StatusCode: http.StatusTooManyRequests,
@@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
234234
{
235235
name: "test handler with reasonTooManySamples",
236236
cfg: HandlerConfig{QueryStatsEnabled: true},
237-
expectedMetrics: 4,
237+
expectedMetrics: 6,
238238
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
239239
return &http.Response{
240240
StatusCode: http.StatusUnprocessableEntity,
@@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
250250
{
251251
name: "test handler with reasonTooLongRange",
252252
cfg: HandlerConfig{QueryStatsEnabled: true},
253-
expectedMetrics: 4,
253+
expectedMetrics: 6,
254254
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
255255
return &http.Response{
256256
StatusCode: http.StatusUnprocessableEntity,
@@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
266266
{
267267
name: "test handler with reasonSeriesFetched",
268268
cfg: HandlerConfig{QueryStatsEnabled: true},
269-
expectedMetrics: 4,
269+
expectedMetrics: 6,
270270
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
271271
return &http.Response{
272272
StatusCode: http.StatusUnprocessableEntity,
@@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
282282
{
283283
name: "test handler with reasonChunksFetched",
284284
cfg: HandlerConfig{QueryStatsEnabled: true},
285-
expectedMetrics: 4,
285+
expectedMetrics: 6,
286286
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
287287
return &http.Response{
288288
StatusCode: http.StatusUnprocessableEntity,
@@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
298298
{
299299
name: "test handler with reasonChunkBytesFetched",
300300
cfg: HandlerConfig{QueryStatsEnabled: true},
301-
expectedMetrics: 4,
301+
expectedMetrics: 6,
302302
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
303303
return &http.Response{
304304
StatusCode: http.StatusUnprocessableEntity,
@@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
314314
{
315315
name: "test handler with reasonDataBytesFetched",
316316
cfg: HandlerConfig{QueryStatsEnabled: true},
317-
expectedMetrics: 4,
317+
expectedMetrics: 6,
318318
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
319319
return &http.Response{
320320
StatusCode: http.StatusUnprocessableEntity,
@@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
330330
{
331331
name: "test handler with reasonSeriesLimitStoreGateway",
332332
cfg: HandlerConfig{QueryStatsEnabled: true},
333-
expectedMetrics: 4,
333+
expectedMetrics: 6,
334334
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
335335
return &http.Response{
336336
StatusCode: http.StatusUnprocessableEntity,
@@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
346346
{
347347
name: "test handler with reasonChunksLimitStoreGateway",
348348
cfg: HandlerConfig{QueryStatsEnabled: true},
349-
expectedMetrics: 4,
349+
expectedMetrics: 6,
350350
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
351351
return &http.Response{
352352
StatusCode: http.StatusUnprocessableEntity,
@@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
362362
{
363363
name: "test handler with reasonBytesLimitStoreGateway",
364364
cfg: HandlerConfig{QueryStatsEnabled: true},
365-
expectedMetrics: 4,
365+
expectedMetrics: 6,
366366
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
367367
return &http.Response{
368368
StatusCode: http.StatusUnprocessableEntity,
@@ -395,6 +395,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
395395
"cortex_query_fetched_series_total",
396396
"cortex_query_samples_total",
397397
"cortex_query_fetched_chunks_bytes_total",
398+
"cortex_query_samples_processed_total",
399+
"cortex_query_peak_samples_total",
398400
)
399401

400402
assert.NoError(t, err)

pkg/querier/stats/stats.go

+42
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,46 @@ func (s *QueryStats) LoadStoreGatewayTouchedPostingBytes() uint64 {
302302
return atomic.LoadUint64(&s.StoreGatewayTouchedPostingBytes)
303303
}
304304

305+
func (s *QueryStats) AddProcessedSamples(count uint64) {
306+
if s == nil {
307+
return
308+
}
309+
310+
atomic.AddUint64(&s.ProcessedSamples, count)
311+
}
312+
313+
func (s *QueryStats) LoadProcessedSamples() uint64 {
314+
if s == nil {
315+
return 0
316+
}
317+
318+
return atomic.LoadUint64(&s.ProcessedSamples)
319+
}
320+
321+
func (s *QueryStats) AddPeakSamples(count uint64) {
322+
if s == nil {
323+
return
324+
}
325+
326+
atomic.AddUint64(&s.PeakSamples, count)
327+
}
328+
329+
func (s *QueryStats) SetPeakSamples(count uint64) {
330+
if s == nil {
331+
return
332+
}
333+
334+
atomic.StoreUint64(&s.PeakSamples, count)
335+
}
336+
337+
func (s *QueryStats) LoadPeakSamples() uint64 {
338+
if s == nil {
339+
return 0
340+
}
341+
342+
return atomic.LoadUint64(&s.PeakSamples)
343+
}
344+
305345
// Merge the provided Stats into this one.
306346
func (s *QueryStats) Merge(other *QueryStats) {
307347
if s == nil || other == nil {
@@ -317,6 +357,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
317357
s.AddFetchedChunks(other.LoadFetchedChunks())
318358
s.AddStoreGatewayTouchedPostings(other.LoadStoreGatewayTouchedPostings())
319359
s.AddStoreGatewayTouchedPostingBytes(other.LoadStoreGatewayTouchedPostingBytes())
360+
s.AddProcessedSamples(other.LoadProcessedSamples())
361+
s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples()))
320362
s.AddExtraFields(other.LoadExtraFields()...)
321363
}
322364

0 commit comments

Comments
 (0)