Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[querier] honor querier mint,maxt if no SelectHints are passed to Select #4413

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
* [BUGFIX] Memberlist: forward only changes, not entire original message. #4419
* [BUGFIX] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420
* [BUGFIX] Querier: fixed panic when querying exemplars and using `-distributor.shard-by-all-labels=false`. #4473
* [BUGFIX] Querier: honor querier minT,maxT if `nil` SelectHints are passed to Select(). #4413
* [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #4483


## 1.10.0 / 2021-08-03

* [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`. #4375 (CVE-2021-36157)
Expand Down
9 changes: 7 additions & 2 deletions pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,19 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...
return storage.ErrSeriesSet(err)
}

minT, maxT := q.mint, q.maxt
if sp != nil {
minT, maxT = sp.Start, sp.End
}

// We will hit this for /series lookup when -querier.query-store-for-labels-enabled is set.
// If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded.
// That flag is only to be set with blocks storage engine, and this is a protective measure.
if sp == nil || sp.Func == "series" {
if sp != nil && sp.Func == "series" {
return storage.EmptySeriesSet()
}

chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...)
chunks, err := q.store.Get(q.ctx, userID, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,26 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
defer log.Span.Finish()

// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation,
// which needs only metadata. For this specific case we shouldn't apply the queryIngestersWithin
minT, maxT := q.mint, q.maxt
if sp != nil {
minT, maxT = sp.Start, sp.End
}

// If the querier receives a 'series' query, it means only metadata is needed.
// For this specific case we shouldn't apply the queryIngestersWithin
// time range manipulation, otherwise we'll end up returning no series at all for
// older time ranges (while in Cortex we do ignore the start/end and always return
// series in ingesters).
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp == nil || sp.Func == "series" {
if sp != nil && sp.Func == "series" {
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(ms)
}

minT, maxT := sp.Start, sp.End

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
// optimization is particularly important for the blocks storage where the blocks retention in the
Expand Down
53 changes: 9 additions & 44 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -29,7 +28,7 @@ const (
)

func TestDistributorQuerier(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
model.Matrix{
// Matrixes are unsorted, so this tests that the labels get sorted.
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
for _, streamingEnabled := range []bool{false, true} {
for testName, testData := range tests {
t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)
Expand All @@ -127,10 +126,10 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

// Select hints are not passed by Prometheus when querying /series.
// Select hints are passed by Prometheus when querying /series.
var hints *storage.SelectHints
if !testData.querySeries {
hints = &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT}
if testData.querySeries {
hints = &storage.SelectHints{Func: "series"}
}

seriesSet := querier.Select(true, hints)
Expand All @@ -149,7 +148,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
}

func TestDistributorQueryableFilter(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)

now := time.Now()
Expand All @@ -175,7 +174,7 @@ func TestIngesterStreaming(t *testing.T) {
})
require.NoError(t, err)

d := &mockDistributor{}
d := &MockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
Expand Down Expand Up @@ -244,7 +243,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
{Value: 5.5, TimestampMs: 5500},
}

d := &mockDistributor{}
d := &MockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
Expand Down Expand Up @@ -316,7 +315,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
{Metric: model.Metric{"job": "baz"}},
{Metric: model.Metric{"job": "baz", "foo": "boom"}},
}
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

Expand Down Expand Up @@ -350,37 +349,3 @@ func convertToChunks(t *testing.T, samples []cortexpb.Sample) []client.Chunk {

return clientChunks
}

type mockDistributor struct {
mock.Mock
}

func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(model.Matrix), args.Error(1)
}
func (m *mockDistributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.ExemplarQueryResponse), args.Error(1)
}
func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.QueryStreamResponse), args.Error(1)
}
func (m *mockDistributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, lbl model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
args := m.Called(ctx, from, to, lbl, matchers)
return args.Get(0).([]string), args.Error(1)
}
func (m *mockDistributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
args := m.Called(ctx, from, to)
return args.Get(0).([]string), args.Error(1)
}
func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).([]metric.Metric), args.Error(1)
}

func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
args := m.Called(ctx)
return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
}
4 changes: 2 additions & 2 deletions pkg/querier/metadata_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestMetadataHandler_Success(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return(
[]scrape.MetricMetadata{
{Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""},
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestMetadataHandler_Success(t *testing.T) {
}

func TestMetadataHandler_Error(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id"))

handler := MetadataHandler(d)
Expand Down
16 changes: 9 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,15 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers)
}

// Kludge: Prometheus passes nil SelectHints if it is doing a 'series' operation,
// which needs only metadata. Here we expect that metadataQuerier querier will handle that.
// In Cortex it is not feasible to query entire history (with no mint/maxt), so we only ask ingesters and skip
// querying the long-term storage.
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if (sp == nil || sp.Func == "series") && !q.queryStoreForLabels {
if sp == nil {
// if SelectHints is null, rely on minT, maxT of querier to scope in range for Select stmt
sp = &storage.SelectHints{Start: q.mint, End: q.maxt}
} else if sp.Func == "series" && !q.queryStoreForLabels {
// Else if the querier receives a 'series' query, it means only metadata is needed.
// Here we expect that metadataQuerier querier will handle that.
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050

// In this case, the query time range has already been validated when the querier has been
// created.
return q.metadataQuerier.Select(true, sp, matchers...)
Expand Down
34 changes: 14 additions & 20 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestQuerier(t *testing.T) {
chunkStore, through := makeMockChunkStore(t, chunks, encoding.e)
distributor := mockDistibutorFor(t, chunkStore, through)

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e)
distributor := &errDistributor{}

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -364,11 +364,11 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) {
t.Run(fmt.Sprintf("%s (ingester streaming enabled = %t)", name, cfg.IngesterStreaming), func(t *testing.T) {
// We don't need to query any data for this test, so an empty store is fine.
chunkStore := &emptyChunkStore{}
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

limits := defaultLimitsConfig()
limits := DefaultLimitsConfig()
limits.MaxQueryLength = model.Duration(maxQueryLength)
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.IngesterStreaming = ingesterStreaming

limits := defaultLimitsConfig()
limits := DefaultLimitsConfig()
limits.MaxQueryLookback = testData.maxQueryLookback
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)
Expand All @@ -570,7 +570,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}

t.Run("query range", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

Expand Down Expand Up @@ -599,7 +599,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("series", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("label names", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand All @@ -658,7 +658,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"),
}
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand All @@ -684,7 +684,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("label values", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {

// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
// so we can test everything is dedupe correctly.
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *mockDistributor {
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {
chunks, err := chunkcompat.ToChunks(cs.chunks)
require.NoError(t, err)

Expand All @@ -724,7 +724,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc
matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through)
require.NoError(t, err)

result := &mockDistributor{}
result := &MockDistributor{}
result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil)
result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil)
return result
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestShortTermQueryToLTS(t *testing.T) {
chunkStore := &emptyChunkStore{}
distributor := &errDistributor{}

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -1020,9 +1020,3 @@ func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool {
m.useQueryableCalled = true
return true
}

func defaultLimitsConfig() validation.Limits {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
return limits
}
Loading