Add per-tenant max cache freshness to query-frontend #2609

Merged (10 commits) on May 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.query-concurrency` flag. #2562
* [FEATURE] TLS config options added for GRPC clients in Querier (Query-frontend client & Ingester client), Ruler, Store Gateway, as well as HTTP client in Config store client. #2502
* [FEATURE] Add ability to configure per-tenant MaxCacheFreshness in the query-frontend using `frontend.per-user-max-cache-freshness`. #2609
* [ENHANCEMENT] `query-tee` supports `/metadata`, `/alerts`, and `/rules` #2600
* [ENHANCEMENT] Ruler: Automatically remove unhealthy rulers from the ring. #2587
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2366,6 +2366,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]

# Most recent allowed cacheable result per-tenant, to prevent caching very
# recent results that might still be in flux.
# CLI flag: -frontend.per-user-max-cache-freshness
[max_cache_freshness: <duration> | default = 0s]

# File name of per-user overrides. [deprecated, use -runtime-config.file
# instead]
# CLI flag: -limits.per-user-override-config
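Like the other fields in `limits_config`, this limit can be overridden for individual tenants through the runtime overrides file; the `0s` default means the query-frontend's global results-cache freshness setting is used instead. A minimal sketch of a per-tenant override, assuming the hypothetical tenant ID `tenant-a` and the usual `overrides` layout loaded via `-runtime-config.file`:

```yaml
# Runtime overrides file (loaded with -runtime-config.file).
# "tenant-a" is a hypothetical tenant ID used only for illustration.
overrides:
  tenant-a:
    # Results newer than 10 minutes are never cached for this tenant;
    # other tenants keep the default behaviour.
    max_cache_freshness: 10m
```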
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits.go
@@ -17,6 +17,7 @@ import (
type Limits interface {
MaxQueryLength(string) time.Duration
MaxQueryParallelism(string) int
MaxCacheFreshness(string) time.Duration
}

type limits struct {
23 changes: 19 additions & 4 deletions pkg/querier/queryrange/results_cache.go
@@ -171,7 +171,12 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
response Response
)

maxCacheTime := int64(model.Now().Add(-s.cfg.MaxCacheFreshness))
// Use the per-tenant cache freshness value if one is set; otherwise fall back to the global config value.
maxCacheFreshness := s.limits.MaxCacheFreshness(userID)
if maxCacheFreshness == time.Duration(0) {
maxCacheFreshness = s.cfg.MaxCacheFreshness
}
maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
if r.GetStart() > maxCacheTime {
return s.next.Do(ctx, r)
}
@@ -184,7 +189,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}

if err == nil && len(extents) > 0 {
extents, err := s.filterRecentExtents(r, extents)
extents, err := s.filterRecentExtents(ctx, r, extents)
if err != nil {
return nil, err
}
@@ -417,8 +422,18 @@ func partition(req Request, extents []Extent, extractor Extractor) ([]Request, [
return requests, cachedResponses, nil
}

func (s resultsCache) filterRecentExtents(req Request, extents []Extent) ([]Extent, error) {
maxCacheTime := (int64(model.Now().Add(-s.cfg.MaxCacheFreshness)) / req.GetStep()) * req.GetStep()
func (s resultsCache) filterRecentExtents(ctx context.Context, req Request, extents []Extent) ([]Extent, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

// Use the per-tenant cache freshness value if one is set; otherwise fall back to the global config value.
maxCacheFreshness := s.limits.MaxCacheFreshness(userID)
if maxCacheFreshness == time.Duration(0) {
maxCacheFreshness = s.cfg.MaxCacheFreshness
}
maxCacheTime := (int64(model.Now().Add(-maxCacheFreshness)) / req.GetStep()) * req.GetStep()
for i := range extents {
// Never cache data for the latest freshness period.
if extents[i].End > maxCacheTime {
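The same lookup-and-fallback rule now appears in both `Do` and `filterRecentExtents`: a per-tenant value of zero means the limit is unset, so the globally configured `MaxCacheFreshness` applies. A minimal standalone sketch of that rule (the helper name is hypothetical; the PR inlines this logic rather than factoring it out):

```go
package main

import (
	"fmt"
	"time"
)

// effectiveMaxCacheFreshness mirrors the fallback above: a per-tenant value
// of zero means "not set", so the global config value is used instead.
func effectiveMaxCacheFreshness(perTenant, global time.Duration) time.Duration {
	if perTenant == 0 {
		return global
	}
	return perTenant
}

func main() {
	global := time.Minute
	fmt.Println(effectiveMaxCacheFreshness(0, global))              // 1m0s: falls back to the global setting
	fmt.Println(effectiveMaxCacheFreshness(10*time.Minute, global)) // 10m0s: the per-tenant limit wins
}
```

Whichever value wins is subtracted from `model.Now()` to compute `maxCacheTime`; `filterRecentExtents` additionally rounds that cutoff down to a multiple of the request step before trimming anything more recent out of the cached extents.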
71 changes: 71 additions & 0 deletions pkg/querier/queryrange/results_cache_test.go
@@ -352,6 +352,18 @@ func (fakeLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

func (fakeLimits) MaxCacheFreshness(string) time.Duration {
return time.Duration(0)
}

type fakeLimitsWithMaxCacheFreshness struct {
fakeLimits
}

func (fakeLimitsWithMaxCacheFreshness) MaxCacheFreshness(string) time.Duration {
return 10 * time.Minute
}

func TestResultsCache(t *testing.T) {
calls := 0
cfg := ResultsCacheConfig{
@@ -423,6 +435,65 @@ func TestResultsCacheRecent(t *testing.T) {
require.Equal(t, parsedResponse, resp)
}

func TestResultsCacheMaxFreshness(t *testing.T) {
modelNow := model.Now()
for i, tc := range []struct {
fakeLimits Limits
Handler HandlerFunc
expectedResponse *PrometheusResponse
}{
{
fakeLimits: fakeLimits{},
Handler: nil,
expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10),
},
{
fakeLimits: fakeLimitsWithMaxCacheFreshness{},
Handler: HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
return parsedResponse, nil
}),
expectedResponse: parsedResponse,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()

// set a small "global" MaxCacheFreshness value
cfg.MaxCacheFreshness = 5 * time.Second

fakeLimits := tc.fakeLimits
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
fakeLimits,
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
)
require.NoError(t, err)

// wrap the per-case handler (nil for the cache-only case) with the results cache middleware
rc := rcm.Wrap(tc.Handler)
ctx := user.InjectOrgID(context.Background(), "1")

// create a request whose start/end fall within the cached extent
req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3))

// fill cache
key := constSplitter(day).GenerateCacheKey("1", req)
rc.(*resultsCache).put(ctx, key, []Extent{mkExtent(int64(modelNow)-(60*1e3), int64(modelNow))})

// the request should be served from the cache unless the freshness limit sends it to the downstream handler.
resp, err := rc.Do(ctx, req)
require.NoError(t, err)
require.Equal(t, tc.expectedResponse, resp)
})
}
}

func Test_resultsCache_MissingData(t *testing.T) {
cfg := ResultsCacheConfig{
CacheConfig: cache.Config{
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
@@ -60,6 +60,7 @@ type Limits struct {
MaxQueryLength time.Duration `yaml:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
MaxCacheFreshness time.Duration `yaml:"max_cache_freshness"`

// Config for overrides, convenient if it goes here. [Deprecated in favor of RuntimeConfig flag in cortex.Config]
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
@@ -103,6 +104,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.DurationVar(&l.MaxCacheFreshness, "frontend.per-user-max-cache-freshness", 0, "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]")
@@ -282,6 +284,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.getOverridesForUser(userID).MaxQueryLength
}

// MaxCacheFreshness returns the period after which results become cacheable for the given tenant, to prevent caching of very recent results.
func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
return o.getOverridesForUser(userID).MaxCacheFreshness
}

// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {