FIFO cache to support eviction based on memory usage #2319

Merged
20 commits merged on Apr 3, 2020
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -92,9 +92,9 @@
* [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209
* [ENHANCEMENT] Add "missing_metric_name" and "metric_name_invalid" reasons to cortex_discarded_samples_total metric. #2346
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-<prefix>.fifocache.size` CLI flag has been renamed to `-<prefix>.fifocache.max-size-items`, and its YAML config option `size` has been renamed to `max_size_items`. Added the `-<prefix>.fifocache.max-size-bytes` CLI flag and the `max_size_bytes` YAML config option to specify the memory limit of the cache. #2319
* [BUGFIX] Ensure user state metrics are updated if a transfer fails. #2338
* [BUGFIX] Fixed etcd client keepalive settings. #2278
* [BUGFIX] Fixed bug in updating last element of FIFO cache. #2270
* [BUGFIX] Register the metrics of the WAL. #2295

### config file breaking changes
15 changes: 12 additions & 3 deletions docs/configuration/config-file-reference.md
@@ -2271,13 +2271,22 @@ The `fifo_cache_config` configures the local in-memory cache. The supported CLI
&nbsp;

```yaml
# The number of entries to cache.
# CLI flag: -<prefix>.fifocache.size
[size: <int> | default = 0]
# Maximum memory size of the cache.
# CLI flag: -<prefix>.fifocache.max-size-bytes
[max_size_bytes: <int> | default = 0]

# Maximum number of entries in the cache.
# CLI flag: -<prefix>.fifocache.max-size-items
[max_size_items: <int> | default = 0]

# The expiry duration for the cache.
# CLI flag: -<prefix>.fifocache.duration
[validity: <duration> | default = 0s]

# Deprecated (use max-size-items or max-size-bytes instead): The number of
# entries to cache.
# CLI flag: -<prefix>.fifocache.size
[size: <int> | default = 0]
```
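
For illustration only (the values below are hypothetical), a cache combining both limits with a one-hour validity could be configured as:

```yaml
max_size_bytes: 104857600  # evict once cached items use roughly 100MB
max_size_items: 50000      # and/or once the cache holds 50000 entries
validity: 1h               # entries older than one hour are treated as expired
```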

### `configs_config`
6 changes: 4 additions & 2 deletions docs/production/caching.md
@@ -81,8 +81,10 @@ To enable the FIFO cache, use the following flags:
Enable in-memory cache.
-<prefix>.fifocache.duration duration
The expiry duration for the cache.
-<prefix>.fifocache.size int
The number of entries to cache.
-<prefix>.fifocache.max-size-bytes int
Maximum memory size of the cache.
-<prefix>.fifocache.max-size-items int
Maximum number of entries in the cache.
```
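
For example (illustrative values; `<prefix>` is the cache-specific flag prefix, as in the listing above), the two limits and a validity period can be combined:

```
-<prefix>.fifocache.max-size-bytes=104857600
-<prefix>.fifocache.max-size-items=50000
-<prefix>.fifocache.duration=1h
```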

See [`fifo_cache_config` documentation](../configuration/config-file-reference.md#fifo-cache-config) if you use a config file with Cortex.
5 changes: 3 additions & 2 deletions pkg/chunk/cache/cache.go
@@ -68,8 +68,9 @@ func New(cfg Config) (Cache, error) {
cfg.Fifocache.Validity = cfg.DefaultValidity
}

cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache)
caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache))
if cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache); cache != nil {
caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache))
}
}

if (cfg.MemcacheClient.Host != "" || cfg.MemcacheClient.Addresses != "") && cfg.Redis.Endpoint != "" {
2 changes: 1 addition & 1 deletion pkg/chunk/cache/cache_test.go
@@ -172,7 +172,7 @@ func TestMemcache(t *testing.T) {
}

func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 1e3, Validity: 1 * time.Hour})
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 1e3, Validity: 1 * time.Hour})
testCache(t, cache)
}

193 changes: 102 additions & 91 deletions pkg/chunk/cache/fifo_cache.go
@@ -1,16 +1,19 @@
package cache

import (
"container/list"
"context"
"flag"
"sync"
"time"
"unsafe"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

var (
@@ -71,29 +74,40 @@
}, []string{"cache"})
)

// This FIFO cache implementation supports two eviction methods: based on the number of items in the cache, and based on memory usage.
// For memory-based eviction, set FifoCacheConfig.MaxSizeBytes to a positive integer indicating the upper limit on memory allocated by items in the cache.
// For item-count-based eviction, set FifoCacheConfig.MaxSizeItems to a positive integer indicating the maximum number of items in the cache.
// If both parameters are set, both limits are enforced and eviction is triggered by whichever is hit first.
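
// exampleFifoCacheConfig is an illustrative sketch only (the values are
// hypothetical); it shows a configuration that enforces both eviction limits
// at once.
func exampleFifoCacheConfig() *FifoCache {
	cfg := FifoCacheConfig{
		MaxSizeBytes: 100 * 1024 * 1024, // evict once cached entries account for ~100 MiB
		MaxSizeItems: 50000,             // ...or once the cache holds 50000 entries
		Validity:     time.Hour,         // entries older than an hour are reported as misses
	}
	return NewFifoCache("example-sketch", cfg)
}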

// FifoCacheConfig holds config for the FifoCache.
type FifoCacheConfig struct {
Size int `yaml:"size"`
Validity time.Duration `yaml:"validity"`
MaxSizeBytes int `yaml:"max_size_bytes"`
MaxSizeItems int `yaml:"max_size_items"`
Validity time.Duration `yaml:"validity"`

DeprecatedSize int `yaml:"size"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.IntVar(&cfg.Size, prefix+"fifocache.size", 0, description+"The number of entries to cache.")
f.IntVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", 0, description+"Maximum memory size of the cache.")
f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", 0, description+"The expiry duration for the cache.")

f.IntVar(&cfg.DeprecatedSize, prefix+"fifocache.size", 0, "Deprecated (use max-size-items or max-size-bytes instead): "+description+"The number of entries to cache. ")
}

// FifoCache is a simple string -> interface{} cache which uses FIFO order to
// manage evictions. O(1) inserts and updates, O(1) gets.
type FifoCache struct {
lock sync.RWMutex
size int
validity time.Duration
entries []cacheEntry
index map[string]int
lock sync.RWMutex
maxSizeItems int
maxSizeBytes int
currSizeBytes int
validity time.Duration

// indexes into entries to identify the most recent and least recent entry.
first, last int
entries map[string]*list.Element
lru *list.List

entriesAdded prometheus.Counter
entriesAddedNew prometheus.Counter
@@ -106,22 +120,32 @@ type FifoCache struct {
}

type cacheEntry struct {
updated time.Time
key string
value interface{}
prev, next int
updated time.Time
key string
value interface{}
}

// NewFifoCache returns a new initialised FifoCache, or nil if no size limit is configured.
// TODO(bwplotka): Fix metrics, get them out of globals, separate or allow prefixing.
func NewFifoCache(name string, cfg FifoCacheConfig) *FifoCache {
util.WarnExperimentalUse("In-memory (FIFO) cache")

cache := &FifoCache{
size: cfg.Size,
validity: cfg.Validity,
entries: make([]cacheEntry, 0, cfg.Size),
index: make(map[string]int, cfg.Size),
if cfg.DeprecatedSize > 0 {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(util.Logger).Log("msg", "running with DEPRECATED flag fifocache.size, use fifocache.max-size-items or fifocache.max-size-bytes instead", "cache", name)
cfg.MaxSizeItems = cfg.DeprecatedSize
}
if cfg.MaxSizeBytes == 0 && cfg.MaxSizeItems == 0 {
// zero cache capacity - no need to create cache
level.Warn(util.Logger).Log("msg", "neither fifocache.max-size-bytes nor fifocache.max-size-items is set", "cache", name)
return nil
}
return &FifoCache{
maxSizeItems: cfg.MaxSizeItems,
maxSizeBytes: cfg.MaxSizeBytes,
validity: cfg.Validity,
entries: make(map[string]*list.Element),
lru: list.New(),

// TODO(bwplotka): There might be simple cache.Cache wrapper for those.
entriesAdded: cacheEntriesAdded.WithLabelValues(name),
@@ -133,9 +157,6 @@ func NewFifoCache(name string, cfg FifoCacheConfig) *FifoCache {
staleGets: cacheStaleGets.WithLabelValues(name),
memoryBytes: cacheMemoryBytes.WithLabelValues(name),
}
// set initial memory allocation
cache.memoryBytes.Set(float64(int(unsafe.Sizeof(cacheEntry{})) * cache.size))
return cache
}

// Fetch implements Cache.
@@ -151,7 +172,6 @@ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, b
found = append(found, key)
bufs = append(bufs, val.([]byte))
}

return
}

@@ -166,108 +186,94 @@ func (c *FifoCache) Store(ctx context.Context, keys []string, bufs [][]byte) {

// Stop implements Cache.
func (c *FifoCache) Stop() {
c.lock.Lock()
defer c.lock.Unlock()

c.entriesEvicted.Add(float64(c.lru.Len()))

c.entries = make(map[string]*list.Element)
c.lru.Init()
c.currSizeBytes = 0

c.entriesCurrent.Set(float64(0))
c.memoryBytes.Set(float64(0))
}

// Put stores the value against the key.
func (c *FifoCache) Put(ctx context.Context, keys []string, values []interface{}) {
c.entriesAdded.Inc()
if c.size == 0 {
return
}

c.lock.Lock()
defer c.lock.Unlock()

for i := range keys {
c.put(ctx, keys[i], values[i])
c.put(keys[i], values[i])
}
}

func (c *FifoCache) put(ctx context.Context, key string, value interface{}) {
// See if we already have the entry
index, ok := c.index[key]
func (c *FifoCache) put(key string, value interface{}) {
// See if we already have the item in the cache.
element, ok := c.entries[key]
if ok {
entry := c.entries[index]
deltaSize := sizeOf(value) - sizeOf(entry.value)

entry.updated = time.Now()
entry.value = value
// Remove the item from the cache.
entry := c.lru.Remove(element).(*cacheEntry)
delete(c.entries, key)
c.currSizeBytes -= sizeOf(entry)
c.entriesCurrent.Dec()
}

// Remove this entry from the FIFO linked-list.
c.entries[entry.prev].next = entry.next
c.entries[entry.next].prev = entry.prev
entry := &cacheEntry{
updated: time.Now(),
key: key,
value: value,
}
entrySz := sizeOf(entry)

// Corner case: updating last element
if c.last == index {
c.last = entry.prev
if c.maxSizeBytes > 0 && entrySz > c.maxSizeBytes {
// Cannot keep this item in the cache.
if ok {
// We do not replace this item.
c.entriesEvicted.Inc()
}

// Insert it at the beginning
entry.next = c.first
entry.prev = c.last
c.entries[entry.next].prev = index
c.entries[entry.prev].next = index
c.first = index

c.entries[index] = entry
c.memoryBytes.Add(float64(deltaSize))
c.memoryBytes.Set(float64(c.currSizeBytes))
return
}
c.entriesAddedNew.Inc()

// Otherwise, see if we need to evict an entry.
if len(c.entries) >= c.size {
// Otherwise, see if we need to evict item(s).
for (c.maxSizeBytes > 0 && c.currSizeBytes+entrySz > c.maxSizeBytes) || (c.maxSizeItems > 0 && len(c.entries) >= c.maxSizeItems) {
lastElement := c.lru.Back()
if lastElement == nil {
break
}
evicted := c.lru.Remove(lastElement).(*cacheEntry)
delete(c.entries, evicted.key)
c.currSizeBytes -= sizeOf(evicted)
c.entriesCurrent.Dec()
c.entriesEvicted.Inc()
index = c.last
entry := c.entries[index]
deltaSize := sizeOf(key) + sizeOf(value) - sizeOf(entry.key) - sizeOf(entry.value)

c.last = entry.prev
c.first = index
delete(c.index, entry.key)
c.index[key] = index

entry.updated = time.Now()
entry.value = value
entry.key = key
c.entries[index] = entry
c.memoryBytes.Add(float64(deltaSize))
return
}

// Finally, no hit and we have space.
index = len(c.entries)
c.entries = append(c.entries, cacheEntry{
updated: time.Now(),
key: key,
value: value,
prev: c.last,
next: c.first,
})
c.entries[c.first].prev = index
c.entries[c.last].next = index
c.first = index
c.index[key] = index

c.memoryBytes.Add(float64(sizeOf(key) + sizeOf(value)))
// Finally, we have space to add the item.
c.entries[key] = c.lru.PushFront(entry)
c.currSizeBytes += entrySz
if !ok {
c.entriesAddedNew.Inc()
}
c.entriesCurrent.Inc()
c.memoryBytes.Set(float64(c.currSizeBytes))
}

// Get returns the stored value against the key and when the key was last updated.
func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
c.totalGets.Inc()
if c.size == 0 {
return nil, false
}

c.lock.RLock()
defer c.lock.RUnlock()

index, ok := c.index[key]
element, ok := c.entries[key]
if ok {
updated := c.entries[index].updated
if c.validity == 0 || time.Since(updated) < c.validity {
return c.entries[index].value, true
entry := element.Value.(*cacheEntry)
if c.validity == 0 || time.Since(entry.updated) < c.validity {
return entry.value, true
}

c.totalMisses.Inc()
@@ -281,6 +287,11 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {

func sizeOf(i interface{}) int {
switch v := i.(type) {
case *cacheEntry:
return int(unsafe.Sizeof(*v)) + // size of cacheEntry
sizeOf(v.value) + // size of entry.value
(2 * sizeOf(v.key)) + // counting key twice: in the cacheEntry and in the map
int(unsafe.Sizeof(v)) // size of *cacheEntry in the map
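// Illustrative example, assuming a 64-bit platform where
// unsafe.Sizeof(cacheEntry{}) is typically 56 bytes and a pointer is 8 bytes:
// an entry with a 10-byte key whose value is accounted at 100 bytes costs
// 56 + 100 + 2*10 + 8 = 184 bytes.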
case string:
return len(v)
case []int8: