Skip to content

Commit da17dd7

Browse files
alanprot and qinxx108
authored and committed
Query Sample Statistics (cortexproject#4708)
* Implementing stats on results_cache * Fix TestResultsCacheRecent * adding some comments * Update Changelog * Add test case with 3 responses * address feedback * Doc update Signed-off-by: Yijie Qin <[email protected]>
1 parent bb6b026 commit da17dd7

13 files changed

+1748
-128
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683
88
* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
99
* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction
10+
* [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics
1011

1112
## 1.12.0 in progress
1213

docs/blocks-storage/querier.md

+4
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ querier:
136136
# CLI flag: -querier.at-modifier-enabled
137137
[at_modifier_enabled: <boolean> | default = false]
138138

139+
# Enable returning samples stats per steps in query response.
140+
# CLI flag: -querier.per-step-stats-enabled
141+
[per_step_stats_enabled: <boolean> | default = false]
142+
139143
# The time after which a metric should be queried from storage and not just
140144
# ingesters. 0 means all queries are sent to store. When running the blocks
141145
# storage, if this option is enabled, the time range of the query sent to the

docs/configuration/config-file-reference.md

+8
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,10 @@ The `querier_config` configures the Cortex querier.
889889
# CLI flag: -querier.at-modifier-enabled
890890
[at_modifier_enabled: <boolean> | default = false]
891891
892+
# Enable returning samples stats per steps in query response.
893+
# CLI flag: -querier.per-step-stats-enabled
894+
[per_step_stats_enabled: <boolean> | default = false]
895+
892896
# The time after which a metric should be queried from storage and not just
893897
# ingesters. 0 means all queries are sent to store. When running the blocks
894898
# storage, if this option is enabled, the time range of the query sent to the
@@ -1153,6 +1157,10 @@ results_cache:
11531157
# CLI flag: -frontend.compression
11541158
[compression: <string> | default = ""]
11551159
1160+
# Cache Statistics queryable samples on results cache.
1161+
# CLI flag: -frontend.cache-queryable-samples-stats
1162+
[cache_queryable_samples_stats: <boolean> | default = false]
1163+
11561164
# Cache query results.
11571165
# CLI flag: -querier.cache-results
11581166
[cache_results: <boolean> | default = false]

pkg/cortex/cortex.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func (c *Config) Validate(log log.Logger) error {
224224
if err := c.Worker.Validate(log); err != nil {
225225
return errors.Wrap(err, "invalid frontend_worker config")
226226
}
227-
if err := c.QueryRange.Validate(); err != nil {
227+
if err := c.QueryRange.Validate(c.Querier); err != nil {
228228
return errors.Wrap(err, "invalid query_range config")
229229
}
230230
if err := c.TableManager.Validate(); err != nil {

pkg/cortex/modules.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -523,11 +523,12 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
523523
queryrange.PrometheusResponseExtractor{},
524524
t.Cfg.Schema,
525525
promql.EngineOpts{
526-
Logger: util_log.Logger,
527-
Reg: prometheus.DefaultRegisterer,
528-
MaxSamples: t.Cfg.Querier.MaxSamples,
529-
Timeout: t.Cfg.Querier.Timeout,
530-
EnableAtModifier: t.Cfg.Querier.AtModifierEnabled,
526+
Logger: util_log.Logger,
527+
Reg: prometheus.DefaultRegisterer,
528+
MaxSamples: t.Cfg.Querier.MaxSamples,
529+
Timeout: t.Cfg.Querier.Timeout,
530+
EnableAtModifier: t.Cfg.Querier.AtModifierEnabled,
531+
EnablePerStepStats: t.Cfg.Querier.EnablePerStepStats,
531532
NoStepSubqueryIntervalFn: func(int64) int64 {
532533
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
533534
},

pkg/querier/querier.go

+3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Config struct {
4545
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
4646
QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"`
4747
AtModifierEnabled bool `yaml:"at_modifier_enabled"`
48+
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`
4849

4950
// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
5051
QueryStoreAfter time.Duration `yaml:"query_store_after"`
@@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9293
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
9394
f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.")
9495
f.BoolVar(&cfg.AtModifierEnabled, "querier.at-modifier-enabled", false, "Enable the @ modifier in PromQL.")
96+
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
9597
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
9698
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
9799
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
@@ -174,6 +176,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
174176
MaxSamples: cfg.MaxSamples,
175177
Timeout: cfg.Timeout,
176178
LookbackDelta: cfg.LookbackDelta,
179+
EnablePerStepStats: cfg.EnablePerStepStats,
177180
EnableAtModifier: cfg.AtModifierEnabled,
178181
NoStepSubqueryIntervalFn: func(int64) int64 {
179182
return cfg.DefaultEvaluationInterval.Milliseconds()

pkg/querier/queryrange/query_range.go

+93
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"strconv"
1313
"strings"
1414
"time"
15+
"unsafe"
1516

1617
"github.com/gogo/protobuf/proto"
1718
"github.com/gogo/status"
@@ -88,6 +89,10 @@ type Request interface {
8889
proto.Message
8990
// LogToSpan writes information about this request to an OpenTracing span
9091
LogToSpan(opentracing.Span)
92+
// GetStats returns the stats of the request.
93+
GetStats() string
94+
// WithStats clones the current `PrometheusRequest` with a new stats.
95+
WithStats(stats string) Request
9196
}
9297

9398
// Response represents a query range response.
@@ -114,6 +119,13 @@ func (q *PrometheusRequest) WithQuery(query string) Request {
114119
return &new
115120
}
116121

122+
// WithStats clones the current `PrometheusRequest` with a new stats.
123+
func (q *PrometheusRequest) WithStats(stats string) Request {
124+
new := *q
125+
new.Stats = stats
126+
return &new
127+
}
128+
117129
// LogToSpan logs the current `PrometheusRequest` parameters to the specified span.
118130
func (q *PrometheusRequest) LogToSpan(sp opentracing.Span) {
119131
sp.LogFields(
@@ -174,6 +186,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
174186
Data: PrometheusData{
175187
ResultType: model.ValMatrix.String(),
176188
Result: matrixMerge(promResponses),
189+
Stats: statsMerge(promResponses),
177190
},
178191
}
179192

@@ -220,6 +233,7 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward
220233
}
221234

222235
result.Query = r.FormValue("query")
236+
result.Stats = r.FormValue("stats")
223237
result.Path = r.URL.Path
224238

225239
// Include the specified headers from http request in prometheusRequest.
@@ -252,6 +266,7 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Requ
252266
"end": []string{encodeTime(promReq.End)},
253267
"step": []string{encodeDurationMs(promReq.Step)},
254268
"query": []string{promReq.Query},
269+
"stats": []string{promReq.Stats},
255270
}
256271
u := &url.URL{
257272
Path: promReq.Path,
@@ -380,6 +395,46 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) {
380395
return json.Marshal(stream)
381396
}
382397

398+
// statsMerge merge the stats from 2 responses
399+
// this function is similar to matrixMerge
400+
func statsMerge(resps []*PrometheusResponse) *PrometheusResponseStats {
401+
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
402+
hasStats := false
403+
for _, resp := range resps {
404+
if resp.Data.Stats == nil {
405+
continue
406+
}
407+
408+
hasStats = true
409+
if resp.Data.Stats.Samples == nil {
410+
continue
411+
}
412+
413+
for _, s := range resp.Data.Stats.Samples.TotalQueryableSamplesPerStep {
414+
output[s.GetTimestampMs()] = s
415+
}
416+
}
417+
418+
if !hasStats {
419+
return nil
420+
}
421+
422+
keys := make([]int64, 0, len(output))
423+
for key := range output {
424+
keys = append(keys, key)
425+
}
426+
427+
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
428+
429+
result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
430+
for _, key := range keys {
431+
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
432+
result.Samples.TotalQueryableSamples += output[key].Value
433+
}
434+
435+
return result
436+
}
437+
383438
func matrixMerge(resps []*PrometheusResponse) []SampleStream {
384439
output := map[string]*SampleStream{}
385440
for _, resp := range resps {
@@ -473,3 +528,41 @@ func decorateWithParamName(err error, field string) error {
473528
}
474529
return fmt.Errorf(errTmpl, field, err)
475530
}
531+
532+
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
533+
if !iter.ReadArray() {
534+
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected [")
535+
return
536+
}
537+
538+
t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond))
539+
540+
if !iter.ReadArray() {
541+
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ,")
542+
return
543+
}
544+
v := iter.ReadInt64()
545+
546+
if iter.ReadArray() {
547+
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ]")
548+
}
549+
550+
*(*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) = PrometheusResponseQueryableSamplesStatsPerStep{
551+
TimestampMs: int64(t),
552+
Value: v,
553+
}
554+
}
555+
556+
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
557+
stats := (*PrometheusResponseQueryableSamplesStatsPerStep)(ptr)
558+
stream.WriteArrayStart()
559+
stream.WriteFloat64(float64(stats.TimestampMs) / float64(time.Second/time.Millisecond))
560+
stream.WriteMore()
561+
stream.WriteInt64(stats.Value)
562+
stream.WriteArrayEnd()
563+
}
564+
565+
func init() {
566+
jsoniter.RegisterTypeEncoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false })
567+
jsoniter.RegisterTypeDecoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode)
568+
}

0 commit comments

Comments
 (0)