Skip to content

Commit 3ba0eb9

Browse files
authored
Add query limiter support to parquet querier (#6870)
* add query limiter support to parquet querier Signed-off-by: yeya24 <[email protected]> * add unit tests Signed-off-by: yeya24 <[email protected]> * changelog Signed-off-by: yeya24 <[email protected]> * use upstream code instead of fork Signed-off-by: yeya24 <[email protected]> * add dedicated limit configurations for new parquet storage limits Signed-off-by: yeya24 <[email protected]> * address comments Signed-off-by: yeya24 <[email protected]> * fix test Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]>
1 parent 5bb96d0 commit 3ba0eb9

File tree

12 files changed

+524
-24
lines changed

12 files changed

+524
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
5656
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
5757
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
58+
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
5859
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
5960
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
6061
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ require (
8383
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8484
github.com/oklog/ulid/v2 v2.1.1
8585
github.com/parquet-go/parquet-go v0.25.1
86-
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994
86+
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643
8787
github.com/prometheus/procfs v0.16.1
8888
github.com/sercand/kuberesolver/v5 v5.1.1
8989
github.com/tjhop/slog-gokit v0.1.4

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
814814
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
815815
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
816816
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
817-
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss=
818-
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
817+
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg=
818+
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
819819
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
820820
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
821821
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=

pkg/querier/error_translate_queryable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/gogo/status"
77
"github.com/pkg/errors"
8+
"github.com/prometheus-community/parquet-common/search"
89
"github.com/prometheus/prometheus/model/labels"
910
"github.com/prometheus/prometheus/promql"
1011
"github.com/prometheus/prometheus/storage"
@@ -48,6 +49,11 @@ func TranslateToPromqlAPIError(err error) error {
4849
return err // 422
4950
}
5051

52+
if search.IsResourceExhausted(err) {
53+
cause := errors.Cause(err)
54+
return cause // 422
55+
}
56+
5157
s, ok := status.FromError(err)
5258

5359
if !ok {

pkg/querier/error_translate_queryable_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/grafana/regexp"
1212
"github.com/pkg/errors"
13+
"github.com/prometheus-community/parquet-common/search"
1314
"github.com/prometheus/client_golang/prometheus"
1415
"github.com/prometheus/common/promslog"
1516
"github.com/prometheus/common/route"
@@ -82,6 +83,12 @@ func TestApiStatusCodes(t *testing.T) {
8283
expectedCode: 422,
8384
},
8485

86+
{
87+
err: search.NewQuota(1).Reserve(2),
88+
expectedString: "resource exhausted (used 1)",
89+
expectedCode: 422,
90+
},
91+
8592
// 505 is translated to 500
8693
{
8794
err: httpgrpc.Errorf(http.StatusHTTPVersionNotSupported, "test"),

pkg/querier/parquet_queryable.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ import (
2323
"github.com/thanos-io/thanos/pkg/strutil"
2424
"golang.org/x/sync/errgroup"
2525

26+
"github.com/cortexproject/cortex/pkg/cortexpb"
2627
"github.com/cortexproject/cortex/pkg/storage/bucket"
2728
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2829
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2930
"github.com/cortexproject/cortex/pkg/tenant"
3031
"github.com/cortexproject/cortex/pkg/util"
32+
"github.com/cortexproject/cortex/pkg/util/limiter"
3133
util_log "github.com/cortexproject/cortex/pkg/util/log"
3234
"github.com/cortexproject/cortex/pkg/util/multierror"
3335
"github.com/cortexproject/cortex/pkg/util/services"
@@ -132,6 +134,62 @@ func NewParquetQueryable(
132134

133135
cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
134136

137+
parquetQueryableOpts := []queryable.QueryableOpts{
138+
queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 {
139+
// Ignore error as this shouldn't happen.
140+
// If failed to resolve tenant we will just use the default limit value.
141+
userID, _ := tenant.TenantID(ctx)
142+
return int64(limits.ParquetMaxFetchedRowCount(userID))
143+
}),
144+
queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 {
145+
// Ignore error as this shouldn't happen.
146+
// If failed to resolve tenant we will just use the default limit value.
147+
userID, _ := tenant.TenantID(ctx)
148+
return int64(limits.ParquetMaxFetchedChunkBytes(userID))
149+
}),
150+
queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 {
151+
// Ignore error as this shouldn't happen.
152+
// If failed to resolve tenant we will just use the default limit value.
153+
userID, _ := tenant.TenantID(ctx)
154+
return int64(limits.ParquetMaxFetchedDataBytes(userID))
155+
}),
156+
queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
157+
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
158+
lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
159+
for _, series := range cs {
160+
chkCount := 0
161+
chunkSize := 0
162+
lblSize := 0
163+
lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels())
164+
lbls = append(lbls, lblAdapter)
165+
for _, lbl := range lblAdapter {
166+
lblSize += lbl.Size()
167+
}
168+
iter := series.Iterator(nil)
169+
for iter.Next() {
170+
chk := iter.At()
171+
chunkSize += len(chk.Chunk.Bytes())
172+
chkCount++
173+
}
174+
if chkCount > 0 {
175+
if err := queryLimiter.AddChunks(chkCount); err != nil {
176+
return validation.LimitError(err.Error())
177+
}
178+
if err := queryLimiter.AddChunkBytes(chunkSize); err != nil {
179+
return validation.LimitError(err.Error())
180+
}
181+
}
182+
183+
if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil {
184+
return validation.LimitError(err.Error())
185+
}
186+
}
187+
if err := queryLimiter.AddSeries(lbls...); err != nil {
188+
return validation.LimitError(err.Error())
189+
}
190+
return nil
191+
}),
192+
}
135193
parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) {
136194
userID, err := tenant.TenantID(ctx)
137195
if err != nil {
@@ -182,7 +240,7 @@ func NewParquetQueryable(
182240
}
183241

184242
return shards, errGroup.Wait()
185-
})
243+
}, parquetQueryableOpts...)
186244

187245
p := &parquetQueryableWithFallback{
188246
subservices: manager,
@@ -376,7 +434,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
376434

377435
userID, err := tenant.TenantID(ctx)
378436
if err != nil {
379-
storage.ErrSeriesSet(err)
437+
return storage.ErrSeriesSet(err)
380438
}
381439

382440
if q.limits.QueryVerticalShardSize(userID) > 1 {

0 commit comments

Comments
 (0)