Skip to content

Add per-tenant max cache freshness to query-frontend #2609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553
* [FEATURE] TLS config options added for GRPC clients in Querier (Query-frontend client & Ingester client), Ruler, Store Gateway, as well as HTTP client in Config store client. #2502
* [FEATURE] The flag `frontend.max-cache-freshness` can now be used to specify per-tenant max cache freshness values. The corresponding YAML config parameter has been changed from `results_cache.max_freshness` to `limits_config.max_cache_freshness`. The legacy YAML config parameter (`results_cache.max_freshness`) will continue to be supported until Cortex release `v1.4.0`. #2609
* [ENHANCEMENT] Experimental TSDB: added the following metrics to the ingester: #2580 #2583 #2589
* `cortex_ingester_tsdb_appender_add_duration_seconds`
* `cortex_ingester_tsdb_appender_commit_duration_seconds`
Expand Down
10 changes: 5 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,6 @@ results_cache:
# The CLI flags prefix for this block config is: frontend
[fifocache: <fifo_cache_config>]

# Most recent allowed cacheable result, to prevent caching very recent results
# that might still be in flux.
# CLI flag: -frontend.max-cache-freshness
[max_freshness: <duration> | default = 1m]

# Cache query results.
# CLI flag: -querier.cache-results
[cache_results: <boolean> | default = false]
Expand Down Expand Up @@ -2366,6 +2361,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]

# Most recent allowed cacheable result per-tenant, to prevent caching very
# recent results that might still be in flux.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness: <duration> | default = 1m]

# File name of per-user overrides. [deprecated, use -runtime-config.file
# instead]
# CLI flag: -limits.per-user-override-config
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// Limits provides the per-tenant limits consulted by the query-frontend
// middlewares. The string argument is the tenant/user ID (it is passed to
// validation.Overrides.getOverridesForUser in the implementation).
type Limits interface {
	// MaxQueryLength returns the maximum allowed length (in time) of a query.
	MaxQueryLength(string) time.Duration
	// MaxQueryParallelism returns how many sub-queries may be scheduled in parallel.
	MaxQueryParallelism(string) int
	// MaxCacheFreshness returns the most recent time window whose results
	// must not be cached (results newer than now-MaxCacheFreshness may still
	// be in flux).
	MaxCacheFreshness(string) time.Duration
}

type limits struct {
Expand Down
17 changes: 10 additions & 7 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ type CacheGenNumberLoader interface {
// ResultsCacheConfig is the config for the results cache.
type ResultsCacheConfig struct {
CacheConfig cache.Config `yaml:"cache"`
MaxCacheFreshness time.Duration `yaml:"max_freshness"`
MaxCacheFreshness time.Duration `yaml:"max_freshness" doc:"hidden"` // TODO: (deprecated) remove in Cortex v1.4.0
}

// RegisterFlags registers the results-cache CLI flags on the given FlagSet,
// all under the "frontend." prefix.
func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f)

	// Kept only so existing configs don't break; split interval is now
	// driven by querier.split-queries-by-interval.
	flagext.DeprecatedFlag(f, "frontend.cache-split-interval", "Deprecated: The maximum interval expected for each request, results will be cached per single interval. This behavior is now determined by querier.split-queries-by-interval.")

	f.DurationVar(&cfg.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.")
}

// Extractor is used by the cache to extract a subset of a response from a cache entry.
Expand Down Expand Up @@ -171,7 +169,12 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
response Response
)

maxCacheTime := int64(model.Now().Add(-s.cfg.MaxCacheFreshness))
// check if cache freshness value is provided in legacy config
maxCacheFreshness := s.cfg.MaxCacheFreshness
if maxCacheFreshness == time.Duration(0) {
maxCacheFreshness = s.limits.MaxCacheFreshness(userID)
}
maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
if r.GetStart() > maxCacheTime {
return s.next.Do(ctx, r)
}
Expand All @@ -184,7 +187,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}

if err == nil && len(extents) > 0 {
extents, err := s.filterRecentExtents(r, extents)
extents, err := s.filterRecentExtents(r, maxCacheFreshness, extents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -417,8 +420,8 @@ func partition(req Request, extents []Extent, extractor Extractor) ([]Request, [
return requests, cachedResponses, nil
}

func (s resultsCache) filterRecentExtents(req Request, extents []Extent) ([]Extent, error) {
maxCacheTime := (int64(model.Now().Add(-s.cfg.MaxCacheFreshness)) / req.GetStep()) * req.GetStep()
func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Duration, extents []Extent) ([]Extent, error) {
maxCacheTime := (int64(model.Now().Add(-maxCacheFreshness)) / req.GetStep()) * req.GetStep()
for i := range extents {
// Never cache data for the latest freshness period.
if extents[i].End > maxCacheTime {
Expand Down
76 changes: 75 additions & 1 deletion pkg/querier/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ func (fakeLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

// MaxCacheFreshness returns a zero freshness window, i.e. no recent-results
// period is excluded from caching, so the legacy config value (when set)
// is the one that takes effect in results_cache.Do.
func (fakeLimits) MaxCacheFreshness(string) time.Duration {
	return 0
}

// fakeLimitsHighMaxCacheFreshness embeds fakeLimits and overrides only
// MaxCacheFreshness, returning a large value so requests near "now" bypass
// the results cache.
type fakeLimitsHighMaxCacheFreshness struct {
	fakeLimits
}

// MaxCacheFreshness returns a 10-minute freshness window, pushing the
// computed maxCacheTime far enough into the past that recent test requests
// skip the cache entirely.
func (fakeLimitsHighMaxCacheFreshness) MaxCacheFreshness(string) time.Duration {
	return time.Minute * 10
}

func TestResultsCache(t *testing.T) {
calls := 0
cfg := ResultsCacheConfig{
Expand Down Expand Up @@ -397,7 +409,7 @@ func TestResultsCacheRecent(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()
rcm, _, err := NewResultsCacheMiddleware(log.NewNopLogger(), cfg, constSplitter(day), fakeLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil)
rcm, _, err := NewResultsCacheMiddleware(log.NewNopLogger(), cfg, constSplitter(day), fakeLimitsHighMaxCacheFreshness{}, PrometheusCodec, PrometheusResponseExtractor{}, nil)
require.NoError(t, err)

req := parsedRequest.WithStartEnd(int64(model.Now())-(60*1e3), int64(model.Now()))
Expand All @@ -423,6 +435,68 @@ func TestResultsCacheRecent(t *testing.T) {
require.Equal(t, parsedResponse, resp)
}

// TestResultsCacheMaxFreshness verifies precedence between the legacy
// results_cache max_freshness config and the per-tenant limit: a non-zero
// legacy value wins, otherwise the per-tenant override applies.
func TestResultsCacheMaxFreshness(t *testing.T) {
	now := model.Now()

	testCases := []struct {
		legacyCacheMaxFreshness time.Duration
		fakeLimits              Limits
		Handler                 HandlerFunc
		expectedResponse        *PrometheusResponse
	}{
		{
			// Legacy value set (5s): the request is older than the freshness
			// window, so the cached extent is served.
			legacyCacheMaxFreshness: 5 * time.Second,
			fakeLimits:              fakeLimits{},
			Handler:                 nil,
			expectedResponse:        mkAPIResponse(int64(now)-(50*1e3), int64(now)-(10*1e3), 10),
		},
		{
			// Legacy value unset (zero): the per-tenant override (10m)
			// applies, the cache is bypassed and the downstream handler runs.
			legacyCacheMaxFreshness: time.Duration(0),
			fakeLimits:              fakeLimitsHighMaxCacheFreshness{},
			Handler: HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
				return parsedResponse, nil
			}),
			expectedResponse: parsedResponse,
		},
	}

	for i, tc := range testCases {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			var cfg ResultsCacheConfig
			flagext.DefaultValues(&cfg)
			cfg.CacheConfig.Cache = cache.NewMockCache()
			cfg.MaxCacheFreshness = tc.legacyCacheMaxFreshness

			rcm, _, err := NewResultsCacheMiddleware(
				log.NewNopLogger(),
				cfg,
				constSplitter(day),
				tc.fakeLimits,
				PrometheusCodec,
				PrometheusResponseExtractor{},
				nil,
			)
			require.NoError(t, err)

			// Wrap the per-case downstream handler with the cache middleware.
			rc := rcm.Wrap(tc.Handler)
			ctx := user.InjectOrgID(context.Background(), "1")

			// Request whose start/end fall inside the cached extent below.
			req := parsedRequest.WithStartEnd(int64(now)-(50*1e3), int64(now)-(10*1e3))

			// Pre-populate the cache with an extent covering the request.
			key := constSplitter(day).GenerateCacheKey("1", req)
			rc.(*resultsCache).put(ctx, key, []Extent{mkExtent(int64(now)-(60*1e3), int64(now))})

			resp, err := rc.Do(ctx, req)
			require.NoError(t, err)
			require.Equal(t, tc.expectedResponse, resp)
		})
	}
}

func Test_resultsCache_MissingData(t *testing.T) {
cfg := ResultsCacheConfig{
CacheConfig: cache.Config{
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Limits struct {
MaxQueryLength time.Duration `yaml:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
MaxCacheFreshness time.Duration `yaml:"max_cache_freshness"`

// Config for overrides, convenient if it goes here. [Deprecated in favor of RuntimeConfig flag in cortex.Config]
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
Expand Down Expand Up @@ -103,6 +104,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.DurationVar(&l.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]")
Expand Down Expand Up @@ -282,6 +284,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.getOverridesForUser(userID).MaxQueryLength
}

// MaxCacheFreshness returns the period of the most recent query results that
// should not be cached for a given user, to avoid caching results that may
// still be in flux. (The previous comment was a copy-paste from
// MaxQueryLength and described the wrong limit.)
func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
	return o.getOverridesForUser(userID).MaxCacheFreshness
}

// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {
Expand Down