Enforce "max chunks per query" limit in ingesters too #4125

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

## master / unreleased

* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of chunks actually fetched could be up to 2x the limit, since the limit is applied independently when querying ingesters and long-term storage. #4125

## 1.9.0 in progress

* [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true
3 changes: 2 additions & 1 deletion development/tsdb-blocks-storage-s3/config/runtime.yaml
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true
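All three development runtime configs enable `ingester_stream_chunks_when_using_blocks` for the same reason: as the updated limits documentation below notes, the new per-query chunk limit is only enforced in the ingester when chunks streaming is enabled.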
21 changes: 16 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -4007,14 +4007,25 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-metadata-per-metric
[max_global_metadata_per_metric: <int> | default = 0]

-# Maximum number of chunks that can be fetched in a single query. This limit is
-# enforced when fetching chunks from the long-term storage. When running the
-# Cortex chunks storage, this limit is enforced in the querier, while when
-# running the Cortex blocks storage this limit is both enforced in the querier
-# and store-gateway. 0 to disable.
+# Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its
+# respective YAML config option instead. Maximum number of chunks that can be
+# fetched in a single query. This limit is enforced when fetching chunks from
+# the long-term storage only. When running the Cortex chunks storage, this limit
+# is enforced in the querier and ruler, while when running the Cortex blocks
+# storage this limit is enforced in the querier, ruler and store-gateway. 0 to
+# disable.
# CLI flag: -store.query-chunk-limit
[max_chunks_per_query: <int> | default = 2000000]

+# Maximum number of chunks that can be fetched in a single query from ingesters
+# and long-term storage: the total number of chunks actually fetched could be up
+# to 2x the limit, since the limit is applied independently when querying
+# ingesters and long-term storage. This limit is enforced in the ingester (if
+# chunks streaming is enabled), querier, ruler and store-gateway. Takes
+# precedence over the deprecated -store.query-chunk-limit. 0 to disable.
+# CLI flag: -querier.max-fetched-chunks-per-query
+[max_fetched_chunks_per_query: <int> | default = 0]

# Limit how long back data (series and metadata) can be queried, up until
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
# and ruler. If the requested time range is outside the allowed range, the
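To make the documented "up to 2x the limit" behaviour concrete, here is a minimal standalone Go sketch. It is illustrative only, not Cortex code: the function name and the per-path chunk counts are invented. It shows how two independent per-path checks against the same limit let a query fetch nearly twice the configured maximum in total:

```go
package main

import (
	"errors"
	"fmt"
)

// checkChunksLimit mimics one per-path limit check: 0 disables the limit, and
// the check only sees the chunks fetched on its own path.
func checkChunksLimit(fetched, limit int) error {
	if limit > 0 && fetched > limit {
		return errors.New("the query hit the max number of chunks limit")
	}
	return nil
}

func main() {
	const limit = 2_000_000

	fromIngesters := 1_900_000 // hypothetical chunks fetched from ingesters
	fromStorage := 1_900_000   // hypothetical chunks fetched from long-term storage

	// Each path passes its own check independently...
	fmt.Println(checkChunksLimit(fromIngesters, limit)) // <nil>
	fmt.Println(checkChunksLimit(fromStorage, limit))   // <nil>

	// ...yet the query fetched almost 2x the configured maximum in total.
	fmt.Println(fromIngesters + fromStorage) // 3800000
}
```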
2 changes: 1 addition & 1 deletion pkg/chunk/chunk_store.go
@@ -355,7 +355,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th
filtered := filterChunksByTime(from, through, chunks)
level.Debug(log).Log("Chunks post filtering", len(chunks))

-maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
+maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID)
if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery {
err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery))
level.Error(log).Log("err", err)
2 changes: 1 addition & 1 deletion pkg/chunk/composite_store.go
@@ -14,7 +14,7 @@ import (

// StoreLimits helps get Limits specific to Queries for Stores
type StoreLimits interface {
-MaxChunksPerQuery(userID string) int
+MaxChunksPerQueryFromStore(userID string) int
MaxQueryLength(userID string) time.Duration
}

2 changes: 1 addition & 1 deletion pkg/chunk/series_store.go
@@ -112,7 +112,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode
chunks := chks[0]
fetcher := fetchers[0]
// Protect ourselves against OOMing.
-maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
+maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID)
if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery {
err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery))
level.Error(log).Log("err", err)
2 changes: 1 addition & 1 deletion pkg/chunk/storage/factory.go
@@ -73,7 +73,7 @@ func RegisterIndexStore(name string, indexClientFactory IndexClientFactoryFunc,
// StoreLimits helps get Limits specific to Queries for Stores
type StoreLimits interface {
CardinalityLimit(userID string) int
-MaxChunksPerQuery(userID string) int
+MaxChunksPerQueryFromStore(userID string) int
MaxQueryLength(userID string) time.Duration
}

90 changes: 74 additions & 16 deletions pkg/distributor/distributor_test.go
@@ -870,6 +870,60 @@ func TestDistributor_PushQuery(t *testing.T) {
}
}

func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReached(t *testing.T) {
const maxChunksLimit = 30 // Chunks are duplicated due to replication factor.

limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.MaxChunksPerQuery = maxChunksLimit

// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
})
defer stopAll(ds, r)

// Push a number of series whose chunks stay within the max chunks limit. Each series
// has 1 sample, so expect 1 chunk per series when querying back.
initialSeries := maxChunksLimit / 3
writeReq := makeWriteRequest(0, initialSeries, 0)
writeRes, err := ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}

// Since the number of series (and thus chunks) is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)
assert.Len(t, queryRes.Chunkseries, initialSeries)

// Push more series, so that querying back all series exceeds the limit.
writeReq = &cortexpb.WriteRequest{}
for i := 0; i < maxChunksLimit; i++ {
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0),
)
}

writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the number of series (and thus chunks) now exceeds the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
}

func TestDistributor_Push_LabelRemoval(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

@@ -1754,22 +1808,12 @@ func stopAll(ds []*Distributor, r *ring.Ring) {
func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortexpb.WriteRequest {
request := &cortexpb.WriteRequest{}
for i := 0; i < samples; i++ {
-ts := cortexpb.PreallocTimeseries{
-TimeSeries: &cortexpb.TimeSeries{
-Labels: []cortexpb.LabelAdapter{
-{Name: model.MetricNameLabel, Value: "foo"},
-{Name: "bar", Value: "baz"},
-{Name: "sample", Value: fmt.Sprintf("%d", i)},
-},
-},
-}
-ts.Samples = []cortexpb.Sample{
-{
-Value: float64(i),
-TimestampMs: startTimestampMs + int64(i),
-},
-}
-request.Timeseries = append(request.Timeseries, ts)
+request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries(
+[]cortexpb.LabelAdapter{
+{Name: model.MetricNameLabel, Value: "foo"},
+{Name: "bar", Value: "baz"},
+{Name: "sample", Value: fmt.Sprintf("%d", i)},
+}, startTimestampMs+int64(i), float64(i)))
}

for i := 0; i < metadata; i++ {
@@ -1784,6 +1828,20 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortex
return request
}

func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts int64, value float64) cortexpb.PreallocTimeseries {
return cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Samples: []cortexpb.Sample{
{
Value: value,
TimestampMs: ts,
},
},
},
}
}
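Extracting makeWriteRequestTimeseries keeps the behaviour of makeWriteRequest unchanged while letting the new max-chunks test above build series with arbitrary label sets.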

func makeWriteRequestHA(samples int, replica, cluster string) *cortexpb.WriteRequest {
request := &cortexpb.WriteRequest{}
for i := 0; i < samples; i++ {
32 changes: 30 additions & 2 deletions pkg/distributor/query.go
@@ -2,13 +2,15 @@ package distributor

import (
"context"
"fmt"
"io"
"time"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/instrument"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -17,6 +19,11 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
"github.com/cortexproject/cortex/pkg/util/validation"
)

+var (
+errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
+)

// Query multiple ingesters and returns a Matrix of samples.
@@ -50,6 +57,11 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
+userID, err := tenant.TenantID(ctx)
+if err != nil {
+return err
+}

req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
Expand All @@ -60,7 +72,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc
return err
}

result, err = d.queryIngesterStream(ctx, replicationSet, req)
result, err = d.queryIngesterStream(ctx, userID, replicationSet, req)
if err != nil {
return err
}
@@ -173,7 +185,12 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
}

// queryIngesterStream queries the ingesters using the new streaming API.
-func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
+func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
+var (
+chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
+chunksCount = atomic.Int32{}
+)

// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
@@ -203,6 +220,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, err
}

+// Enforce the max chunks limit.
+if chunksLimit > 0 {
+if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit {
+// We expect to always be able to convert the label matchers back to Prometheus ones.
+// If that conversion unexpectedly fails, the error message will not include the
+// matchers, but the core logic doesn't break.
+matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
+return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
+}
+}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
}
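Design note: chunksCount is a single atomic counter shared across the per-ingester callbacks of the replication set, so the limit bounds the chunks fetched by the whole query rather than by any single ingester, and chunks duplicated across replicas (as the comment in the new distributor test above points out) count toward it.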
8 changes: 4 additions & 4 deletions pkg/ingester/client/compat.go
@@ -25,7 +25,7 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ

// FromQueryRequest unpacks a QueryRequest proto.
func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
-matchers, err := fromLabelMatchers(req.Matchers)
+matchers, err := FromLabelMatchers(req.Matchers)
if err != nil {
return 0, 0, nil, err
}
@@ -90,7 +90,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
-matchers, err := fromLabelMatchers(matchers.Matchers)
+matchers, err := FromLabelMatchers(matchers.Matchers)
if err != nil {
return 0, 0, nil, err
}
@@ -131,7 +131,7 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*l
var matchers []*labels.Matcher

if req.Matchers != nil {
-matchers, err = fromLabelMatchers(req.Matchers.Matchers)
+matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return "", 0, 0, nil, err
}
@@ -165,7 +165,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
return result, nil
}

-func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
+func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
var mtype labels.MatchType
14 changes: 14 additions & 0 deletions pkg/ingester/client/custom.go
@@ -0,0 +1,14 @@
package client

// ChunksCount returns the number of chunks in the response.
func (m *QueryStreamResponse) ChunksCount() int {
if len(m.Chunkseries) == 0 {
return 0
}

count := 0
for _, entry := range m.Chunkseries {
count += len(entry.Chunks)
}
return count
}
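A unit test for the new helper could look like the sketch below. It assumes the generated proto types are named `TimeSeriesChunk` and `Chunk` and that testify is available, as elsewhere in the codebase; treat the type names as illustrative:

```go
package client

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestQueryStreamResponse_ChunksCount(t *testing.T) {
	// An empty response has no chunk series, hence no chunks.
	require.Equal(t, 0, (&QueryStreamResponse{}).ChunksCount())

	// Chunks are summed across all chunk series in the response.
	resp := &QueryStreamResponse{
		Chunkseries: []TimeSeriesChunk{
			{Chunks: []Chunk{{}, {}}},
			{Chunks: []Chunk{{}, {}, {}}},
		},
	}
	require.Equal(t, 5, resp.ChunksCount())
}
```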
24 changes: 4 additions & 20 deletions pkg/querier/blocks_store_queryable.go
@@ -53,7 +53,7 @@

var (
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks for %s (limit: %d)"
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
)

// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -89,7 +89,7 @@ type BlocksStoreClient interface {
type BlocksStoreLimits interface {
bucket.TenantConfigProvider

-MaxChunksPerQuery(userID string) int
+MaxChunksPerQueryFromStore(userID string) int
StoreGatewayTenantShardSize(userID string) int
}

@@ -401,7 +401,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
resSeriesSets = []storage.SeriesSet(nil)
resWarnings = storage.Warnings(nil)

-maxChunksLimit = q.limits.MaxChunksPerQuery(q.userID)
+maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID)
leftChunksLimit = maxChunksLimit

resultMtx sync.Mutex
@@ -615,7 +615,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if maxChunksLimit > 0 {
actual := numChunks.Add(int32(len(s.Chunks)))
if actual > int32(leftChunksLimit) {
-return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, convertMatchersToString(matchers), maxChunksLimit))
+return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
}
}
}
@@ -937,19 +937,3 @@ func countSeriesBytes(series []*storepb.Series) (count uint64) {

return count
}

-func convertMatchersToString(matchers []*labels.Matcher) string {
-out := strings.Builder{}
-out.WriteRune('{')
-
-for idx, m := range matchers {
-if idx > 0 {
-out.WriteRune(',')
-}
-
-out.WriteString(m.String())
-}
-
-out.WriteRune('}')
-return out.String()
-}
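Dropping the local convertMatchersToString in favour of the shared util.LabelMatchersToString means the distributor (above) and the blocks-store querier now format matchers identically in their limit errors.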
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
@@ -1300,7 +1300,7 @@ type blocksStoreLimitsMock struct {
storeGatewayTenantShardSize int
}

-func (m *blocksStoreLimitsMock) MaxChunksPerQuery(_ string) int {
+func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int {
return m.maxChunksPerQuery
}
